src.ctf.lttng-live: make lttng_live_trace::trace_class a bt2::TraceClass::Shared
[babeltrace.git] / src / plugins / ctf / lttng-live / lttng-live.cpp
1 /*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
5 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
6 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
7 *
8 * Babeltrace CTF LTTng-live Client Component
9 */
10
11 #include <glib.h>
12 #include <unistd.h>
13
14 #include "common/assert.h"
15 #include "cpp-common/bt2c/fmt.hpp"
16 #include "cpp-common/vendor/fmt/format.h"
17
18 #include "plugins/common/muxing/muxing.h"
19 #include "plugins/common/param-validation/param-validation.h"
20
21 #include "data-stream.hpp"
22 #include "lttng-live.hpp"
23 #include "metadata.hpp"
24
25 #define MAX_QUERY_SIZE (256 * 1024)
26 #define URL_PARAM "url"
27 #define INPUTS_PARAM "inputs"
28 #define SESS_NOT_FOUND_ACTION_PARAM "session-not-found-action"
29 #define SESS_NOT_FOUND_ACTION_CONTINUE_STR "continue"
30 #define SESS_NOT_FOUND_ACTION_FAIL_STR "fail"
31 #define SESS_NOT_FOUND_ACTION_END_STR "end"
32
33 void lttng_live_stream_iterator_set_state(struct lttng_live_stream_iterator *stream_iter,
34 enum lttng_live_stream_state new_state)
35 {
36 BT_CPPLOGD_SPEC(stream_iter->logger,
37 "Setting live stream iterator state: viewer-stream-id={}, "
38 "old-state={}, new-state={}",
39 stream_iter->viewer_stream_id, stream_iter->state, new_state);
40
41 stream_iter->state = new_state;
42 }
43
/*
 * Logs, at the debug level, the state of `live_stream_iter` along with
 * its last and current inactivity timestamps.
 *
 * The expansion is a single `do { ... } while (0)` statement WITHOUT a
 * trailing semicolon: the user of the macro provides it. This keeps the
 * macro usable as the body of an unbraced `if`/`else`.
 */
#define LTTNG_LIVE_LOGD_STREAM_ITER(live_stream_iter)                                              \
    do {                                                                                           \
        BT_CPPLOGD_SPEC((live_stream_iter)->logger,                                                \
                        "Live stream iterator state={}, "                                          \
                        "last-inact-ts-is-set={}, last-inact-ts-value={}, "                        \
                        "curr-inact-ts={}",                                                        \
                        (live_stream_iter)->state, (live_stream_iter)->last_inactivity_ts.is_set,  \
                        (live_stream_iter)->last_inactivity_ts.value,                              \
                        (live_stream_iter)->current_inactivity_ts);                                \
    } while (0)
54
55 bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter)
56 {
57 bool ret;
58
59 if (!msg_iter) {
60 ret = false;
61 goto end;
62 }
63
64 ret = bt_self_message_iterator_is_interrupted(msg_iter->self_msg_iter);
65
66 end:
67 return ret;
68 }
69
70 static struct lttng_live_trace *
71 lttng_live_session_borrow_trace_by_id(struct lttng_live_session *session, uint64_t trace_id)
72 {
73 uint64_t trace_idx;
74 struct lttng_live_trace *ret_trace = NULL;
75
76 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
77 struct lttng_live_trace *trace =
78 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
79 if (trace->id == trace_id) {
80 ret_trace = trace;
81 goto end;
82 }
83 }
84
85 end:
86 return ret_trace;
87 }
88
89 static void lttng_live_destroy_trace(struct lttng_live_trace *trace)
90 {
91 BT_CPPLOGD_SPEC(trace->logger, "Destroying live trace: trace-id={}", trace->id);
92
93 BT_ASSERT(trace->stream_iterators);
94 g_ptr_array_free(trace->stream_iterators, TRUE);
95
96 BT_TRACE_PUT_REF_AND_RESET(trace->trace);
97
98 lttng_live_metadata_fini(trace);
99 delete trace;
100 }
101
102 static struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
103 uint64_t trace_id)
104 {
105 BT_CPPLOGD_SPEC(session->logger, "Creating live trace: session-id={}, trace-id={}", session->id,
106 trace_id);
107
108 lttng_live_trace *trace = new lttng_live_trace {session->logger};
109 trace->session = session;
110 trace->id = trace_id;
111 trace->trace = NULL;
112 trace->stream_iterators =
113 g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_stream_iterator_destroy);
114 BT_ASSERT(trace->stream_iterators);
115 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
116 g_ptr_array_add(session->traces, trace);
117
118 return trace;
119 }
120
121 struct lttng_live_trace *
122 lttng_live_session_borrow_or_create_trace_by_id(struct lttng_live_session *session,
123 uint64_t trace_id)
124 {
125 struct lttng_live_trace *trace;
126
127 trace = lttng_live_session_borrow_trace_by_id(session, trace_id);
128 if (trace) {
129 goto end;
130 }
131
132 /* The session is the owner of the newly created trace. */
133 trace = lttng_live_create_trace(session, trace_id);
134
135 end:
136 return trace;
137 }
138
139 int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, uint64_t session_id,
140 const char *hostname, const char *session_name)
141 {
142 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
143 "Adding live session: "
144 "session-id={}, hostname=\"{}\", session-name=\"{}\"",
145 session_id, hostname, session_name);
146
147 lttng_live_session *session = new lttng_live_session {lttng_live_msg_iter->logger};
148 session->self_comp = lttng_live_msg_iter->self_comp;
149 session->id = session_id;
150 session->traces = g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_trace);
151 BT_ASSERT(session->traces);
152 session->lttng_live_msg_iter = lttng_live_msg_iter;
153 session->new_streams_needed = true;
154 session->hostname = g_string_new(hostname);
155 BT_ASSERT(session->hostname);
156
157 session->session_name = g_string_new(session_name);
158 BT_ASSERT(session->session_name);
159
160 g_ptr_array_add(lttng_live_msg_iter->sessions, session);
161
162 return 0;
163 }
164
165 static void lttng_live_destroy_session(struct lttng_live_session *session)
166 {
167 if (!session) {
168 goto end;
169 }
170
171 BT_CPPLOGD_SPEC(session->logger,
172 "Destroying live session: "
173 "session-id={}, session-name=\"{}\"",
174 session->id, session->session_name->str);
175 if (session->id != -1ULL) {
176 if (lttng_live_session_detach(session)) {
177 if (!lttng_live_graph_is_canceled(session->lttng_live_msg_iter)) {
178 /* Old relayd cannot detach sessions. */
179 BT_CPPLOGD_SPEC(session->logger, "Unable to detach lttng live session {}",
180 session->id);
181 }
182 }
183 session->id = -1ULL;
184 }
185
186 if (session->traces) {
187 g_ptr_array_free(session->traces, TRUE);
188 }
189
190 if (session->hostname) {
191 g_string_free(session->hostname, TRUE);
192 }
193
194 if (session->session_name) {
195 g_string_free(session->session_name, TRUE);
196 }
197
198 delete session;
199
200 end:
201 return;
202 }
203
204 static void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter)
205 {
206 if (!lttng_live_msg_iter) {
207 goto end;
208 }
209
210 if (lttng_live_msg_iter->sessions) {
211 g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE);
212 }
213
214 if (lttng_live_msg_iter->viewer_connection) {
215 live_viewer_connection_destroy(lttng_live_msg_iter->viewer_connection);
216 }
217 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp);
218 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter);
219
220 /* All stream iterators must be destroyed at this point. */
221 BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0);
222 lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false;
223
224 delete lttng_live_msg_iter;
225
226 end:
227 return;
228 }
229
230 void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
231 {
232 struct lttng_live_msg_iter *lttng_live_msg_iter;
233
234 BT_ASSERT(self_msg_iter);
235
236 lttng_live_msg_iter =
237 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_iter);
238 BT_ASSERT(lttng_live_msg_iter);
239 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
240 }
241
242 static enum lttng_live_iterator_status
243 lttng_live_iterator_next_check_stream_state(struct lttng_live_stream_iterator *lttng_live_stream)
244 {
245 switch (lttng_live_stream->state) {
246 case LTTNG_LIVE_STREAM_QUIESCENT:
247 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
248 break;
249 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
250 /* Invalid state. */
251 BT_CPPLOGF_SPEC(lttng_live_stream->logger, "Unexpected stream state \"ACTIVE_NO_DATA\"");
252 bt_common_abort();
253 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
254 /* Invalid state. */
255 BT_CPPLOGF_SPEC(lttng_live_stream->logger, "Unexpected stream state \"QUIESCENT_NO_DATA\"");
256 bt_common_abort();
257 case LTTNG_LIVE_STREAM_EOF:
258 break;
259 }
260 return LTTNG_LIVE_ITERATOR_STATUS_OK;
261 }
262
263 /*
264 * For active no data stream, fetch next index. As a result of that it can
265 * become either:
266 * - quiescent: won't have events for a bit,
267 * - have data: need to get that data and produce the event,
268 * - have no data on this stream at this point: need to retry (AGAIN) or return
269 * EOF.
270 */
271 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
272 struct lttng_live_msg_iter *lttng_live_msg_iter,
273 struct lttng_live_stream_iterator *lttng_live_stream)
274 {
275 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
276 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
277 struct packet_index index;
278
279 if (lttng_live_stream->trace->metadata_stream_state ==
280 LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
281 BT_CPPLOGD_SPEC(
282 lttng_live_msg_iter->logger,
283 "Need to get an update for the metadata stream before proceeding further with this stream: "
284 "stream-name=\"{}\"",
285 lttng_live_stream->name->str);
286 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
287 goto end;
288 }
289
290 if (lttng_live_stream->trace->session->new_streams_needed) {
291 BT_CPPLOGD_SPEC(
292 lttng_live_msg_iter->logger,
293 "Need to get an update of all streams before proceeding further with this stream: "
294 "stream-name=\"{}\"",
295 lttng_live_stream->name->str);
296 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
297 goto end;
298 }
299
300 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA &&
301 lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
302 goto end;
303 }
304 ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream, &index);
305 if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
306 goto end;
307 }
308
309 BT_ASSERT_DBG(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
310
311 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
312 uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts.value,
313 curr_inact_ts = lttng_live_stream->current_inactivity_ts;
314
315 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA && last_inact_ts == curr_inact_ts) {
316 /*
317 * Because the stream is in the QUIESCENT_NO_DATA
318 * state, we can assert that the last_inactivity_ts was
319 * set and can be safely used in the `if` above.
320 */
321 BT_ASSERT(lttng_live_stream->last_inactivity_ts.is_set);
322
323 ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
324 LTTNG_LIVE_LOGD_STREAM_ITER(lttng_live_stream);
325 } else {
326 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
327 }
328 goto end;
329 }
330
331 lttng_live_stream->base_offset = index.offset;
332 lttng_live_stream->offset = index.offset;
333 lttng_live_stream->len = index.packet_size / CHAR_BIT;
334
335 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
336 "Setting live stream reading info: stream-name=\"{}\", "
337 "viewer-stream-id={}, stream-base-offset={}, stream-offset={}, stream-len={}",
338 lttng_live_stream->name->str, lttng_live_stream->viewer_stream_id,
339 lttng_live_stream->base_offset, lttng_live_stream->offset,
340 lttng_live_stream->len);
341
342 end:
343 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
344 ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream);
345 }
346 return ret;
347 }
348
349 /*
350 * Creation of the message requires the ctf trace class to be created
351 * beforehand, but the live protocol gives us all streams (including metadata)
352 * at once. So we split it in three steps: getting streams, getting metadata
353 * (which creates the ctf trace class), and then creating the per-stream
354 * messages.
355 */
356 static enum lttng_live_iterator_status
357 lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
358 struct lttng_live_session *session)
359 {
360 enum lttng_live_iterator_status status;
361 uint64_t trace_idx;
362
363 if (!session->attached) {
364 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Attach to session: session-id={}",
365 session->id);
366 enum lttng_live_viewer_status attach_status =
367 lttng_live_session_attach(session, lttng_live_msg_iter->self_msg_iter);
368 if (attach_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
369 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
370 /*
371 * Clear any causes appended in
372 * `lttng_live_attach_session()` as we want to
373 * return gracefully since the graph was
374 * cancelled.
375 */
376 bt_current_thread_clear_error();
377 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
378 } else {
379 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
380 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
381 "Error attaching to LTTng live session");
382 }
383 goto end;
384 }
385 }
386
387 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
388 "Updating all data streams: "
389 "session-id={}, session-name=\"{}\"",
390 session->id, session->session_name->str);
391
392 status = lttng_live_session_get_new_streams(session, lttng_live_msg_iter->self_msg_iter);
393 switch (status) {
394 case LTTNG_LIVE_ITERATOR_STATUS_OK:
395 break;
396 case LTTNG_LIVE_ITERATOR_STATUS_END:
397 /*
398 * We received a `_END` from the `_get_new_streams()` function,
399 * which means no more data will ever be received from the data
400 * streams of this session. But it's possible that the metadata
401 * is incomplete.
402 * The live protocol guarantees that we receive all the
403 * metadata needed before we receive data streams needing it.
404 * But it's possible to receive metadata NOT needed by
405 * data streams after the session was closed. For example, this
406 * could happen if a new event is registered and the session is
407 * stopped before any tracepoint for that event is actually
408 * fired.
409 */
410 BT_CPPLOGD_SPEC(
411 lttng_live_msg_iter->logger,
412 "Updating streams returned _END status. Override status to _OK in order fetch any remaining metadata:"
413 "session-id={}, session-name=\"{}\"",
414 session->id, session->session_name->str);
415 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
416 break;
417 default:
418 goto end;
419 }
420
421 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
422 "Updating metadata stream for session: "
423 "session-id={}, session-name=\"{}\"",
424 session->id, session->session_name->str);
425
426 trace_idx = 0;
427 while (trace_idx < session->traces->len) {
428 struct lttng_live_trace *trace =
429 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
430
431 status = lttng_live_metadata_update(trace);
432 switch (status) {
433 case LTTNG_LIVE_ITERATOR_STATUS_END:
434 case LTTNG_LIVE_ITERATOR_STATUS_OK:
435 trace_idx++;
436 break;
437 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
438 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
439 goto end;
440 default:
441 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
442 "Error updating trace metadata: "
443 "stream-iter-status={}, trace-id={}",
444 status, trace->id);
445 goto end;
446 }
447 }
448
449 /*
450 * Now that we have the metadata we can initialize the downstream
451 * iterator.
452 */
453 status = lttng_live_lazy_msg_init(session, lttng_live_msg_iter->self_msg_iter);
454
455 end:
456 return status;
457 }
458
459 static void
460 lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
461 {
462 uint64_t session_idx, trace_idx;
463
464 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
465 struct lttng_live_session *session =
466 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
467 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
468 "Force marking session as needing new streams: "
469 "session-id={}",
470 session->id);
471 session->new_streams_needed = true;
472 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
473 struct lttng_live_trace *trace =
474 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
475 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
476 "Force marking trace metadata state as needing an update: "
477 "session-id={}, trace-id={}",
478 session->id, trace->id);
479
480 BT_ASSERT(trace->metadata_stream_state != LTTNG_LIVE_METADATA_STREAM_STATE_CLOSED);
481
482 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
483 }
484 }
485 }
486
487 static enum lttng_live_iterator_status
488 lttng_live_iterator_handle_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
489 {
490 enum lttng_live_iterator_status status;
491 enum lttng_live_viewer_status viewer_status;
492 uint64_t session_idx = 0, nr_sessions_opened = 0;
493 struct lttng_live_session *session;
494 enum session_not_found_action sess_not_found_act =
495 lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
496
497 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
498 "Update data and metadata of all sessions: "
499 "live-msg-iter-addr={}",
500 fmt::ptr(lttng_live_msg_iter));
501 /*
502 * In a remotely distant future, we could add a "new
503 * session" flag to the protocol, which would tell us that we
504 * need to query for new sessions even though we have sessions
505 * currently ongoing.
506 */
507 if (lttng_live_msg_iter->sessions->len == 0) {
508 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
509 BT_CPPLOGD_SPEC(
510 lttng_live_msg_iter->logger,
511 "No session found. Exiting in accordance with the `session-not-found-action` parameter");
512 status = LTTNG_LIVE_ITERATOR_STATUS_END;
513 goto end;
514 } else {
515 BT_CPPLOGD_SPEC(
516 lttng_live_msg_iter->logger,
517 "No session found. Try creating a new one in accordance with the `session-not-found-action` parameter");
518 /*
519 * Retry to create a viewer session for the requested
520 * session name.
521 */
522 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
523 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
524 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
525 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
526 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
527 "Error creating LTTng live viewer session");
528 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
529 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
530 } else {
531 bt_common_abort();
532 }
533 goto end;
534 }
535 }
536 }
537
538 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
539 session =
540 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
541 status = lttng_live_get_session(lttng_live_msg_iter, session);
542 switch (status) {
543 case LTTNG_LIVE_ITERATOR_STATUS_OK:
544 case LTTNG_LIVE_ITERATOR_STATUS_END:
545 /*
546 * A session returned `_END`. Other sessions may still
547 * be active so we override the status and continue
548 * looping if needed.
549 */
550 break;
551 default:
552 goto end;
553 }
554 if (!session->closed) {
555 nr_sessions_opened++;
556 }
557 }
558
559 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE && nr_sessions_opened == 0) {
560 status = LTTNG_LIVE_ITERATOR_STATUS_END;
561 } else {
562 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
563 }
564
565 end:
566 return status;
567 }
568
569 static enum lttng_live_iterator_status
570 emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
571 struct lttng_live_stream_iterator *stream_iter, const bt_message **message,
572 uint64_t timestamp)
573 {
574 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
575 bt_message *msg = NULL;
576
577 BT_ASSERT(stream_iter->trace->clock_class);
578
579 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
580 "Emitting inactivity message for stream: ctf-stream-id={}, "
581 "viewer-stream-id={}, timestamp={}",
582 stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id,
583 timestamp);
584
585 msg = bt_message_message_iterator_inactivity_create(lttng_live_msg_iter->self_msg_iter,
586 stream_iter->trace->clock_class, timestamp);
587 if (!msg) {
588 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
589 "Error emitting message iterator inactivity message");
590 goto error;
591 }
592
593 *message = msg;
594 end:
595 return ret;
596
597 error:
598 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
599 bt_message_put_ref(msg);
600 goto end;
601 }
602
603 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
604 struct lttng_live_msg_iter *lttng_live_msg_iter,
605 struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
606 {
607 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
608
609 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
610 return LTTNG_LIVE_ITERATOR_STATUS_OK;
611 }
612
613 /*
614 * Check if we already sent an inactivty message downstream for this
615 * `current_inactivity_ts` value.
616 */
617 if (lttng_live_stream->last_inactivity_ts.is_set &&
618 lttng_live_stream->current_inactivity_ts == lttng_live_stream->last_inactivity_ts.value) {
619 lttng_live_stream_iterator_set_state(lttng_live_stream,
620 LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA);
621
622 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
623 goto end;
624 }
625
626 ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream, message,
627 lttng_live_stream->current_inactivity_ts);
628
629 lttng_live_stream->last_inactivity_ts.value = lttng_live_stream->current_inactivity_ts;
630 lttng_live_stream->last_inactivity_ts.is_set = true;
631 end:
632 return ret;
633 }
634
635 static int live_get_msg_ts_ns(struct lttng_live_msg_iter *lttng_live_msg_iter,
636 const bt_message *msg, int64_t last_msg_ts_ns, int64_t *ts_ns)
637 {
638 const bt_clock_snapshot *clock_snapshot = NULL;
639 int ret = 0;
640
641 BT_ASSERT_DBG(msg);
642 BT_ASSERT_DBG(ts_ns);
643
644 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
645 "Getting message's timestamp: iter-data-addr={}, msg-addr={}, "
646 "last-msg-ts={}",
647 fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns);
648
649 switch (bt_message_get_type(msg)) {
650 case BT_MESSAGE_TYPE_EVENT:
651 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(msg);
652 break;
653 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
654 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg);
655 break;
656 case BT_MESSAGE_TYPE_PACKET_END:
657 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(msg);
658 break;
659 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
660 clock_snapshot =
661 bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg);
662 break;
663 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
664 clock_snapshot =
665 bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg);
666 break;
667 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
668 clock_snapshot = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const(msg);
669 break;
670 default:
671 /* All the other messages have a higher priority */
672 BT_CPPLOGD_STR_SPEC(lttng_live_msg_iter->logger,
673 "Message has no timestamp: using the last message timestamp.");
674 *ts_ns = last_msg_ts_ns;
675 goto end;
676 }
677
678 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
679 if (ret) {
680 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
681 "Cannot get nanoseconds from Epoch of clock snapshot: "
682 "clock-snapshot-addr={}",
683 fmt::ptr(clock_snapshot));
684 goto error;
685 }
686
687 goto end;
688
689 error:
690 ret = -1;
691
692 end:
693 if (ret == 0) {
694 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
695 "Found message's timestamp: iter-data-addr={}, msg-addr={}, "
696 "last-msg-ts={}, ts={}",
697 fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns, *ts_ns);
698 }
699
700 return ret;
701 }
702
703 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
704 struct lttng_live_msg_iter *lttng_live_msg_iter,
705 struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
706 {
707 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
708 enum ctf_msg_iter_status status;
709 uint64_t session_idx, trace_idx;
710
711 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
712 struct lttng_live_session *session =
713 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
714
715 if (session->new_streams_needed) {
716 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
717 "Need an update for streams: "
718 "session-id={}",
719 session->id);
720 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
721 goto end;
722 }
723 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
724 struct lttng_live_trace *trace =
725 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
726 if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
727 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
728 "Need an update for metadata stream: "
729 "session-id={}, trace-id={}",
730 session->id, trace->id);
731 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
732 goto end;
733 }
734 }
735 }
736
737 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
738 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
739 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
740 "Invalid state of live stream iterator"
741 "stream-iter-status={}",
742 lttng_live_stream->state);
743 goto end;
744 }
745
746 status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter, message);
747 switch (status) {
748 case CTF_MSG_ITER_STATUS_EOF:
749 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
750 break;
751 case CTF_MSG_ITER_STATUS_OK:
752 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
753 break;
754 case CTF_MSG_ITER_STATUS_AGAIN:
755 /*
756 * Continue immediately (end of packet). The next
757 * get_index may return AGAIN to delay the following
758 * attempt.
759 */
760 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
761 break;
762 case CTF_MSG_ITER_STATUS_ERROR:
763 default:
764 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
765 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
766 "CTF message iterator failed to get next message: "
767 "msg-iter={}, msg-iter-status={}",
768 fmt::ptr(lttng_live_stream->msg_iter), status);
769 break;
770 }
771
772 end:
773 return ret;
774 }
775
776 static enum lttng_live_iterator_status
777 lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
778 struct lttng_live_stream_iterator *stream_iter,
779 const bt_message **curr_msg)
780 {
781 enum lttng_live_iterator_status live_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
782
783 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
784 "Closing live stream iterator: stream-name=\"{}\", "
785 "viewer-stream-id={}",
786 stream_iter->name->str, stream_iter->viewer_stream_id);
787
788 /*
789 * The viewer has hung up on us so we are closing the stream. The
790 * `ctf_msg_iter` should simply realize that it needs to close the
791 * stream properly by emitting the necessary stream end message.
792 */
793 enum ctf_msg_iter_status status =
794 ctf_msg_iter_get_next_message(stream_iter->msg_iter, curr_msg);
795
796 if (status == CTF_MSG_ITER_STATUS_ERROR) {
797 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
798 "Error getting the next message from CTF message iterator");
799 live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
800 goto end;
801 } else if (status == CTF_MSG_ITER_STATUS_EOF) {
802 BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger,
803 "Reached the end of the live stream iterator.");
804 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
805 goto end;
806 }
807
808 BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
809
810 end:
811 return live_status;
812 }
813
814 /*
815 * helper function:
816 * handle_no_data_streams()
817 * retry:
818 * - for each ACTIVE_NO_DATA stream:
819 * - query relayd for stream data, or quiescence info.
820 * - if need metadata, get metadata, goto retry.
821 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
822 * - if quiescent, move to QUIESCENT streams
823 * - if fetched data, move to ACTIVE_DATA streams
824 * (at this point each stream either has data, or is quiescent)
825 *
826 *
827 * iterator_next:
828 * handle_new_streams_and_metadata()
829 * - query relayd for known streams, add them as ACTIVE_NO_DATA
830 * - query relayd for metadata
831 *
832 * call handle_active_no_data_streams()
833 *
834 * handle_quiescent_streams()
835 * - if at least one stream is ACTIVE_DATA:
836 * - peek stream event with lowest timestamp -> next_ts
837 * - for each quiescent stream
838 * - if next_ts >= quiescent end
839 * - set state to ACTIVE_NO_DATA
840 * - else
841 * - for each quiescent stream
842 * - set state to ACTIVE_NO_DATA
843 *
844 * call handle_active_no_data_streams()
845 *
846 * handle_active_data_streams()
847 * - if at least one stream is ACTIVE_DATA:
848 * - get stream event with lowest timestamp from heap
849 * - make that stream event the current message.
850 * - move this stream heap position to its next event
851 * - if we need to fetch data from relayd, move
852 * stream to ACTIVE_NO_DATA.
853 * - return OK
854 * - return AGAIN
855 *
856 * end criterion: ctrl-c on client. If relayd exits or the session
857 * closes on the relay daemon side, we keep on waiting for streams.
858 * Eventually handle --end timestamp (also an end criterion).
859 *
860 * When disconnected from relayd: try to re-connect endlessly.
861 */
862 static enum lttng_live_iterator_status
863 lttng_live_iterator_next_msg_on_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
864 struct lttng_live_stream_iterator *stream_iter,
865 const bt_message **curr_msg)
866 {
867 enum lttng_live_iterator_status live_status;
868
869 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
870 "Advancing live stream iterator until next message if possible: "
871 "stream-name=\"{}\", viewer-stream-id={}",
872 stream_iter->name->str, stream_iter->viewer_stream_id);
873
874 if (stream_iter->has_stream_hung_up) {
875 /*
876 * The stream has hung up and the stream was properly closed
877 * during the last call to the current function. Return _END
878 * status now so that this stream iterator is removed for the
879 * stream iterator list.
880 */
881 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
882 goto end;
883 }
884
885 retry:
886 LTTNG_LIVE_LOGD_STREAM_ITER(stream_iter);
887
888 /*
889 * Make sure we have the most recent metadata and possibly some new
890 * streams.
891 */
892 live_status = lttng_live_iterator_handle_new_streams_and_metadata(lttng_live_msg_iter);
893 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
894 goto end;
895 }
896
897 live_status =
898 lttng_live_iterator_next_handle_one_no_data_stream(lttng_live_msg_iter, stream_iter);
899 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
900 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
901 /*
902 * We overwrite `live_status` since `curr_msg` is
903 * likely set to a valid message in this function.
904 */
905 live_status =
906 lttng_live_iterator_close_stream(lttng_live_msg_iter, stream_iter, curr_msg);
907 }
908 goto end;
909 }
910
911 live_status = lttng_live_iterator_next_handle_one_quiescent_stream(lttng_live_msg_iter,
912 stream_iter, curr_msg);
913 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
914 BT_ASSERT(!*curr_msg);
915 goto end;
916 }
917 if (*curr_msg) {
918 goto end;
919 }
920 live_status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live_msg_iter,
921 stream_iter, curr_msg);
922 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
923 BT_ASSERT(!*curr_msg);
924 }
925
926 end:
927 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) {
928 BT_CPPLOGD_SPEC(
929 lttng_live_msg_iter->logger,
930 "Ask the relay daemon for an updated view of the data and metadata streams");
931 goto retry;
932 }
933
934 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
935 "Returning from advancing live stream iterator: status={}, "
936 "stream-name=\"{}\", viewer-stream-id={}",
937 live_status, stream_iter->name->str, stream_iter->viewer_stream_id);
938
939 return live_status;
940 }
941
942 static bool is_discarded_packet_or_event_message(const bt_message *msg)
943 {
944 const enum bt_message_type msg_type = bt_message_get_type(msg);
945
946 return msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS ||
947 msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS;
948 }
949
950 static enum lttng_live_iterator_status
951 adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream *stream,
952 const bt_message *msg_in, bt_message **msg_out,
953 uint64_t new_begin_ts)
954 {
955 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
956 enum bt_property_availability availability;
957 const bt_clock_snapshot *clock_snapshot;
958 uint64_t end_ts;
959 uint64_t count;
960
961 clock_snapshot = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg_in);
962 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
963
964 availability = bt_message_discarded_packets_get_count(msg_in, &count);
965 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
966
967 *msg_out = bt_message_discarded_packets_create_with_default_clock_snapshots(
968 iter, stream, new_begin_ts, end_ts);
969 if (!*msg_out) {
970 status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
971 goto end;
972 }
973
974 bt_message_discarded_packets_set_count(*msg_out, count);
975 end:
976 return status;
977 }
978
979 static enum lttng_live_iterator_status
980 adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream *stream,
981 const bt_message *msg_in, bt_message **msg_out,
982 uint64_t new_begin_ts)
983 {
984 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
985 enum bt_property_availability availability;
986 const bt_clock_snapshot *clock_snapshot;
987 uint64_t end_ts;
988 uint64_t count;
989
990 clock_snapshot = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg_in);
991 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
992
993 availability = bt_message_discarded_events_get_count(msg_in, &count);
994 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
995
996 *msg_out = bt_message_discarded_events_create_with_default_clock_snapshots(
997 iter, stream, new_begin_ts, end_ts);
998 if (!*msg_out) {
999 status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
1000 goto end;
1001 }
1002
1003 bt_message_discarded_events_set_count(*msg_out, count);
1004 end:
1005 return status;
1006 }
1007
1008 static enum lttng_live_iterator_status
1009 handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
1010 struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns,
1011 const bt_message *late_msg)
1012 {
1013 const bt_clock_class *clock_class;
1014 const bt_stream_class *stream_class;
1015 enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status;
1016 int64_t last_inactivity_ts_ns;
1017 enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1018 enum lttng_live_iterator_status adjust_status;
1019 bt_message *adjusted_message;
1020
1021 /*
1022 * The timestamp of the current message is before the last message sent
1023 * by this component. We CANNOT send it as is.
1024 *
1025 * The only expected scenario in which that could happen is the
1026 * following, everything else is a bug in this component, relay daemon,
1027 * or CTF parser.
1028 *
1029 * Expected scenario: The CTF message iterator emitted discarded
1030 * packets and discarded events with synthesized beginning and end
1031 * timestamps from the bounds of the last known packet and the newly
1032 * decoded packet header. The CTF message iterator is not aware of
1033 * stream inactivity beacons. Hence, we have to adjust the beginning
1034 * timestamp of those types of messages if a stream signalled its
1035 * inactivity up until _after_ the last known packet's beginning
1036 * timestamp.
1037 *
1038 * Otherwise, the monotonicity guarantee of message timestamps would
1039 * not be preserved.
1040 *
1041 * In short, the only scenario in which it's okay and fixable to
1042 * received a late message is when:
1043 * 1. the late message is a discarded packets or discarded events
1044 * message,
1045 * 2. this stream produced an inactivity message downstream, and
1046 * 3. the timestamp of the late message is within the inactivity
1047 * timespan we sent downstream through the inactivity message.
1048 */
1049
1050 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1051 "Handling late message on live stream iterator: "
1052 "stream-name=\"{}\", viewer-stream-id={}",
1053 stream_iter->name->str, stream_iter->viewer_stream_id);
1054
1055 if (!stream_iter->last_inactivity_ts.is_set) {
1056 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1057 "Invalid live stream state: "
1058 "have a late message when no inactivity message "
1059 "was ever sent for that stream.");
1060 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1061 goto end;
1062 }
1063
1064 if (!is_discarded_packet_or_event_message(late_msg)) {
1065 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1066 "Invalid live stream state: "
1067 "have a late message that is not a packet discarded or "
1068 "event discarded message: late-msg-type={}",
1069 static_cast<bt2::MessageType>(bt_message_get_type(late_msg)));
1070 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1071 goto end;
1072 }
1073
1074 stream_class = bt_stream_borrow_class_const(stream_iter->stream);
1075 clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class);
1076
1077 ts_ns_status = bt_clock_class_cycles_to_ns_from_origin(
1078 clock_class, stream_iter->last_inactivity_ts.value, &last_inactivity_ts_ns);
1079 if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) {
1080 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1081 "Error converting last "
1082 "inactivity message timestamp to nanoseconds");
1083 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1084 goto end;
1085 }
1086
1087 if (last_inactivity_ts_ns <= late_msg_ts_ns) {
1088 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1089 "Invalid live stream state: "
1090 "have a late message that is none included in a stream "
1091 "inactivity timespan: last-inactivity-ts-ns={}, "
1092 "late-msg-ts-ns={}",
1093 last_inactivity_ts_ns, late_msg_ts_ns);
1094 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1095 goto end;
1096 }
1097
1098 /*
1099 * We now know that it's okay for this message to be late, we can now
1100 * adjust its timestamp to ensure monotonicity.
1101 */
1102 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1103 "Adjusting the timestamp of late message: late-msg-type={}, "
1104 "msg-new-ts-ns={}",
1105 static_cast<bt2::MessageType>(bt_message_get_type(late_msg)),
1106 stream_iter->last_inactivity_ts.value);
1107 switch (bt_message_get_type(late_msg)) {
1108 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1109 adjust_status = adjust_discarded_events_message(
1110 lttng_live_msg_iter->self_msg_iter, stream_iter->stream, late_msg, &adjusted_message,
1111 stream_iter->last_inactivity_ts.value);
1112 break;
1113 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1114 adjust_status = adjust_discarded_packets_message(
1115 lttng_live_msg_iter->self_msg_iter, stream_iter->stream, late_msg, &adjusted_message,
1116 stream_iter->last_inactivity_ts.value);
1117 break;
1118 default:
1119 bt_common_abort();
1120 }
1121
1122 if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1123 stream_iter_status = adjust_status;
1124 goto end;
1125 }
1126
1127 BT_ASSERT_DBG(adjusted_message);
1128 stream_iter->current_msg = adjusted_message;
1129 stream_iter->current_msg_ts_ns = last_inactivity_ts_ns;
1130 bt_message_put_ref(late_msg);
1131
1132 end:
1133 return stream_iter_status;
1134 }
1135
1136 static enum lttng_live_iterator_status
1137 next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
1138 struct lttng_live_trace *live_trace,
1139 struct lttng_live_stream_iterator **youngest_trace_stream_iter)
1140 {
1141 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
1142 enum lttng_live_iterator_status stream_iter_status;
1143 int64_t youngest_candidate_msg_ts = INT64_MAX;
1144 uint64_t stream_iter_idx;
1145
1146 BT_ASSERT_DBG(live_trace);
1147 BT_ASSERT_DBG(live_trace->stream_iterators);
1148
1149 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1150 "Finding the next stream iterator for trace: "
1151 "trace-id={}",
1152 live_trace->id);
1153 /*
1154 * Update the current message of every stream iterators of this trace.
1155 * The current msg of every stream must have a timestamp equal or
1156 * larger than the last message returned by this iterator. We must
1157 * ensure monotonicity.
1158 */
1159 stream_iter_idx = 0;
1160 while (stream_iter_idx < live_trace->stream_iterators->len) {
1161 bool stream_iter_is_ended = false;
1162 struct lttng_live_stream_iterator *stream_iter =
1163 (lttng_live_stream_iterator *) g_ptr_array_index(live_trace->stream_iterators,
1164 stream_iter_idx);
1165
1166 /*
1167 * If there is no current message for this stream, go fetch
1168 * one.
1169 */
1170 while (!stream_iter->current_msg) {
1171 const bt_message *msg = NULL;
1172 int64_t curr_msg_ts_ns = INT64_MAX;
1173
1174 stream_iter_status =
1175 lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, &msg);
1176
1177 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1178 stream_iter_is_ended = true;
1179 break;
1180 }
1181
1182 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1183 goto end;
1184 }
1185
1186 BT_ASSERT_DBG(msg);
1187
1188 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1189 "Live stream iterator returned message: msg-type={}, "
1190 "stream-name=\"{}\", viewer-stream-id={}",
1191 static_cast<bt2::MessageType>(bt_message_get_type(msg)),
1192 stream_iter->name->str, stream_iter->viewer_stream_id);
1193
1194 /*
1195 * Get the timestamp in nanoseconds from origin of this
1196 * message.
1197 */
1198 live_get_msg_ts_ns(lttng_live_msg_iter, msg, lttng_live_msg_iter->last_msg_ts_ns,
1199 &curr_msg_ts_ns);
1200
1201 /*
1202 * Check if the message of the current live stream
1203 * iterator occurred at the exact same time or after the
1204 * last message returned by this component's message
1205 * iterator. If not, we need to handle it with care.
1206 */
1207 if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
1208 stream_iter->current_msg = msg;
1209 stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
1210 } else {
1211 /*
1212 * We received a message from the past. This
1213 * may be fixable but it can also be an error.
1214 */
1215 stream_iter_status =
1216 handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, msg);
1217 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1218 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1219 "Late message could not be handled correctly: "
1220 "lttng-live-msg-iter-addr={}, "
1221 "stream-name=\"{}\", "
1222 "curr-msg-ts={}, last-msg-ts={}",
1223 fmt::ptr(lttng_live_msg_iter),
1224 stream_iter->name->str, curr_msg_ts_ns,
1225 lttng_live_msg_iter->last_msg_ts_ns);
1226 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1227 goto end;
1228 }
1229 }
1230 }
1231
1232 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1233
1234 if (!stream_iter_is_ended) {
1235 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1236 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1237 /*
1238 * Update the current best candidate message
1239 * for the stream iterator of this live trace
1240 * to be forwarded downstream.
1241 */
1242 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1243 youngest_candidate_stream_iter = stream_iter;
1244 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1245 /*
1246 * Order the messages in an arbitrary but
1247 * deterministic way.
1248 */
1249 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1250 int ret = common_muxing_compare_messages(
1251 stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
1252 if (ret < 0) {
1253 /*
1254 * The `youngest_candidate_stream_iter->current_msg`
1255 * should go first. Update the next
1256 * iterator and the current timestamp.
1257 */
1258 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1259 youngest_candidate_stream_iter = stream_iter;
1260 } else if (ret == 0) {
1261 /*
1262 * Unable to pick which one should go
1263 * first.
1264 */
1265 BT_CPPLOGW_SPEC(
1266 lttng_live_msg_iter->logger,
1267 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1268 "stream-iter-addr={}"
1269 "stream-iter-addr={}",
1270 fmt::ptr(stream_iter), fmt::ptr(youngest_candidate_stream_iter));
1271 }
1272 }
1273
1274 stream_iter_idx++;
1275 } else {
1276 /*
1277 * The live stream iterator has ended. That
1278 * iterator is removed from the array, but
1279 * there is no need to increment
1280 * stream_iter_idx as
1281 * g_ptr_array_remove_index_fast replaces the
1282 * removed element with the array's last
1283 * element.
1284 */
1285 g_ptr_array_remove_index_fast(live_trace->stream_iterators, stream_iter_idx);
1286 }
1287 }
1288
1289 if (youngest_candidate_stream_iter) {
1290 *youngest_trace_stream_iter = youngest_candidate_stream_iter;
1291 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1292 } else {
1293 /*
1294 * The only case where we don't have a candidate for this trace
1295 * is if we reached the end of all the iterators.
1296 */
1297 BT_ASSERT(live_trace->stream_iterators->len == 0);
1298 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1299 }
1300
1301 end:
1302 return stream_iter_status;
1303 }
1304
1305 static enum lttng_live_iterator_status
1306 next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
1307 struct lttng_live_session *session,
1308 struct lttng_live_stream_iterator **youngest_session_stream_iter)
1309 {
1310 enum lttng_live_iterator_status stream_iter_status;
1311 uint64_t trace_idx = 0;
1312 int64_t youngest_candidate_msg_ts = INT64_MAX;
1313 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
1314
1315 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1316 "Finding the next stream iterator for session: "
1317 "session-id={}",
1318 session->id);
1319 /*
1320 * Make sure we are attached to the session and look for new streams
1321 * and metadata.
1322 */
1323 stream_iter_status = lttng_live_get_session(lttng_live_msg_iter, session);
1324 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
1325 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE &&
1326 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) {
1327 goto end;
1328 }
1329
1330 BT_ASSERT_DBG(session->traces);
1331
1332 while (trace_idx < session->traces->len) {
1333 bool trace_is_ended = false;
1334 struct lttng_live_stream_iterator *stream_iter;
1335 struct lttng_live_trace *trace =
1336 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
1337
1338 stream_iter_status =
1339 next_stream_iterator_for_trace(lttng_live_msg_iter, trace, &stream_iter);
1340 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1341 /*
1342 * All the live stream iterators for this trace are
1343 * ENDed. Remove the trace from this session.
1344 */
1345 trace_is_ended = true;
1346 } else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1347 goto end;
1348 }
1349
1350 if (!trace_is_ended) {
1351 BT_ASSERT_DBG(stream_iter);
1352
1353 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1354 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1355 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1356 youngest_candidate_stream_iter = stream_iter;
1357 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1358 /*
1359 * Order the messages in an arbitrary but
1360 * deterministic way.
1361 */
1362 int ret = common_muxing_compare_messages(
1363 stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
1364 if (ret < 0) {
1365 /*
1366 * The `youngest_candidate_stream_iter->current_msg`
1367 * should go first. Update the next iterator
1368 * and the current timestamp.
1369 */
1370 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1371 youngest_candidate_stream_iter = stream_iter;
1372 } else if (ret == 0) {
1373 /* Unable to pick which one should go first. */
1374 BT_CPPLOGW_SPEC(
1375 lttng_live_msg_iter->logger,
1376 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1377 "stream-iter-addr={}"
1378 "youngest-candidate-stream-iter-addr={}",
1379 fmt::ptr(stream_iter), fmt::ptr(youngest_candidate_stream_iter));
1380 }
1381 }
1382 trace_idx++;
1383 } else {
1384 /*
1385 * trace_idx is not incremented since
1386 * g_ptr_array_remove_index_fast replaces the
1387 * element at trace_idx with the array's last element.
1388 */
1389 g_ptr_array_remove_index_fast(session->traces, trace_idx);
1390 }
1391 }
1392 if (youngest_candidate_stream_iter) {
1393 *youngest_session_stream_iter = youngest_candidate_stream_iter;
1394 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1395 } else {
1396 /*
1397 * The only cases where we don't have a candidate for this
1398 * trace is:
1399 * 1. if we reached the end of all the iterators of all the
1400 * traces of this session,
1401 * 2. if we never had live stream iterator in the first place.
1402 *
1403 * In either cases, we return END.
1404 */
1405 BT_ASSERT(session->traces->len == 0);
1406 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1407 }
1408 end:
1409 return stream_iter_status;
1410 }
1411
1412 static inline void put_messages(bt_message_array_const msgs, uint64_t count)
1413 {
1414 uint64_t i;
1415
1416 for (i = 0; i < count; i++) {
1417 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
1418 }
1419 }
1420
1421 bt_message_iterator_class_next_method_status
1422 lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array_const msgs,
1423 uint64_t capacity, uint64_t *count)
1424 {
1425 try {
1426 bt_message_iterator_class_next_method_status status;
1427 enum lttng_live_viewer_status viewer_status;
1428 struct lttng_live_msg_iter *lttng_live_msg_iter =
1429 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_it);
1430 struct lttng_live_component *lttng_live = lttng_live_msg_iter->lttng_live_comp;
1431 enum lttng_live_iterator_status stream_iter_status;
1432 uint64_t session_idx;
1433
1434 *count = 0;
1435
1436 BT_ASSERT_DBG(lttng_live_msg_iter);
1437
1438 if (G_UNLIKELY(lttng_live_msg_iter->was_interrupted)) {
1439 /*
1440 * The iterator was interrupted in a previous call to the
1441 * `_next()` method. We currently do not support generating
1442 * messages after such event. The babeltrace2 CLI should never
1443 * be running the graph after being interrupted. So this check
1444 * is to prevent other graph users from using this live
1445 * iterator in an messed up internal state.
1446 */
1447 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1448 BT_CPPLOGE_APPEND_CAUSE_SPEC(
1449 lttng_live_msg_iter->logger,
1450 "Message iterator was interrupted during a previous call to the `next()` and currently does not support continuing after such event.");
1451 goto end;
1452 }
1453
1454 /*
1455 * Clear all the invalid message reference that might be left over in
1456 * the output array.
1457 */
1458 memset(msgs, 0, capacity * sizeof(*msgs));
1459
1460 /*
1461 * If no session are exposed on the relay found at the url provided by
1462 * the user, session count will be 0. In this case, we return status
1463 * end to return gracefully.
1464 */
1465 if (lttng_live_msg_iter->sessions->len == 0) {
1466 if (lttng_live->params.sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
1467 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1468 goto end;
1469 } else {
1470 /*
1471 * The are no more active session for this session
1472 * name. Retry to create a viewer session for the
1473 * requested session name.
1474 */
1475 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
1476 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1477 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1478 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1479 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1480 "Error creating LTTng live viewer session");
1481 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1482 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1483 } else {
1484 bt_common_abort();
1485 }
1486 goto end;
1487 }
1488 }
1489 }
1490
1491 if (lttng_live_msg_iter->active_stream_iter == 0) {
1492 lttng_live_force_new_streams_and_metadata(lttng_live_msg_iter);
1493 }
1494
1495 /*
1496 * Here the muxing of message is done.
1497 *
1498 * We need to iterate over all the streams of all the traces of all the
1499 * viewer sessions in order to get the message with the smallest
1500 * timestamp. In this case, a session is a viewer session and there is
1501 * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
1502 * kernel). Each viewer session can have multiple traces, for example,
1503 * 64bit UST viewer sessions could have multiple per-pid traces.
1504 *
1505 * We iterate over the streams of each traces to update and see what is
1506 * their next message's timestamp. From those timestamps, we select the
1507 * message with the smallest timestamp as the best candidate message
1508 * for that trace and do the same thing across all the sessions.
1509 *
1510 * We then compare the timestamp of best candidate message of all the
1511 * sessions to pick the message with the smallest timestamp and we
1512 * return it.
1513 */
1514 while (*count < capacity) {
1515 struct lttng_live_stream_iterator *youngest_stream_iter = NULL,
1516 *candidate_stream_iter = NULL;
1517 int64_t youngest_msg_ts_ns = INT64_MAX;
1518
1519 BT_ASSERT_DBG(lttng_live_msg_iter->sessions);
1520 session_idx = 0;
1521 while (session_idx < lttng_live_msg_iter->sessions->len) {
1522 struct lttng_live_session *session = (lttng_live_session *) g_ptr_array_index(
1523 lttng_live_msg_iter->sessions, session_idx);
1524
1525 /* Find the best candidate message to send downstream. */
1526 stream_iter_status = next_stream_iterator_for_session(lttng_live_msg_iter, session,
1527 &candidate_stream_iter);
1528
1529 /* If we receive an END status, it means that either:
1530 * - Those traces never had active streams (UST with no
1531 * data produced yet),
1532 * - All live stream iterators have ENDed.*/
1533 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1534 if (session->closed && session->traces->len == 0) {
1535 /*
1536 * Remove the session from the list.
1537 * session_idx is not modified since
1538 * g_ptr_array_remove_index_fast
1539 * replaces the the removed element with
1540 * the array's last element.
1541 */
1542 g_ptr_array_remove_index_fast(lttng_live_msg_iter->sessions, session_idx);
1543 } else {
1544 session_idx++;
1545 }
1546 continue;
1547 }
1548
1549 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1550 goto return_status;
1551 }
1552
1553 if (G_UNLIKELY(youngest_stream_iter == NULL) ||
1554 candidate_stream_iter->current_msg_ts_ns < youngest_msg_ts_ns) {
1555 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1556 youngest_stream_iter = candidate_stream_iter;
1557 } else if (candidate_stream_iter->current_msg_ts_ns == youngest_msg_ts_ns) {
1558 /*
1559 * The currently selected message to be sent
1560 * downstream next has the exact same timestamp
1561 * that of the current candidate message. We
1562 * must break the tie in a predictable manner.
1563 */
1564 BT_CPPLOGD_STR_SPEC(
1565 lttng_live_msg_iter->logger,
1566 "Two of the next message candidates have the same timestamps, pick one deterministically.");
1567 /*
1568 * Order the messages in an arbitrary but
1569 * deterministic way.
1570 */
1571 int ret = common_muxing_compare_messages(candidate_stream_iter->current_msg,
1572 youngest_stream_iter->current_msg);
1573 if (ret < 0) {
1574 /*
1575 * The `candidate_stream_iter->current_msg`
1576 * should go first. Update the next
1577 * iterator and the current timestamp.
1578 */
1579 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1580 youngest_stream_iter = candidate_stream_iter;
1581 } else if (ret == 0) {
1582 /* Unable to pick which one should go first. */
1583 BT_CPPLOGW_SPEC(
1584 lttng_live_msg_iter->logger,
1585 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1586 "next-stream-iter-addr={}"
1587 "candidate-stream-iter-addr={}",
1588 fmt::ptr(youngest_stream_iter), fmt::ptr(candidate_stream_iter));
1589 }
1590 }
1591
1592 session_idx++;
1593 }
1594
1595 if (!youngest_stream_iter) {
1596 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1597 goto return_status;
1598 }
1599
1600 BT_ASSERT_DBG(youngest_stream_iter->current_msg);
1601 /* Ensure monotonicity. */
1602 BT_ASSERT_DBG(lttng_live_msg_iter->last_msg_ts_ns <=
1603 youngest_stream_iter->current_msg_ts_ns);
1604
1605 /*
1606 * Insert the next message to the message batch. This will set
1607 * stream iterator current message to NULL so that next time
1608 * we fetch the next message of that stream iterator
1609 */
1610 BT_MESSAGE_MOVE_REF(msgs[*count], youngest_stream_iter->current_msg);
1611 (*count)++;
1612
1613 /* Update the last timestamp in nanoseconds sent downstream. */
1614 lttng_live_msg_iter->last_msg_ts_ns = youngest_msg_ts_ns;
1615 youngest_stream_iter->current_msg_ts_ns = INT64_MAX;
1616
1617 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1618 }
1619
1620 return_status:
1621 switch (stream_iter_status) {
1622 case LTTNG_LIVE_ITERATOR_STATUS_OK:
1623 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
1624 /*
1625 * If we gathered messages, return _OK even if the graph was
1626 * interrupted. This allows for the components downstream to at
1627 * least get the those messages. If the graph was indeed
1628 * interrupted there should not be another _next() call as the
1629 * application will tear down the graph. This component class
1630 * doesn't support restarting after an interruption.
1631 */
1632 if (*count > 0) {
1633 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
1634 } else {
1635 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1636 }
1637 break;
1638 case LTTNG_LIVE_ITERATOR_STATUS_END:
1639 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1640 break;
1641 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
1642 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1643 "Memory error preparing the next batch of messages: "
1644 "live-iter-status={}",
1645 stream_iter_status);
1646 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
1647 break;
1648 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
1649 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
1650 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
1651 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1652 "Error preparing the next batch of messages: "
1653 "live-iter-status={}",
1654 stream_iter_status);
1655
1656 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1657 /* Put all existing messages on error. */
1658 put_messages(msgs, *count);
1659 break;
1660 default:
1661 bt_common_abort();
1662 }
1663
1664 end:
1665 return status;
1666 } catch (const std::bad_alloc&) {
1667 return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
1668 } catch (const bt2::Error&) {
1669 return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1670 }
1671 }
1672
1673 static struct lttng_live_msg_iter *
1674 lttng_live_msg_iter_create(struct lttng_live_component *lttng_live_comp,
1675 bt_self_message_iterator *self_msg_it)
1676 {
1677 lttng_live_msg_iter *msg_iter = new lttng_live_msg_iter {lttng_live_comp->logger};
1678 msg_iter->self_comp = lttng_live_comp->self_comp;
1679 msg_iter->lttng_live_comp = lttng_live_comp;
1680 msg_iter->self_msg_iter = self_msg_it;
1681
1682 msg_iter->active_stream_iter = 0;
1683 msg_iter->last_msg_ts_ns = INT64_MIN;
1684 msg_iter->was_interrupted = false;
1685
1686 msg_iter->sessions =
1687 g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_session);
1688 BT_ASSERT(msg_iter->sessions);
1689
1690 return msg_iter;
1691 }
1692
1693 bt_message_iterator_class_initialize_method_status
1694 lttng_live_msg_iter_init(bt_self_message_iterator *self_msg_it,
1695 bt_self_message_iterator_configuration *, bt_self_component_port_output *)
1696 {
1697 try {
1698 bt_message_iterator_class_initialize_method_status status;
1699 struct lttng_live_component *lttng_live;
1700 struct lttng_live_msg_iter *lttng_live_msg_iter;
1701 enum lttng_live_viewer_status viewer_status;
1702 bt_self_component *self_comp = bt_self_message_iterator_borrow_component(self_msg_it);
1703
1704 lttng_live = (lttng_live_component *) bt_self_component_get_data(self_comp);
1705
1706 /* There can be only one downstream iterator at the same time. */
1707 BT_ASSERT(!lttng_live->has_msg_iter);
1708 lttng_live->has_msg_iter = true;
1709
1710 lttng_live_msg_iter = lttng_live_msg_iter_create(lttng_live, self_msg_it);
1711 if (!lttng_live_msg_iter) {
1712 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live->logger,
1713 "Failed to create lttng_live_msg_iter");
1714 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1715 goto error;
1716 }
1717
1718 viewer_status = live_viewer_connection_create(
1719 lttng_live->params.url->str, false, lttng_live_msg_iter, lttng_live_msg_iter->logger,
1720 &lttng_live_msg_iter->viewer_connection);
1721 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1722 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1723 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1724 "Failed to create viewer connection");
1725 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1726 /*
1727 * Interruption in the _iter_init() method is not
1728 * supported. Return an error.
1729 */
1730 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1731 "Interrupted while creating viewer connection");
1732 }
1733
1734 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1735 goto error;
1736 }
1737
1738 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
1739 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1740 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1741 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1742 "Failed to create viewer session");
1743 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1744 /*
1745 * Interruption in the _iter_init() method is not
1746 * supported. Return an error.
1747 */
1748 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1749 "Interrupted when creating viewer session");
1750 }
1751
1752 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1753 goto error;
1754 }
1755
1756 if (lttng_live_msg_iter->sessions->len == 0) {
1757 switch (lttng_live->params.sess_not_found_act) {
1758 case SESSION_NOT_FOUND_ACTION_CONTINUE:
1759 BT_CPPLOGI_SPEC(
1760 lttng_live_msg_iter->logger,
1761 "Unable to connect to the requested live viewer session. "
1762 "Keep trying to connect because of {}=\"{}\" component parameter: url=\"{}\"",
1763 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR,
1764 lttng_live->params.url->str);
1765 break;
1766 case SESSION_NOT_FOUND_ACTION_FAIL:
1767 BT_CPPLOGE_APPEND_CAUSE_SPEC(
1768 lttng_live_msg_iter->logger,
1769 "Unable to connect to the requested live viewer session. "
1770 "Fail the message iterator initialization because of {}=\"{}\" "
1771 "component parameter: url =\"{}\"",
1772 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_FAIL_STR,
1773 lttng_live->params.url->str);
1774 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1775 goto error;
1776 case SESSION_NOT_FOUND_ACTION_END:
1777 BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger,
1778 "Unable to connect to the requested live viewer session. "
1779 "End gracefully at the first _next() call because of {}=\"{}\""
1780 " component parameter: url=\"{}\"",
1781 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_END_STR,
1782 lttng_live->params.url->str);
1783 break;
1784 default:
1785 bt_common_abort();
1786 }
1787 }
1788
1789 bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter);
1790 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK;
1791 goto end;
1792
1793 error:
1794 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
1795 end:
1796 return status;
1797 } catch (const std::bad_alloc&) {
1798 return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1799 } catch (const bt2::Error&) {
1800 return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1801 }
1802 }
1803
1804 static struct bt_param_validation_map_value_entry_descr list_sessions_params[] = {
1805 {URL_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
1806 bt_param_validation_value_descr::makeString()},
1807 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
1808
1809 static bt_component_class_query_method_status
1810 lttng_live_query_list_sessions(const bt_value *params, const bt_value **result,
1811 const bt2c::Logger& logger)
1812 {
1813 bt_component_class_query_method_status status;
1814 const bt_value *url_value = NULL;
1815 const char *url;
1816 struct live_viewer_connection *viewer_connection = NULL;
1817 enum lttng_live_viewer_status viewer_status;
1818 enum bt_param_validation_status validation_status;
1819 gchar *validate_error = NULL;
1820
1821 validation_status = bt_param_validation_validate(params, list_sessions_params, &validate_error);
1822 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
1823 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1824 goto error;
1825 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
1826 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1827 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "{}", validate_error);
1828 goto error;
1829 }
1830
1831 url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
1832 url = bt_value_string_get(url_value);
1833
1834 viewer_status = live_viewer_connection_create(url, true, NULL, logger, &viewer_connection);
1835 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1836 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1837 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Failed to create viewer connection");
1838 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1839 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1840 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
1841 } else {
1842 bt_common_abort();
1843 }
1844 goto error;
1845 }
1846
1847 status = live_viewer_connection_list_sessions(viewer_connection, result);
1848 if (status != BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK) {
1849 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Failed to list viewer sessions");
1850 goto error;
1851 }
1852
1853 goto end;
1854
1855 error:
1856 BT_VALUE_PUT_REF_AND_RESET(*result);
1857
1858 if (status >= 0) {
1859 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1860 }
1861
1862 end:
1863 if (viewer_connection) {
1864 live_viewer_connection_destroy(viewer_connection);
1865 }
1866
1867 g_free(validate_error);
1868
1869 return status;
1870 }
1871
1872 static bt_component_class_query_method_status
1873 lttng_live_query_support_info(const bt_value *params, const bt_value **result,
1874 const bt2c::Logger& logger)
1875 {
1876 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
1877 const bt_value *input_type_value;
1878 const bt_value *input_value;
1879 double weight = 0;
1880 struct bt_common_lttng_live_url_parts parts = {};
1881
1882 /* Used by the logging macros */
1883 __attribute__((unused)) bt_self_component *self_comp = NULL;
1884
1885 *result = NULL;
1886 input_type_value = bt_value_map_borrow_entry_value_const(params, "type");
1887 if (!input_type_value) {
1888 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Missing expected `type` parameter.");
1889 goto error;
1890 }
1891
1892 if (!bt_value_is_string(input_type_value)) {
1893 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "`type` parameter is not a string value.");
1894 goto error;
1895 }
1896
1897 if (strcmp(bt_value_string_get(input_type_value), "string") != 0) {
1898 /* We don't handle file system paths */
1899 goto create_result;
1900 }
1901
1902 input_value = bt_value_map_borrow_entry_value_const(params, "input");
1903 if (!input_value) {
1904 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Missing expected `input` parameter.");
1905 goto error;
1906 }
1907
1908 if (!bt_value_is_string(input_value)) {
1909 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "`input` parameter is not a string value.");
1910 goto error;
1911 }
1912
1913 parts = bt_common_parse_lttng_live_url(bt_value_string_get(input_value), NULL, 0);
1914 if (parts.session_name) {
1915 /*
1916 * Looks pretty much like an LTTng live URL: we got the
1917 * session name part, which forms a complete URL.
1918 */
1919 weight = .75;
1920 }
1921
1922 create_result:
1923 *result = bt_value_real_create_init(weight);
1924 if (!*result) {
1925 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1926 goto error;
1927 }
1928
1929 goto end;
1930
1931 error:
1932 if (status >= 0) {
1933 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1934 }
1935
1936 BT_ASSERT(!*result);
1937
1938 end:
1939 bt_common_destroy_lttng_live_url_parts(&parts);
1940 return status;
1941 }
1942
1943 bt_component_class_query_method_status lttng_live_query(bt_self_component_class_source *comp_class,
1944 bt_private_query_executor *priv_query_exec,
1945 const char *object, const bt_value *params,
1946 __attribute__((unused)) void *method_data,
1947 const bt_value **result)
1948 {
1949 try {
1950 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
1951 bt2c::Logger logger {bt2::SelfComponentClass {comp_class},
1952 bt2::PrivateQueryExecutor {priv_query_exec},
1953 "PLUGIN/SRC.CTF.LTTNG-LIVE/QUERY"};
1954
1955 if (strcmp(object, "sessions") == 0) {
1956 status = lttng_live_query_list_sessions(params, result, logger);
1957 } else if (strcmp(object, "babeltrace.support-info") == 0) {
1958 status = lttng_live_query_support_info(params, result, logger);
1959 } else {
1960 BT_CPPLOGI_SPEC(logger, "Unknown query object `{}`", object);
1961 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT;
1962 goto end;
1963 }
1964
1965 end:
1966 return status;
1967 } catch (const std::bad_alloc&) {
1968 return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1969 } catch (const bt2::Error&) {
1970 return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1971 }
1972 }
1973
1974 static void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
1975 {
1976 if (!lttng_live) {
1977 return;
1978 }
1979
1980 if (lttng_live->params.url) {
1981 g_string_free(lttng_live->params.url, TRUE);
1982 }
1983
1984 delete lttng_live;
1985 }
1986
1987 void lttng_live_component_finalize(bt_self_component_source *component)
1988 {
1989 lttng_live_component *data = (lttng_live_component *) bt_self_component_get_data(
1990 bt_self_component_source_as_self_component(component));
1991
1992 if (!data) {
1993 return;
1994 }
1995 lttng_live_component_destroy_data(data);
1996 }
1997
1998 static enum session_not_found_action
1999 parse_session_not_found_action_param(const bt_value *no_session_param)
2000 {
2001 enum session_not_found_action action;
2002 const char *no_session_act_str = bt_value_string_get(no_session_param);
2003
2004 if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_CONTINUE_STR) == 0) {
2005 action = SESSION_NOT_FOUND_ACTION_CONTINUE;
2006 } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_FAIL_STR) == 0) {
2007 action = SESSION_NOT_FOUND_ACTION_FAIL;
2008 } else {
2009 BT_ASSERT(strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0);
2010 action = SESSION_NOT_FOUND_ACTION_END;
2011 }
2012
2013 return action;
2014 }
2015
2016 static bt_param_validation_value_descr inputs_elem_descr =
2017 bt_param_validation_value_descr::makeString();
2018
2019 static const char *sess_not_found_action_choices[] = {
2020 SESS_NOT_FOUND_ACTION_CONTINUE_STR,
2021 SESS_NOT_FOUND_ACTION_FAIL_STR,
2022 SESS_NOT_FOUND_ACTION_END_STR,
2023 };
2024
2025 static struct bt_param_validation_map_value_entry_descr params_descr[] = {
2026 {INPUTS_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
2027 bt_param_validation_value_descr::makeArray(1, 1, inputs_elem_descr)},
2028 {SESS_NOT_FOUND_ACTION_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
2029 bt_param_validation_value_descr::makeString(sess_not_found_action_choices)},
2030 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
2031
2032 static bt_component_class_initialize_method_status
2033 lttng_live_component_create(const bt_value *params, bt_self_component_source *self_comp,
2034 struct lttng_live_component **component)
2035 {
2036 struct lttng_live_component *lttng_live = NULL;
2037 const bt_value *inputs_value;
2038 const bt_value *url_value;
2039 const bt_value *value;
2040 const char *url;
2041 enum bt_param_validation_status validation_status;
2042 gchar *validation_error = NULL;
2043 bt_component_class_initialize_method_status status;
2044 bt2c::Logger logger {bt2::SelfSourceComponent {self_comp}, "PLUGIN/SRC.CTF.LTTNG-LIVE/COMP"};
2045
2046 validation_status = bt_param_validation_validate(params, params_descr, &validation_error);
2047 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
2048 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2049 goto error;
2050 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
2051 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "{}", validation_error);
2052 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
2053 goto error;
2054 }
2055
2056 lttng_live = new lttng_live_component {std::move(logger)};
2057 lttng_live->self_comp = bt_self_component_source_as_self_component(self_comp);
2058 lttng_live->max_query_size = MAX_QUERY_SIZE;
2059 lttng_live->has_msg_iter = false;
2060
2061 inputs_value = bt_value_map_borrow_entry_value_const(params, INPUTS_PARAM);
2062 url_value = bt_value_array_borrow_element_by_index_const(inputs_value, 0);
2063 url = bt_value_string_get(url_value);
2064
2065 lttng_live->params.url = g_string_new(url);
2066 if (!lttng_live->params.url) {
2067 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2068 goto error;
2069 }
2070
2071 value = bt_value_map_borrow_entry_value_const(params, SESS_NOT_FOUND_ACTION_PARAM);
2072 if (value) {
2073 lttng_live->params.sess_not_found_act = parse_session_not_found_action_param(value);
2074 } else {
2075 BT_CPPLOGI_SPEC(lttng_live->logger,
2076 "Optional `{}` parameter is missing: defaulting to `{}`.",
2077 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR);
2078 lttng_live->params.sess_not_found_act = SESSION_NOT_FOUND_ACTION_CONTINUE;
2079 }
2080
2081 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
2082 goto end;
2083
2084 error:
2085 lttng_live_component_destroy_data(lttng_live);
2086 lttng_live = NULL;
2087 end:
2088 g_free(validation_error);
2089
2090 *component = lttng_live;
2091 return status;
2092 }
2093
2094 bt_component_class_initialize_method_status
2095 lttng_live_component_init(bt_self_component_source *self_comp_src,
2096 bt_self_component_source_configuration *, const bt_value *params, void *)
2097 {
2098 try {
2099 struct lttng_live_component *lttng_live;
2100 bt_component_class_initialize_method_status ret;
2101 bt_self_component *self_comp = bt_self_component_source_as_self_component(self_comp_src);
2102 bt_self_component_add_port_status add_port_status;
2103
2104 ret = lttng_live_component_create(params, self_comp_src, &lttng_live);
2105 if (ret != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
2106 goto error;
2107 }
2108
2109 add_port_status =
2110 bt_self_component_source_add_output_port(self_comp_src, "out", NULL, NULL);
2111 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
2112 ret = (bt_component_class_initialize_method_status) add_port_status;
2113 goto end;
2114 }
2115
2116 bt_self_component_set_data(self_comp, lttng_live);
2117 goto end;
2118
2119 error:
2120 lttng_live_component_destroy_data(lttng_live);
2121 lttng_live = NULL;
2122 end:
2123 return ret;
2124 } catch (const std::bad_alloc&) {
2125 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2126 } catch (const bt2::Error&) {
2127 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
2128 }
2129 }
This page took 0.073263 seconds and 4 git commands to generate.