7d687b3ebf1574b30d696c492f163b89d27e2170
[babeltrace.git] / src / plugins / ctf / lttng-live / lttng-live.cpp
1 /*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
5 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
6 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
7 *
8 * Babeltrace CTF LTTng-live Client Component
9 */
10
11 #include <glib.h>
12 #include <unistd.h>
13
14 #include "common/assert.h"
15 #include "cpp-common/bt2c/fmt.hpp"
16 #include "cpp-common/vendor/fmt/format.h"
17
18 #include "plugins/common/muxing/muxing.h"
19 #include "plugins/common/param-validation/param-validation.h"
20
21 #include "data-stream.hpp"
22 #include "lttng-live.hpp"
23 #include "metadata.hpp"
24
25 #define MAX_QUERY_SIZE (256 * 1024)
26 #define URL_PARAM "url"
27 #define INPUTS_PARAM "inputs"
28 #define SESS_NOT_FOUND_ACTION_PARAM "session-not-found-action"
29 #define SESS_NOT_FOUND_ACTION_CONTINUE_STR "continue"
30 #define SESS_NOT_FOUND_ACTION_FAIL_STR "fail"
31 #define SESS_NOT_FOUND_ACTION_END_STR "end"
32
33 void lttng_live_stream_iterator_set_state(struct lttng_live_stream_iterator *stream_iter,
34 enum lttng_live_stream_state new_state)
35 {
36 BT_CPPLOGD_SPEC(stream_iter->logger,
37 "Setting live stream iterator state: viewer-stream-id={}, "
38 "old-state={}, new-state={}",
39 stream_iter->viewer_stream_id, stream_iter->state, new_state);
40
41 stream_iter->state = new_state;
42 }
43
/*
 * Logs, at the debug level, the state and the inactivity timestamps
 * (last one sent, whether it is set, and current one) of
 * `live_stream_iter`.
 *
 * The expansion intentionally does NOT end with a semicolon: the caller
 * writes it, so that the macro behaves like a single statement, even as
 * the body of an unbraced `if`/`else`.
 */
#define LTTNG_LIVE_LOGD_STREAM_ITER(live_stream_iter) \
    do { \
        BT_CPPLOGD_SPEC((live_stream_iter)->logger, \
                        "Live stream iterator state={}, " \
                        "last-inact-ts-is-set={}, last-inact-ts-value={}, " \
                        "curr-inact-ts={}", \
                        (live_stream_iter)->state, (live_stream_iter)->last_inactivity_ts.is_set, \
                        (live_stream_iter)->last_inactivity_ts.value, \
                        (live_stream_iter)->current_inactivity_ts); \
    } while (0)
54
55 bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter)
56 {
57 bool ret;
58
59 if (!msg_iter) {
60 ret = false;
61 goto end;
62 }
63
64 ret = bt_self_message_iterator_is_interrupted(msg_iter->self_msg_iter);
65
66 end:
67 return ret;
68 }
69
70 static struct lttng_live_trace *
71 lttng_live_session_borrow_trace_by_id(struct lttng_live_session *session, uint64_t trace_id)
72 {
73 uint64_t trace_idx;
74 struct lttng_live_trace *ret_trace = NULL;
75
76 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
77 struct lttng_live_trace *trace =
78 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
79 if (trace->id == trace_id) {
80 ret_trace = trace;
81 goto end;
82 }
83 }
84
85 end:
86 return ret_trace;
87 }
88
89 static void lttng_live_destroy_trace(struct lttng_live_trace *trace)
90 {
91 BT_CPPLOGD_SPEC(trace->logger, "Destroying live trace: trace-id={}", trace->id);
92
93 BT_ASSERT(trace->stream_iterators);
94 g_ptr_array_free(trace->stream_iterators, TRUE);
95
96 lttng_live_metadata_fini(trace);
97 delete trace;
98 }
99
100 static struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
101 uint64_t trace_id)
102 {
103 BT_CPPLOGD_SPEC(session->logger, "Creating live trace: session-id={}, trace-id={}", session->id,
104 trace_id);
105
106 lttng_live_trace *trace = new lttng_live_trace {session->logger};
107 trace->session = session;
108 trace->id = trace_id;
109 trace->stream_iterators =
110 g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_stream_iterator_destroy);
111 BT_ASSERT(trace->stream_iterators);
112 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
113 g_ptr_array_add(session->traces, trace);
114
115 return trace;
116 }
117
118 struct lttng_live_trace *
119 lttng_live_session_borrow_or_create_trace_by_id(struct lttng_live_session *session,
120 uint64_t trace_id)
121 {
122 struct lttng_live_trace *trace;
123
124 trace = lttng_live_session_borrow_trace_by_id(session, trace_id);
125 if (trace) {
126 goto end;
127 }
128
129 /* The session is the owner of the newly created trace. */
130 trace = lttng_live_create_trace(session, trace_id);
131
132 end:
133 return trace;
134 }
135
136 int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, uint64_t session_id,
137 const char *hostname, const char *session_name)
138 {
139 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
140 "Adding live session: "
141 "session-id={}, hostname=\"{}\", session-name=\"{}\"",
142 session_id, hostname, session_name);
143
144 lttng_live_session *session = new lttng_live_session {lttng_live_msg_iter->logger};
145 session->self_comp = lttng_live_msg_iter->self_comp;
146 session->id = session_id;
147 session->traces = g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_trace);
148 BT_ASSERT(session->traces);
149 session->lttng_live_msg_iter = lttng_live_msg_iter;
150 session->new_streams_needed = true;
151 session->hostname = g_string_new(hostname);
152 BT_ASSERT(session->hostname);
153
154 session->session_name = g_string_new(session_name);
155 BT_ASSERT(session->session_name);
156
157 g_ptr_array_add(lttng_live_msg_iter->sessions, session);
158
159 return 0;
160 }
161
162 static void lttng_live_destroy_session(struct lttng_live_session *session)
163 {
164 if (!session) {
165 goto end;
166 }
167
168 BT_CPPLOGD_SPEC(session->logger,
169 "Destroying live session: "
170 "session-id={}, session-name=\"{}\"",
171 session->id, session->session_name->str);
172 if (session->id != -1ULL) {
173 if (lttng_live_session_detach(session)) {
174 if (!lttng_live_graph_is_canceled(session->lttng_live_msg_iter)) {
175 /* Old relayd cannot detach sessions. */
176 BT_CPPLOGD_SPEC(session->logger, "Unable to detach lttng live session {}",
177 session->id);
178 }
179 }
180 session->id = -1ULL;
181 }
182
183 if (session->traces) {
184 g_ptr_array_free(session->traces, TRUE);
185 }
186
187 if (session->hostname) {
188 g_string_free(session->hostname, TRUE);
189 }
190
191 if (session->session_name) {
192 g_string_free(session->session_name, TRUE);
193 }
194
195 delete session;
196
197 end:
198 return;
199 }
200
201 static void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter)
202 {
203 if (!lttng_live_msg_iter) {
204 goto end;
205 }
206
207 if (lttng_live_msg_iter->sessions) {
208 g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE);
209 }
210
211 if (lttng_live_msg_iter->viewer_connection) {
212 live_viewer_connection_destroy(lttng_live_msg_iter->viewer_connection);
213 }
214 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp);
215 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter);
216
217 /* All stream iterators must be destroyed at this point. */
218 BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0);
219 lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false;
220
221 delete lttng_live_msg_iter;
222
223 end:
224 return;
225 }
226
227 void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
228 {
229 struct lttng_live_msg_iter *lttng_live_msg_iter;
230
231 BT_ASSERT(self_msg_iter);
232
233 lttng_live_msg_iter =
234 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_iter);
235 BT_ASSERT(lttng_live_msg_iter);
236 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
237 }
238
239 static enum lttng_live_iterator_status
240 lttng_live_iterator_next_check_stream_state(struct lttng_live_stream_iterator *lttng_live_stream)
241 {
242 switch (lttng_live_stream->state) {
243 case LTTNG_LIVE_STREAM_QUIESCENT:
244 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
245 break;
246 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
247 /* Invalid state. */
248 BT_CPPLOGF_SPEC(lttng_live_stream->logger, "Unexpected stream state \"ACTIVE_NO_DATA\"");
249 bt_common_abort();
250 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
251 /* Invalid state. */
252 BT_CPPLOGF_SPEC(lttng_live_stream->logger, "Unexpected stream state \"QUIESCENT_NO_DATA\"");
253 bt_common_abort();
254 case LTTNG_LIVE_STREAM_EOF:
255 break;
256 }
257 return LTTNG_LIVE_ITERATOR_STATUS_OK;
258 }
259
260 /*
261 * For active no data stream, fetch next index. As a result of that it can
262 * become either:
263 * - quiescent: won't have events for a bit,
264 * - have data: need to get that data and produce the event,
265 * - have no data on this stream at this point: need to retry (AGAIN) or return
266 * EOF.
267 */
268 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
269 struct lttng_live_msg_iter *lttng_live_msg_iter,
270 struct lttng_live_stream_iterator *lttng_live_stream)
271 {
272 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
273 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
274 struct packet_index index;
275
276 if (lttng_live_stream->trace->metadata_stream_state ==
277 LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
278 BT_CPPLOGD_SPEC(
279 lttng_live_msg_iter->logger,
280 "Need to get an update for the metadata stream before proceeding further with this stream: "
281 "stream-name=\"{}\"",
282 lttng_live_stream->name->str);
283 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
284 goto end;
285 }
286
287 if (lttng_live_stream->trace->session->new_streams_needed) {
288 BT_CPPLOGD_SPEC(
289 lttng_live_msg_iter->logger,
290 "Need to get an update of all streams before proceeding further with this stream: "
291 "stream-name=\"{}\"",
292 lttng_live_stream->name->str);
293 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
294 goto end;
295 }
296
297 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA &&
298 lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
299 goto end;
300 }
301 ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream, &index);
302 if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
303 goto end;
304 }
305
306 BT_ASSERT_DBG(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
307
308 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
309 uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts.value,
310 curr_inact_ts = lttng_live_stream->current_inactivity_ts;
311
312 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA && last_inact_ts == curr_inact_ts) {
313 /*
314 * Because the stream is in the QUIESCENT_NO_DATA
315 * state, we can assert that the last_inactivity_ts was
316 * set and can be safely used in the `if` above.
317 */
318 BT_ASSERT(lttng_live_stream->last_inactivity_ts.is_set);
319
320 ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
321 LTTNG_LIVE_LOGD_STREAM_ITER(lttng_live_stream);
322 } else {
323 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
324 }
325 goto end;
326 }
327
328 lttng_live_stream->base_offset = index.offset;
329 lttng_live_stream->offset = index.offset;
330 lttng_live_stream->len = index.packet_size / CHAR_BIT;
331
332 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
333 "Setting live stream reading info: stream-name=\"{}\", "
334 "viewer-stream-id={}, stream-base-offset={}, stream-offset={}, stream-len={}",
335 lttng_live_stream->name->str, lttng_live_stream->viewer_stream_id,
336 lttng_live_stream->base_offset, lttng_live_stream->offset,
337 lttng_live_stream->len);
338
339 end:
340 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
341 ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream);
342 }
343 return ret;
344 }
345
346 /*
347 * Creation of the message requires the ctf trace class to be created
348 * beforehand, but the live protocol gives us all streams (including metadata)
349 * at once. So we split it in three steps: getting streams, getting metadata
350 * (which creates the ctf trace class), and then creating the per-stream
351 * messages.
352 */
353 static enum lttng_live_iterator_status
354 lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
355 struct lttng_live_session *session)
356 {
357 enum lttng_live_iterator_status status;
358 uint64_t trace_idx;
359
360 if (!session->attached) {
361 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Attach to session: session-id={}",
362 session->id);
363 enum lttng_live_viewer_status attach_status =
364 lttng_live_session_attach(session, lttng_live_msg_iter->self_msg_iter);
365 if (attach_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
366 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
367 /*
368 * Clear any causes appended in
369 * `lttng_live_attach_session()` as we want to
370 * return gracefully since the graph was
371 * cancelled.
372 */
373 bt_current_thread_clear_error();
374 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
375 } else {
376 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
377 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
378 "Error attaching to LTTng live session");
379 }
380 goto end;
381 }
382 }
383
384 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
385 "Updating all data streams: "
386 "session-id={}, session-name=\"{}\"",
387 session->id, session->session_name->str);
388
389 status = lttng_live_session_get_new_streams(session, lttng_live_msg_iter->self_msg_iter);
390 switch (status) {
391 case LTTNG_LIVE_ITERATOR_STATUS_OK:
392 break;
393 case LTTNG_LIVE_ITERATOR_STATUS_END:
394 /*
395 * We received a `_END` from the `_get_new_streams()` function,
396 * which means no more data will ever be received from the data
397 * streams of this session. But it's possible that the metadata
398 * is incomplete.
399 * The live protocol guarantees that we receive all the
400 * metadata needed before we receive data streams needing it.
401 * But it's possible to receive metadata NOT needed by
402 * data streams after the session was closed. For example, this
403 * could happen if a new event is registered and the session is
404 * stopped before any tracepoint for that event is actually
405 * fired.
406 */
407 BT_CPPLOGD_SPEC(
408 lttng_live_msg_iter->logger,
409 "Updating streams returned _END status. Override status to _OK in order fetch any remaining metadata:"
410 "session-id={}, session-name=\"{}\"",
411 session->id, session->session_name->str);
412 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
413 break;
414 default:
415 goto end;
416 }
417
418 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
419 "Updating metadata stream for session: "
420 "session-id={}, session-name=\"{}\"",
421 session->id, session->session_name->str);
422
423 trace_idx = 0;
424 while (trace_idx < session->traces->len) {
425 struct lttng_live_trace *trace =
426 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
427
428 status = lttng_live_metadata_update(trace);
429 switch (status) {
430 case LTTNG_LIVE_ITERATOR_STATUS_END:
431 case LTTNG_LIVE_ITERATOR_STATUS_OK:
432 trace_idx++;
433 break;
434 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
435 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
436 goto end;
437 default:
438 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
439 "Error updating trace metadata: "
440 "stream-iter-status={}, trace-id={}",
441 status, trace->id);
442 goto end;
443 }
444 }
445
446 /*
447 * Now that we have the metadata we can initialize the downstream
448 * iterator.
449 */
450 status = lttng_live_lazy_msg_init(session, lttng_live_msg_iter->self_msg_iter);
451
452 end:
453 return status;
454 }
455
456 static void
457 lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
458 {
459 uint64_t session_idx, trace_idx;
460
461 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
462 struct lttng_live_session *session =
463 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
464 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
465 "Force marking session as needing new streams: "
466 "session-id={}",
467 session->id);
468 session->new_streams_needed = true;
469 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
470 struct lttng_live_trace *trace =
471 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
472 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
473 "Force marking trace metadata state as needing an update: "
474 "session-id={}, trace-id={}",
475 session->id, trace->id);
476
477 BT_ASSERT(trace->metadata_stream_state != LTTNG_LIVE_METADATA_STREAM_STATE_CLOSED);
478
479 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
480 }
481 }
482 }
483
484 static enum lttng_live_iterator_status
485 lttng_live_iterator_handle_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
486 {
487 enum lttng_live_iterator_status status;
488 enum lttng_live_viewer_status viewer_status;
489 uint64_t session_idx = 0, nr_sessions_opened = 0;
490 struct lttng_live_session *session;
491 enum session_not_found_action sess_not_found_act =
492 lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
493
494 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
495 "Update data and metadata of all sessions: "
496 "live-msg-iter-addr={}",
497 fmt::ptr(lttng_live_msg_iter));
498 /*
499 * In a remotely distant future, we could add a "new
500 * session" flag to the protocol, which would tell us that we
501 * need to query for new sessions even though we have sessions
502 * currently ongoing.
503 */
504 if (lttng_live_msg_iter->sessions->len == 0) {
505 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
506 BT_CPPLOGD_SPEC(
507 lttng_live_msg_iter->logger,
508 "No session found. Exiting in accordance with the `session-not-found-action` parameter");
509 status = LTTNG_LIVE_ITERATOR_STATUS_END;
510 goto end;
511 } else {
512 BT_CPPLOGD_SPEC(
513 lttng_live_msg_iter->logger,
514 "No session found. Try creating a new one in accordance with the `session-not-found-action` parameter");
515 /*
516 * Retry to create a viewer session for the requested
517 * session name.
518 */
519 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
520 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
521 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
522 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
523 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
524 "Error creating LTTng live viewer session");
525 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
526 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
527 } else {
528 bt_common_abort();
529 }
530 goto end;
531 }
532 }
533 }
534
535 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
536 session =
537 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
538 status = lttng_live_get_session(lttng_live_msg_iter, session);
539 switch (status) {
540 case LTTNG_LIVE_ITERATOR_STATUS_OK:
541 case LTTNG_LIVE_ITERATOR_STATUS_END:
542 /*
543 * A session returned `_END`. Other sessions may still
544 * be active so we override the status and continue
545 * looping if needed.
546 */
547 break;
548 default:
549 goto end;
550 }
551 if (!session->closed) {
552 nr_sessions_opened++;
553 }
554 }
555
556 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE && nr_sessions_opened == 0) {
557 status = LTTNG_LIVE_ITERATOR_STATUS_END;
558 } else {
559 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
560 }
561
562 end:
563 return status;
564 }
565
566 static enum lttng_live_iterator_status
567 emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
568 struct lttng_live_stream_iterator *stream_iter, const bt_message **message,
569 uint64_t timestamp)
570 {
571 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
572 bt_message *msg = NULL;
573
574 BT_ASSERT(stream_iter->trace->clock_class);
575
576 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
577 "Emitting inactivity message for stream: ctf-stream-id={}, "
578 "viewer-stream-id={}, timestamp={}",
579 stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id,
580 timestamp);
581
582 msg = bt_message_message_iterator_inactivity_create(lttng_live_msg_iter->self_msg_iter,
583 stream_iter->trace->clock_class, timestamp);
584 if (!msg) {
585 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
586 "Error emitting message iterator inactivity message");
587 goto error;
588 }
589
590 *message = msg;
591 end:
592 return ret;
593
594 error:
595 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
596 bt_message_put_ref(msg);
597 goto end;
598 }
599
600 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
601 struct lttng_live_msg_iter *lttng_live_msg_iter,
602 struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
603 {
604 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
605
606 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
607 return LTTNG_LIVE_ITERATOR_STATUS_OK;
608 }
609
610 /*
611 * Check if we already sent an inactivty message downstream for this
612 * `current_inactivity_ts` value.
613 */
614 if (lttng_live_stream->last_inactivity_ts.is_set &&
615 lttng_live_stream->current_inactivity_ts == lttng_live_stream->last_inactivity_ts.value) {
616 lttng_live_stream_iterator_set_state(lttng_live_stream,
617 LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA);
618
619 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
620 goto end;
621 }
622
623 ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream, message,
624 lttng_live_stream->current_inactivity_ts);
625
626 lttng_live_stream->last_inactivity_ts.value = lttng_live_stream->current_inactivity_ts;
627 lttng_live_stream->last_inactivity_ts.is_set = true;
628 end:
629 return ret;
630 }
631
632 static int live_get_msg_ts_ns(struct lttng_live_msg_iter *lttng_live_msg_iter,
633 const bt_message *msg, int64_t last_msg_ts_ns, int64_t *ts_ns)
634 {
635 const bt_clock_snapshot *clock_snapshot = NULL;
636 int ret = 0;
637
638 BT_ASSERT_DBG(msg);
639 BT_ASSERT_DBG(ts_ns);
640
641 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
642 "Getting message's timestamp: iter-data-addr={}, msg-addr={}, "
643 "last-msg-ts={}",
644 fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns);
645
646 switch (bt_message_get_type(msg)) {
647 case BT_MESSAGE_TYPE_EVENT:
648 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(msg);
649 break;
650 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
651 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg);
652 break;
653 case BT_MESSAGE_TYPE_PACKET_END:
654 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(msg);
655 break;
656 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
657 clock_snapshot =
658 bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg);
659 break;
660 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
661 clock_snapshot =
662 bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg);
663 break;
664 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
665 clock_snapshot = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const(msg);
666 break;
667 default:
668 /* All the other messages have a higher priority */
669 BT_CPPLOGD_STR_SPEC(lttng_live_msg_iter->logger,
670 "Message has no timestamp: using the last message timestamp.");
671 *ts_ns = last_msg_ts_ns;
672 goto end;
673 }
674
675 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
676 if (ret) {
677 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
678 "Cannot get nanoseconds from Epoch of clock snapshot: "
679 "clock-snapshot-addr={}",
680 fmt::ptr(clock_snapshot));
681 goto error;
682 }
683
684 goto end;
685
686 error:
687 ret = -1;
688
689 end:
690 if (ret == 0) {
691 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
692 "Found message's timestamp: iter-data-addr={}, msg-addr={}, "
693 "last-msg-ts={}, ts={}",
694 fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns, *ts_ns);
695 }
696
697 return ret;
698 }
699
700 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
701 struct lttng_live_msg_iter *lttng_live_msg_iter,
702 struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
703 {
704 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
705 enum ctf_msg_iter_status status;
706 uint64_t session_idx, trace_idx;
707
708 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
709 struct lttng_live_session *session =
710 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
711
712 if (session->new_streams_needed) {
713 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
714 "Need an update for streams: "
715 "session-id={}",
716 session->id);
717 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
718 goto end;
719 }
720 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
721 struct lttng_live_trace *trace =
722 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
723 if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
724 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
725 "Need an update for metadata stream: "
726 "session-id={}, trace-id={}",
727 session->id, trace->id);
728 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
729 goto end;
730 }
731 }
732 }
733
734 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
735 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
736 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
737 "Invalid state of live stream iterator"
738 "stream-iter-status={}",
739 lttng_live_stream->state);
740 goto end;
741 }
742
743 status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter, message);
744 switch (status) {
745 case CTF_MSG_ITER_STATUS_EOF:
746 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
747 break;
748 case CTF_MSG_ITER_STATUS_OK:
749 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
750 break;
751 case CTF_MSG_ITER_STATUS_AGAIN:
752 /*
753 * Continue immediately (end of packet). The next
754 * get_index may return AGAIN to delay the following
755 * attempt.
756 */
757 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
758 break;
759 case CTF_MSG_ITER_STATUS_ERROR:
760 default:
761 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
762 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
763 "CTF message iterator failed to get next message: "
764 "msg-iter={}, msg-iter-status={}",
765 fmt::ptr(lttng_live_stream->msg_iter), status);
766 break;
767 }
768
769 end:
770 return ret;
771 }
772
773 static enum lttng_live_iterator_status
774 lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
775 struct lttng_live_stream_iterator *stream_iter,
776 const bt_message **curr_msg)
777 {
778 enum lttng_live_iterator_status live_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
779
780 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
781 "Closing live stream iterator: stream-name=\"{}\", "
782 "viewer-stream-id={}",
783 stream_iter->name->str, stream_iter->viewer_stream_id);
784
785 /*
786 * The viewer has hung up on us so we are closing the stream. The
787 * `ctf_msg_iter` should simply realize that it needs to close the
788 * stream properly by emitting the necessary stream end message.
789 */
790 enum ctf_msg_iter_status status =
791 ctf_msg_iter_get_next_message(stream_iter->msg_iter, curr_msg);
792
793 if (status == CTF_MSG_ITER_STATUS_ERROR) {
794 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
795 "Error getting the next message from CTF message iterator");
796 live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
797 goto end;
798 } else if (status == CTF_MSG_ITER_STATUS_EOF) {
799 BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger,
800 "Reached the end of the live stream iterator.");
801 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
802 goto end;
803 }
804
805 BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
806
807 end:
808 return live_status;
809 }
810
811 /*
812 * helper function:
813 * handle_no_data_streams()
814 * retry:
815 * - for each ACTIVE_NO_DATA stream:
816 * - query relayd for stream data, or quiescence info.
817 * - if need metadata, get metadata, goto retry.
818 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
819 * - if quiescent, move to QUIESCENT streams
820 * - if fetched data, move to ACTIVE_DATA streams
821 * (at this point each stream either has data, or is quiescent)
822 *
823 *
824 * iterator_next:
825 * handle_new_streams_and_metadata()
826 * - query relayd for known streams, add them as ACTIVE_NO_DATA
827 * - query relayd for metadata
828 *
829 * call handle_active_no_data_streams()
830 *
831 * handle_quiescent_streams()
832 * - if at least one stream is ACTIVE_DATA:
833 * - peek stream event with lowest timestamp -> next_ts
834 * - for each quiescent stream
835 * - if next_ts >= quiescent end
836 * - set state to ACTIVE_NO_DATA
837 * - else
838 * - for each quiescent stream
839 * - set state to ACTIVE_NO_DATA
840 *
841 * call handle_active_no_data_streams()
842 *
843 * handle_active_data_streams()
844 * - if at least one stream is ACTIVE_DATA:
845 * - get stream event with lowest timestamp from heap
846 * - make that stream event the current message.
847 * - move this stream heap position to its next event
848 * - if we need to fetch data from relayd, move
849 * stream to ACTIVE_NO_DATA.
850 * - return OK
851 * - return AGAIN
852 *
853 * end criterion: ctrl-c on client. If relayd exits or the session
854 * closes on the relay daemon side, we keep on waiting for streams.
855 * Eventually handle --end timestamp (also an end criterion).
856 *
857 * When disconnected from relayd: try to re-connect endlessly.
858 */
859 static enum lttng_live_iterator_status
860 lttng_live_iterator_next_msg_on_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
861 struct lttng_live_stream_iterator *stream_iter,
862 const bt_message **curr_msg)
863 {
864 enum lttng_live_iterator_status live_status;
865
866 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
867 "Advancing live stream iterator until next message if possible: "
868 "stream-name=\"{}\", viewer-stream-id={}",
869 stream_iter->name->str, stream_iter->viewer_stream_id);
870
871 if (stream_iter->has_stream_hung_up) {
872 /*
873 * The stream has hung up and the stream was properly closed
874 * during the last call to the current function. Return _END
875 * status now so that this stream iterator is removed for the
876 * stream iterator list.
877 */
878 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
879 goto end;
880 }
881
882 retry:
883 LTTNG_LIVE_LOGD_STREAM_ITER(stream_iter);
884
885 /*
886 * Make sure we have the most recent metadata and possibly some new
887 * streams.
888 */
889 live_status = lttng_live_iterator_handle_new_streams_and_metadata(lttng_live_msg_iter);
890 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
891 goto end;
892 }
893
894 live_status =
895 lttng_live_iterator_next_handle_one_no_data_stream(lttng_live_msg_iter, stream_iter);
896 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
897 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
898 /*
899 * We overwrite `live_status` since `curr_msg` is
900 * likely set to a valid message in this function.
901 */
902 live_status =
903 lttng_live_iterator_close_stream(lttng_live_msg_iter, stream_iter, curr_msg);
904 }
905 goto end;
906 }
907
908 live_status = lttng_live_iterator_next_handle_one_quiescent_stream(lttng_live_msg_iter,
909 stream_iter, curr_msg);
910 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
911 BT_ASSERT(!*curr_msg);
912 goto end;
913 }
914 if (*curr_msg) {
915 goto end;
916 }
917 live_status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live_msg_iter,
918 stream_iter, curr_msg);
919 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
920 BT_ASSERT(!*curr_msg);
921 }
922
923 end:
924 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) {
925 BT_CPPLOGD_SPEC(
926 lttng_live_msg_iter->logger,
927 "Ask the relay daemon for an updated view of the data and metadata streams");
928 goto retry;
929 }
930
931 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
932 "Returning from advancing live stream iterator: status={}, "
933 "stream-name=\"{}\", viewer-stream-id={}",
934 live_status, stream_iter->name->str, stream_iter->viewer_stream_id);
935
936 return live_status;
937 }
938
939 static bool is_discarded_packet_or_event_message(const bt_message *msg)
940 {
941 const enum bt_message_type msg_type = bt_message_get_type(msg);
942
943 return msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS ||
944 msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS;
945 }
946
947 static enum lttng_live_iterator_status
948 adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream *stream,
949 const bt_message *msg_in, bt_message **msg_out,
950 uint64_t new_begin_ts)
951 {
952 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
953 enum bt_property_availability availability;
954 const bt_clock_snapshot *clock_snapshot;
955 uint64_t end_ts;
956 uint64_t count;
957
958 clock_snapshot = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg_in);
959 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
960
961 availability = bt_message_discarded_packets_get_count(msg_in, &count);
962 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
963
964 *msg_out = bt_message_discarded_packets_create_with_default_clock_snapshots(
965 iter, stream, new_begin_ts, end_ts);
966 if (!*msg_out) {
967 status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
968 goto end;
969 }
970
971 bt_message_discarded_packets_set_count(*msg_out, count);
972 end:
973 return status;
974 }
975
976 static enum lttng_live_iterator_status
977 adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream *stream,
978 const bt_message *msg_in, bt_message **msg_out,
979 uint64_t new_begin_ts)
980 {
981 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
982 enum bt_property_availability availability;
983 const bt_clock_snapshot *clock_snapshot;
984 uint64_t end_ts;
985 uint64_t count;
986
987 clock_snapshot = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg_in);
988 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
989
990 availability = bt_message_discarded_events_get_count(msg_in, &count);
991 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
992
993 *msg_out = bt_message_discarded_events_create_with_default_clock_snapshots(
994 iter, stream, new_begin_ts, end_ts);
995 if (!*msg_out) {
996 status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
997 goto end;
998 }
999
1000 bt_message_discarded_events_set_count(*msg_out, count);
1001 end:
1002 return status;
1003 }
1004
1005 static enum lttng_live_iterator_status
1006 handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
1007 struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns,
1008 const bt_message *late_msg)
1009 {
1010 const bt_clock_class *clock_class;
1011 const bt_stream_class *stream_class;
1012 enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status;
1013 int64_t last_inactivity_ts_ns;
1014 enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1015 enum lttng_live_iterator_status adjust_status;
1016 bt_message *adjusted_message;
1017
1018 /*
1019 * The timestamp of the current message is before the last message sent
1020 * by this component. We CANNOT send it as is.
1021 *
1022 * The only expected scenario in which that could happen is the
1023 * following, everything else is a bug in this component, relay daemon,
1024 * or CTF parser.
1025 *
1026 * Expected scenario: The CTF message iterator emitted discarded
1027 * packets and discarded events with synthesized beginning and end
1028 * timestamps from the bounds of the last known packet and the newly
1029 * decoded packet header. The CTF message iterator is not aware of
1030 * stream inactivity beacons. Hence, we have to adjust the beginning
1031 * timestamp of those types of messages if a stream signalled its
1032 * inactivity up until _after_ the last known packet's beginning
1033 * timestamp.
1034 *
1035 * Otherwise, the monotonicity guarantee of message timestamps would
1036 * not be preserved.
1037 *
1038 * In short, the only scenario in which it's okay and fixable to
1039 * received a late message is when:
1040 * 1. the late message is a discarded packets or discarded events
1041 * message,
1042 * 2. this stream produced an inactivity message downstream, and
1043 * 3. the timestamp of the late message is within the inactivity
1044 * timespan we sent downstream through the inactivity message.
1045 */
1046
1047 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1048 "Handling late message on live stream iterator: "
1049 "stream-name=\"{}\", viewer-stream-id={}",
1050 stream_iter->name->str, stream_iter->viewer_stream_id);
1051
1052 if (!stream_iter->last_inactivity_ts.is_set) {
1053 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1054 "Invalid live stream state: "
1055 "have a late message when no inactivity message "
1056 "was ever sent for that stream.");
1057 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1058 goto end;
1059 }
1060
1061 if (!is_discarded_packet_or_event_message(late_msg)) {
1062 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1063 "Invalid live stream state: "
1064 "have a late message that is not a packet discarded or "
1065 "event discarded message: late-msg-type={}",
1066 static_cast<bt2::MessageType>(bt_message_get_type(late_msg)));
1067 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1068 goto end;
1069 }
1070
1071 stream_class = bt_stream_borrow_class_const(stream_iter->stream);
1072 clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class);
1073
1074 ts_ns_status = bt_clock_class_cycles_to_ns_from_origin(
1075 clock_class, stream_iter->last_inactivity_ts.value, &last_inactivity_ts_ns);
1076 if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) {
1077 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1078 "Error converting last "
1079 "inactivity message timestamp to nanoseconds");
1080 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1081 goto end;
1082 }
1083
1084 if (last_inactivity_ts_ns <= late_msg_ts_ns) {
1085 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1086 "Invalid live stream state: "
1087 "have a late message that is none included in a stream "
1088 "inactivity timespan: last-inactivity-ts-ns={}, "
1089 "late-msg-ts-ns={}",
1090 last_inactivity_ts_ns, late_msg_ts_ns);
1091 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1092 goto end;
1093 }
1094
1095 /*
1096 * We now know that it's okay for this message to be late, we can now
1097 * adjust its timestamp to ensure monotonicity.
1098 */
1099 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1100 "Adjusting the timestamp of late message: late-msg-type={}, "
1101 "msg-new-ts-ns={}",
1102 static_cast<bt2::MessageType>(bt_message_get_type(late_msg)),
1103 stream_iter->last_inactivity_ts.value);
1104 switch (bt_message_get_type(late_msg)) {
1105 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1106 adjust_status = adjust_discarded_events_message(
1107 lttng_live_msg_iter->self_msg_iter, stream_iter->stream, late_msg, &adjusted_message,
1108 stream_iter->last_inactivity_ts.value);
1109 break;
1110 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1111 adjust_status = adjust_discarded_packets_message(
1112 lttng_live_msg_iter->self_msg_iter, stream_iter->stream, late_msg, &adjusted_message,
1113 stream_iter->last_inactivity_ts.value);
1114 break;
1115 default:
1116 bt_common_abort();
1117 }
1118
1119 if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1120 stream_iter_status = adjust_status;
1121 goto end;
1122 }
1123
1124 BT_ASSERT_DBG(adjusted_message);
1125 stream_iter->current_msg = adjusted_message;
1126 stream_iter->current_msg_ts_ns = last_inactivity_ts_ns;
1127 bt_message_put_ref(late_msg);
1128
1129 end:
1130 return stream_iter_status;
1131 }
1132
1133 static enum lttng_live_iterator_status
1134 next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
1135 struct lttng_live_trace *live_trace,
1136 struct lttng_live_stream_iterator **youngest_trace_stream_iter)
1137 {
1138 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
1139 enum lttng_live_iterator_status stream_iter_status;
1140 int64_t youngest_candidate_msg_ts = INT64_MAX;
1141 uint64_t stream_iter_idx;
1142
1143 BT_ASSERT_DBG(live_trace);
1144 BT_ASSERT_DBG(live_trace->stream_iterators);
1145
1146 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1147 "Finding the next stream iterator for trace: "
1148 "trace-id={}",
1149 live_trace->id);
1150 /*
1151 * Update the current message of every stream iterators of this trace.
1152 * The current msg of every stream must have a timestamp equal or
1153 * larger than the last message returned by this iterator. We must
1154 * ensure monotonicity.
1155 */
1156 stream_iter_idx = 0;
1157 while (stream_iter_idx < live_trace->stream_iterators->len) {
1158 bool stream_iter_is_ended = false;
1159 struct lttng_live_stream_iterator *stream_iter =
1160 (lttng_live_stream_iterator *) g_ptr_array_index(live_trace->stream_iterators,
1161 stream_iter_idx);
1162
1163 /*
1164 * If there is no current message for this stream, go fetch
1165 * one.
1166 */
1167 while (!stream_iter->current_msg) {
1168 const bt_message *msg = NULL;
1169 int64_t curr_msg_ts_ns = INT64_MAX;
1170
1171 stream_iter_status =
1172 lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, &msg);
1173
1174 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1175 stream_iter_is_ended = true;
1176 break;
1177 }
1178
1179 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1180 goto end;
1181 }
1182
1183 BT_ASSERT_DBG(msg);
1184
1185 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1186 "Live stream iterator returned message: msg-type={}, "
1187 "stream-name=\"{}\", viewer-stream-id={}",
1188 static_cast<bt2::MessageType>(bt_message_get_type(msg)),
1189 stream_iter->name->str, stream_iter->viewer_stream_id);
1190
1191 /*
1192 * Get the timestamp in nanoseconds from origin of this
1193 * message.
1194 */
1195 live_get_msg_ts_ns(lttng_live_msg_iter, msg, lttng_live_msg_iter->last_msg_ts_ns,
1196 &curr_msg_ts_ns);
1197
1198 /*
1199 * Check if the message of the current live stream
1200 * iterator occurred at the exact same time or after the
1201 * last message returned by this component's message
1202 * iterator. If not, we need to handle it with care.
1203 */
1204 if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
1205 stream_iter->current_msg = msg;
1206 stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
1207 } else {
1208 /*
1209 * We received a message from the past. This
1210 * may be fixable but it can also be an error.
1211 */
1212 stream_iter_status =
1213 handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, msg);
1214 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1215 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1216 "Late message could not be handled correctly: "
1217 "lttng-live-msg-iter-addr={}, "
1218 "stream-name=\"{}\", "
1219 "curr-msg-ts={}, last-msg-ts={}",
1220 fmt::ptr(lttng_live_msg_iter),
1221 stream_iter->name->str, curr_msg_ts_ns,
1222 lttng_live_msg_iter->last_msg_ts_ns);
1223 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1224 goto end;
1225 }
1226 }
1227 }
1228
1229 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1230
1231 if (!stream_iter_is_ended) {
1232 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1233 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1234 /*
1235 * Update the current best candidate message
1236 * for the stream iterator of this live trace
1237 * to be forwarded downstream.
1238 */
1239 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1240 youngest_candidate_stream_iter = stream_iter;
1241 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1242 /*
1243 * Order the messages in an arbitrary but
1244 * deterministic way.
1245 */
1246 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1247 int ret = common_muxing_compare_messages(
1248 stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
1249 if (ret < 0) {
1250 /*
1251 * The `youngest_candidate_stream_iter->current_msg`
1252 * should go first. Update the next
1253 * iterator and the current timestamp.
1254 */
1255 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1256 youngest_candidate_stream_iter = stream_iter;
1257 } else if (ret == 0) {
1258 /*
1259 * Unable to pick which one should go
1260 * first.
1261 */
1262 BT_CPPLOGW_SPEC(
1263 lttng_live_msg_iter->logger,
1264 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1265 "stream-iter-addr={}"
1266 "stream-iter-addr={}",
1267 fmt::ptr(stream_iter), fmt::ptr(youngest_candidate_stream_iter));
1268 }
1269 }
1270
1271 stream_iter_idx++;
1272 } else {
1273 /*
1274 * The live stream iterator has ended. That
1275 * iterator is removed from the array, but
1276 * there is no need to increment
1277 * stream_iter_idx as
1278 * g_ptr_array_remove_index_fast replaces the
1279 * removed element with the array's last
1280 * element.
1281 */
1282 g_ptr_array_remove_index_fast(live_trace->stream_iterators, stream_iter_idx);
1283 }
1284 }
1285
1286 if (youngest_candidate_stream_iter) {
1287 *youngest_trace_stream_iter = youngest_candidate_stream_iter;
1288 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1289 } else {
1290 /*
1291 * The only case where we don't have a candidate for this trace
1292 * is if we reached the end of all the iterators.
1293 */
1294 BT_ASSERT(live_trace->stream_iterators->len == 0);
1295 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1296 }
1297
1298 end:
1299 return stream_iter_status;
1300 }
1301
1302 static enum lttng_live_iterator_status
1303 next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
1304 struct lttng_live_session *session,
1305 struct lttng_live_stream_iterator **youngest_session_stream_iter)
1306 {
1307 enum lttng_live_iterator_status stream_iter_status;
1308 uint64_t trace_idx = 0;
1309 int64_t youngest_candidate_msg_ts = INT64_MAX;
1310 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
1311
1312 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1313 "Finding the next stream iterator for session: "
1314 "session-id={}",
1315 session->id);
1316 /*
1317 * Make sure we are attached to the session and look for new streams
1318 * and metadata.
1319 */
1320 stream_iter_status = lttng_live_get_session(lttng_live_msg_iter, session);
1321 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
1322 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE &&
1323 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) {
1324 goto end;
1325 }
1326
1327 BT_ASSERT_DBG(session->traces);
1328
1329 while (trace_idx < session->traces->len) {
1330 bool trace_is_ended = false;
1331 struct lttng_live_stream_iterator *stream_iter;
1332 struct lttng_live_trace *trace =
1333 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
1334
1335 stream_iter_status =
1336 next_stream_iterator_for_trace(lttng_live_msg_iter, trace, &stream_iter);
1337 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1338 /*
1339 * All the live stream iterators for this trace are
1340 * ENDed. Remove the trace from this session.
1341 */
1342 trace_is_ended = true;
1343 } else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1344 goto end;
1345 }
1346
1347 if (!trace_is_ended) {
1348 BT_ASSERT_DBG(stream_iter);
1349
1350 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1351 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1352 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1353 youngest_candidate_stream_iter = stream_iter;
1354 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1355 /*
1356 * Order the messages in an arbitrary but
1357 * deterministic way.
1358 */
1359 int ret = common_muxing_compare_messages(
1360 stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
1361 if (ret < 0) {
1362 /*
1363 * The `youngest_candidate_stream_iter->current_msg`
1364 * should go first. Update the next iterator
1365 * and the current timestamp.
1366 */
1367 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1368 youngest_candidate_stream_iter = stream_iter;
1369 } else if (ret == 0) {
1370 /* Unable to pick which one should go first. */
1371 BT_CPPLOGW_SPEC(
1372 lttng_live_msg_iter->logger,
1373 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1374 "stream-iter-addr={}"
1375 "youngest-candidate-stream-iter-addr={}",
1376 fmt::ptr(stream_iter), fmt::ptr(youngest_candidate_stream_iter));
1377 }
1378 }
1379 trace_idx++;
1380 } else {
1381 /*
1382 * trace_idx is not incremented since
1383 * g_ptr_array_remove_index_fast replaces the
1384 * element at trace_idx with the array's last element.
1385 */
1386 g_ptr_array_remove_index_fast(session->traces, trace_idx);
1387 }
1388 }
1389 if (youngest_candidate_stream_iter) {
1390 *youngest_session_stream_iter = youngest_candidate_stream_iter;
1391 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1392 } else {
1393 /*
1394 * The only cases where we don't have a candidate for this
1395 * trace is:
1396 * 1. if we reached the end of all the iterators of all the
1397 * traces of this session,
1398 * 2. if we never had live stream iterator in the first place.
1399 *
1400 * In either cases, we return END.
1401 */
1402 BT_ASSERT(session->traces->len == 0);
1403 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1404 }
1405 end:
1406 return stream_iter_status;
1407 }
1408
1409 static inline void put_messages(bt_message_array_const msgs, uint64_t count)
1410 {
1411 uint64_t i;
1412
1413 for (i = 0; i < count; i++) {
1414 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
1415 }
1416 }
1417
1418 bt_message_iterator_class_next_method_status
1419 lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array_const msgs,
1420 uint64_t capacity, uint64_t *count)
1421 {
1422 try {
1423 bt_message_iterator_class_next_method_status status;
1424 enum lttng_live_viewer_status viewer_status;
1425 struct lttng_live_msg_iter *lttng_live_msg_iter =
1426 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_it);
1427 struct lttng_live_component *lttng_live = lttng_live_msg_iter->lttng_live_comp;
1428 enum lttng_live_iterator_status stream_iter_status;
1429 uint64_t session_idx;
1430
1431 *count = 0;
1432
1433 BT_ASSERT_DBG(lttng_live_msg_iter);
1434
1435 if (G_UNLIKELY(lttng_live_msg_iter->was_interrupted)) {
1436 /*
1437 * The iterator was interrupted in a previous call to the
1438 * `_next()` method. We currently do not support generating
1439 * messages after such event. The babeltrace2 CLI should never
1440 * be running the graph after being interrupted. So this check
1441 * is to prevent other graph users from using this live
1442 * iterator in an messed up internal state.
1443 */
1444 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1445 BT_CPPLOGE_APPEND_CAUSE_SPEC(
1446 lttng_live_msg_iter->logger,
1447 "Message iterator was interrupted during a previous call to the `next()` and currently does not support continuing after such event.");
1448 goto end;
1449 }
1450
1451 /*
1452 * Clear all the invalid message reference that might be left over in
1453 * the output array.
1454 */
1455 memset(msgs, 0, capacity * sizeof(*msgs));
1456
1457 /*
1458 * If no session are exposed on the relay found at the url provided by
1459 * the user, session count will be 0. In this case, we return status
1460 * end to return gracefully.
1461 */
1462 if (lttng_live_msg_iter->sessions->len == 0) {
1463 if (lttng_live->params.sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
1464 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1465 goto end;
1466 } else {
1467 /*
1468 * The are no more active session for this session
1469 * name. Retry to create a viewer session for the
1470 * requested session name.
1471 */
1472 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
1473 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1474 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1475 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1476 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1477 "Error creating LTTng live viewer session");
1478 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1479 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1480 } else {
1481 bt_common_abort();
1482 }
1483 goto end;
1484 }
1485 }
1486 }
1487
1488 if (lttng_live_msg_iter->active_stream_iter == 0) {
1489 lttng_live_force_new_streams_and_metadata(lttng_live_msg_iter);
1490 }
1491
1492 /*
1493 * Here the muxing of message is done.
1494 *
1495 * We need to iterate over all the streams of all the traces of all the
1496 * viewer sessions in order to get the message with the smallest
1497 * timestamp. In this case, a session is a viewer session and there is
1498 * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
1499 * kernel). Each viewer session can have multiple traces, for example,
1500 * 64bit UST viewer sessions could have multiple per-pid traces.
1501 *
1502 * We iterate over the streams of each traces to update and see what is
1503 * their next message's timestamp. From those timestamps, we select the
1504 * message with the smallest timestamp as the best candidate message
1505 * for that trace and do the same thing across all the sessions.
1506 *
1507 * We then compare the timestamp of best candidate message of all the
1508 * sessions to pick the message with the smallest timestamp and we
1509 * return it.
1510 */
1511 while (*count < capacity) {
1512 struct lttng_live_stream_iterator *youngest_stream_iter = NULL,
1513 *candidate_stream_iter = NULL;
1514 int64_t youngest_msg_ts_ns = INT64_MAX;
1515
1516 BT_ASSERT_DBG(lttng_live_msg_iter->sessions);
1517 session_idx = 0;
1518 while (session_idx < lttng_live_msg_iter->sessions->len) {
1519 struct lttng_live_session *session = (lttng_live_session *) g_ptr_array_index(
1520 lttng_live_msg_iter->sessions, session_idx);
1521
1522 /* Find the best candidate message to send downstream. */
1523 stream_iter_status = next_stream_iterator_for_session(lttng_live_msg_iter, session,
1524 &candidate_stream_iter);
1525
1526 /* If we receive an END status, it means that either:
1527 * - Those traces never had active streams (UST with no
1528 * data produced yet),
1529 * - All live stream iterators have ENDed.*/
1530 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1531 if (session->closed && session->traces->len == 0) {
1532 /*
1533 * Remove the session from the list.
1534 * session_idx is not modified since
1535 * g_ptr_array_remove_index_fast
1536 * replaces the the removed element with
1537 * the array's last element.
1538 */
1539 g_ptr_array_remove_index_fast(lttng_live_msg_iter->sessions, session_idx);
1540 } else {
1541 session_idx++;
1542 }
1543 continue;
1544 }
1545
1546 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1547 goto return_status;
1548 }
1549
1550 if (G_UNLIKELY(youngest_stream_iter == NULL) ||
1551 candidate_stream_iter->current_msg_ts_ns < youngest_msg_ts_ns) {
1552 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1553 youngest_stream_iter = candidate_stream_iter;
1554 } else if (candidate_stream_iter->current_msg_ts_ns == youngest_msg_ts_ns) {
1555 /*
1556 * The currently selected message to be sent
1557 * downstream next has the exact same timestamp
1558 * that of the current candidate message. We
1559 * must break the tie in a predictable manner.
1560 */
1561 BT_CPPLOGD_STR_SPEC(
1562 lttng_live_msg_iter->logger,
1563 "Two of the next message candidates have the same timestamps, pick one deterministically.");
1564 /*
1565 * Order the messages in an arbitrary but
1566 * deterministic way.
1567 */
1568 int ret = common_muxing_compare_messages(candidate_stream_iter->current_msg,
1569 youngest_stream_iter->current_msg);
1570 if (ret < 0) {
1571 /*
1572 * The `candidate_stream_iter->current_msg`
1573 * should go first. Update the next
1574 * iterator and the current timestamp.
1575 */
1576 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1577 youngest_stream_iter = candidate_stream_iter;
1578 } else if (ret == 0) {
1579 /* Unable to pick which one should go first. */
1580 BT_CPPLOGW_SPEC(
1581 lttng_live_msg_iter->logger,
1582 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1583 "next-stream-iter-addr={}"
1584 "candidate-stream-iter-addr={}",
1585 fmt::ptr(youngest_stream_iter), fmt::ptr(candidate_stream_iter));
1586 }
1587 }
1588
1589 session_idx++;
1590 }
1591
1592 if (!youngest_stream_iter) {
1593 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1594 goto return_status;
1595 }
1596
1597 BT_ASSERT_DBG(youngest_stream_iter->current_msg);
1598 /* Ensure monotonicity. */
1599 BT_ASSERT_DBG(lttng_live_msg_iter->last_msg_ts_ns <=
1600 youngest_stream_iter->current_msg_ts_ns);
1601
1602 /*
1603 * Insert the next message to the message batch. This will set
1604 * stream iterator current message to NULL so that next time
1605 * we fetch the next message of that stream iterator
1606 */
1607 BT_MESSAGE_MOVE_REF(msgs[*count], youngest_stream_iter->current_msg);
1608 (*count)++;
1609
1610 /* Update the last timestamp in nanoseconds sent downstream. */
1611 lttng_live_msg_iter->last_msg_ts_ns = youngest_msg_ts_ns;
1612 youngest_stream_iter->current_msg_ts_ns = INT64_MAX;
1613
1614 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1615 }
1616
1617 return_status:
1618 switch (stream_iter_status) {
1619 case LTTNG_LIVE_ITERATOR_STATUS_OK:
1620 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
1621 /*
1622 * If we gathered messages, return _OK even if the graph was
1623 * interrupted. This allows for the components downstream to at
1624 * least get the those messages. If the graph was indeed
1625 * interrupted there should not be another _next() call as the
1626 * application will tear down the graph. This component class
1627 * doesn't support restarting after an interruption.
1628 */
1629 if (*count > 0) {
1630 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
1631 } else {
1632 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1633 }
1634 break;
1635 case LTTNG_LIVE_ITERATOR_STATUS_END:
1636 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1637 break;
1638 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
1639 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1640 "Memory error preparing the next batch of messages: "
1641 "live-iter-status={}",
1642 stream_iter_status);
1643 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
1644 break;
1645 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
1646 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
1647 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
1648 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1649 "Error preparing the next batch of messages: "
1650 "live-iter-status={}",
1651 stream_iter_status);
1652
1653 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1654 /* Put all existing messages on error. */
1655 put_messages(msgs, *count);
1656 break;
1657 default:
1658 bt_common_abort();
1659 }
1660
1661 end:
1662 return status;
1663 } catch (const std::bad_alloc&) {
1664 return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
1665 } catch (const bt2::Error&) {
1666 return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1667 }
1668 }
1669
1670 static struct lttng_live_msg_iter *
1671 lttng_live_msg_iter_create(struct lttng_live_component *lttng_live_comp,
1672 bt_self_message_iterator *self_msg_it)
1673 {
1674 lttng_live_msg_iter *msg_iter = new lttng_live_msg_iter {lttng_live_comp->logger};
1675 msg_iter->self_comp = lttng_live_comp->self_comp;
1676 msg_iter->lttng_live_comp = lttng_live_comp;
1677 msg_iter->self_msg_iter = self_msg_it;
1678
1679 msg_iter->active_stream_iter = 0;
1680 msg_iter->last_msg_ts_ns = INT64_MIN;
1681 msg_iter->was_interrupted = false;
1682
1683 msg_iter->sessions =
1684 g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_session);
1685 BT_ASSERT(msg_iter->sessions);
1686
1687 return msg_iter;
1688 }
1689
1690 bt_message_iterator_class_initialize_method_status
1691 lttng_live_msg_iter_init(bt_self_message_iterator *self_msg_it,
1692 bt_self_message_iterator_configuration *, bt_self_component_port_output *)
1693 {
1694 try {
1695 bt_message_iterator_class_initialize_method_status status;
1696 struct lttng_live_component *lttng_live;
1697 struct lttng_live_msg_iter *lttng_live_msg_iter;
1698 enum lttng_live_viewer_status viewer_status;
1699 bt_self_component *self_comp = bt_self_message_iterator_borrow_component(self_msg_it);
1700
1701 lttng_live = (lttng_live_component *) bt_self_component_get_data(self_comp);
1702
1703 /* There can be only one downstream iterator at the same time. */
1704 BT_ASSERT(!lttng_live->has_msg_iter);
1705 lttng_live->has_msg_iter = true;
1706
1707 lttng_live_msg_iter = lttng_live_msg_iter_create(lttng_live, self_msg_it);
1708 if (!lttng_live_msg_iter) {
1709 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live->logger,
1710 "Failed to create lttng_live_msg_iter");
1711 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1712 goto error;
1713 }
1714
1715 viewer_status = live_viewer_connection_create(
1716 lttng_live->params.url->str, false, lttng_live_msg_iter, lttng_live_msg_iter->logger,
1717 &lttng_live_msg_iter->viewer_connection);
1718 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1719 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1720 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1721 "Failed to create viewer connection");
1722 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1723 /*
1724 * Interruption in the _iter_init() method is not
1725 * supported. Return an error.
1726 */
1727 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1728 "Interrupted while creating viewer connection");
1729 }
1730
1731 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1732 goto error;
1733 }
1734
1735 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
1736 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1737 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1738 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1739 "Failed to create viewer session");
1740 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1741 /*
1742 * Interruption in the _iter_init() method is not
1743 * supported. Return an error.
1744 */
1745 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1746 "Interrupted when creating viewer session");
1747 }
1748
1749 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1750 goto error;
1751 }
1752
1753 if (lttng_live_msg_iter->sessions->len == 0) {
1754 switch (lttng_live->params.sess_not_found_act) {
1755 case SESSION_NOT_FOUND_ACTION_CONTINUE:
1756 BT_CPPLOGI_SPEC(
1757 lttng_live_msg_iter->logger,
1758 "Unable to connect to the requested live viewer session. "
1759 "Keep trying to connect because of {}=\"{}\" component parameter: url=\"{}\"",
1760 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR,
1761 lttng_live->params.url->str);
1762 break;
1763 case SESSION_NOT_FOUND_ACTION_FAIL:
1764 BT_CPPLOGE_APPEND_CAUSE_SPEC(
1765 lttng_live_msg_iter->logger,
1766 "Unable to connect to the requested live viewer session. "
1767 "Fail the message iterator initialization because of {}=\"{}\" "
1768 "component parameter: url =\"{}\"",
1769 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_FAIL_STR,
1770 lttng_live->params.url->str);
1771 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1772 goto error;
1773 case SESSION_NOT_FOUND_ACTION_END:
1774 BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger,
1775 "Unable to connect to the requested live viewer session. "
1776 "End gracefully at the first _next() call because of {}=\"{}\""
1777 " component parameter: url=\"{}\"",
1778 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_END_STR,
1779 lttng_live->params.url->str);
1780 break;
1781 default:
1782 bt_common_abort();
1783 }
1784 }
1785
1786 bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter);
1787 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK;
1788 goto end;
1789
1790 error:
1791 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
1792 end:
1793 return status;
1794 } catch (const std::bad_alloc&) {
1795 return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1796 } catch (const bt2::Error&) {
1797 return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1798 }
1799 }
1800
1801 static struct bt_param_validation_map_value_entry_descr list_sessions_params[] = {
1802 {URL_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
1803 bt_param_validation_value_descr::makeString()},
1804 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
1805
1806 static bt_component_class_query_method_status
1807 lttng_live_query_list_sessions(const bt_value *params, const bt_value **result,
1808 const bt2c::Logger& logger)
1809 {
1810 bt_component_class_query_method_status status;
1811 const bt_value *url_value = NULL;
1812 const char *url;
1813 struct live_viewer_connection *viewer_connection = NULL;
1814 enum lttng_live_viewer_status viewer_status;
1815 enum bt_param_validation_status validation_status;
1816 gchar *validate_error = NULL;
1817
1818 validation_status = bt_param_validation_validate(params, list_sessions_params, &validate_error);
1819 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
1820 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1821 goto error;
1822 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
1823 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1824 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "{}", validate_error);
1825 goto error;
1826 }
1827
1828 url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
1829 url = bt_value_string_get(url_value);
1830
1831 viewer_status = live_viewer_connection_create(url, true, NULL, logger, &viewer_connection);
1832 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1833 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1834 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Failed to create viewer connection");
1835 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1836 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1837 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
1838 } else {
1839 bt_common_abort();
1840 }
1841 goto error;
1842 }
1843
1844 status = live_viewer_connection_list_sessions(viewer_connection, result);
1845 if (status != BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK) {
1846 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Failed to list viewer sessions");
1847 goto error;
1848 }
1849
1850 goto end;
1851
1852 error:
1853 BT_VALUE_PUT_REF_AND_RESET(*result);
1854
1855 if (status >= 0) {
1856 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1857 }
1858
1859 end:
1860 if (viewer_connection) {
1861 live_viewer_connection_destroy(viewer_connection);
1862 }
1863
1864 g_free(validate_error);
1865
1866 return status;
1867 }
1868
1869 static bt_component_class_query_method_status
1870 lttng_live_query_support_info(const bt_value *params, const bt_value **result,
1871 const bt2c::Logger& logger)
1872 {
1873 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
1874 const bt_value *input_type_value;
1875 const bt_value *input_value;
1876 double weight = 0;
1877 struct bt_common_lttng_live_url_parts parts = {};
1878
1879 /* Used by the logging macros */
1880 __attribute__((unused)) bt_self_component *self_comp = NULL;
1881
1882 *result = NULL;
1883 input_type_value = bt_value_map_borrow_entry_value_const(params, "type");
1884 if (!input_type_value) {
1885 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Missing expected `type` parameter.");
1886 goto error;
1887 }
1888
1889 if (!bt_value_is_string(input_type_value)) {
1890 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "`type` parameter is not a string value.");
1891 goto error;
1892 }
1893
1894 if (strcmp(bt_value_string_get(input_type_value), "string") != 0) {
1895 /* We don't handle file system paths */
1896 goto create_result;
1897 }
1898
1899 input_value = bt_value_map_borrow_entry_value_const(params, "input");
1900 if (!input_value) {
1901 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Missing expected `input` parameter.");
1902 goto error;
1903 }
1904
1905 if (!bt_value_is_string(input_value)) {
1906 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "`input` parameter is not a string value.");
1907 goto error;
1908 }
1909
1910 parts = bt_common_parse_lttng_live_url(bt_value_string_get(input_value), NULL, 0);
1911 if (parts.session_name) {
1912 /*
1913 * Looks pretty much like an LTTng live URL: we got the
1914 * session name part, which forms a complete URL.
1915 */
1916 weight = .75;
1917 }
1918
1919 create_result:
1920 *result = bt_value_real_create_init(weight);
1921 if (!*result) {
1922 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1923 goto error;
1924 }
1925
1926 goto end;
1927
1928 error:
1929 if (status >= 0) {
1930 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1931 }
1932
1933 BT_ASSERT(!*result);
1934
1935 end:
1936 bt_common_destroy_lttng_live_url_parts(&parts);
1937 return status;
1938 }
1939
1940 bt_component_class_query_method_status lttng_live_query(bt_self_component_class_source *comp_class,
1941 bt_private_query_executor *priv_query_exec,
1942 const char *object, const bt_value *params,
1943 __attribute__((unused)) void *method_data,
1944 const bt_value **result)
1945 {
1946 try {
1947 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
1948 bt2c::Logger logger {bt2::SelfComponentClass {comp_class},
1949 bt2::PrivateQueryExecutor {priv_query_exec},
1950 "PLUGIN/SRC.CTF.LTTNG-LIVE/QUERY"};
1951
1952 if (strcmp(object, "sessions") == 0) {
1953 status = lttng_live_query_list_sessions(params, result, logger);
1954 } else if (strcmp(object, "babeltrace.support-info") == 0) {
1955 status = lttng_live_query_support_info(params, result, logger);
1956 } else {
1957 BT_CPPLOGI_SPEC(logger, "Unknown query object `{}`", object);
1958 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT;
1959 goto end;
1960 }
1961
1962 end:
1963 return status;
1964 } catch (const std::bad_alloc&) {
1965 return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1966 } catch (const bt2::Error&) {
1967 return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1968 }
1969 }
1970
1971 static void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
1972 {
1973 if (!lttng_live) {
1974 return;
1975 }
1976
1977 if (lttng_live->params.url) {
1978 g_string_free(lttng_live->params.url, TRUE);
1979 }
1980
1981 delete lttng_live;
1982 }
1983
1984 void lttng_live_component_finalize(bt_self_component_source *component)
1985 {
1986 lttng_live_component *data = (lttng_live_component *) bt_self_component_get_data(
1987 bt_self_component_source_as_self_component(component));
1988
1989 if (!data) {
1990 return;
1991 }
1992 lttng_live_component_destroy_data(data);
1993 }
1994
1995 static enum session_not_found_action
1996 parse_session_not_found_action_param(const bt_value *no_session_param)
1997 {
1998 enum session_not_found_action action;
1999 const char *no_session_act_str = bt_value_string_get(no_session_param);
2000
2001 if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_CONTINUE_STR) == 0) {
2002 action = SESSION_NOT_FOUND_ACTION_CONTINUE;
2003 } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_FAIL_STR) == 0) {
2004 action = SESSION_NOT_FOUND_ACTION_FAIL;
2005 } else {
2006 BT_ASSERT(strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0);
2007 action = SESSION_NOT_FOUND_ACTION_END;
2008 }
2009
2010 return action;
2011 }
2012
2013 static bt_param_validation_value_descr inputs_elem_descr =
2014 bt_param_validation_value_descr::makeString();
2015
2016 static const char *sess_not_found_action_choices[] = {
2017 SESS_NOT_FOUND_ACTION_CONTINUE_STR,
2018 SESS_NOT_FOUND_ACTION_FAIL_STR,
2019 SESS_NOT_FOUND_ACTION_END_STR,
2020 };
2021
2022 static struct bt_param_validation_map_value_entry_descr params_descr[] = {
2023 {INPUTS_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
2024 bt_param_validation_value_descr::makeArray(1, 1, inputs_elem_descr)},
2025 {SESS_NOT_FOUND_ACTION_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
2026 bt_param_validation_value_descr::makeString(sess_not_found_action_choices)},
2027 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
2028
2029 static bt_component_class_initialize_method_status
2030 lttng_live_component_create(const bt_value *params, bt_self_component_source *self_comp,
2031 struct lttng_live_component **component)
2032 {
2033 struct lttng_live_component *lttng_live = NULL;
2034 const bt_value *inputs_value;
2035 const bt_value *url_value;
2036 const bt_value *value;
2037 const char *url;
2038 enum bt_param_validation_status validation_status;
2039 gchar *validation_error = NULL;
2040 bt_component_class_initialize_method_status status;
2041 bt2c::Logger logger {bt2::SelfSourceComponent {self_comp}, "PLUGIN/SRC.CTF.LTTNG-LIVE/COMP"};
2042
2043 validation_status = bt_param_validation_validate(params, params_descr, &validation_error);
2044 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
2045 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2046 goto error;
2047 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
2048 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "{}", validation_error);
2049 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
2050 goto error;
2051 }
2052
2053 lttng_live = new lttng_live_component {std::move(logger)};
2054 lttng_live->self_comp = bt_self_component_source_as_self_component(self_comp);
2055 lttng_live->max_query_size = MAX_QUERY_SIZE;
2056 lttng_live->has_msg_iter = false;
2057
2058 inputs_value = bt_value_map_borrow_entry_value_const(params, INPUTS_PARAM);
2059 url_value = bt_value_array_borrow_element_by_index_const(inputs_value, 0);
2060 url = bt_value_string_get(url_value);
2061
2062 lttng_live->params.url = g_string_new(url);
2063 if (!lttng_live->params.url) {
2064 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2065 goto error;
2066 }
2067
2068 value = bt_value_map_borrow_entry_value_const(params, SESS_NOT_FOUND_ACTION_PARAM);
2069 if (value) {
2070 lttng_live->params.sess_not_found_act = parse_session_not_found_action_param(value);
2071 } else {
2072 BT_CPPLOGI_SPEC(lttng_live->logger,
2073 "Optional `{}` parameter is missing: defaulting to `{}`.",
2074 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR);
2075 lttng_live->params.sess_not_found_act = SESSION_NOT_FOUND_ACTION_CONTINUE;
2076 }
2077
2078 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
2079 goto end;
2080
2081 error:
2082 lttng_live_component_destroy_data(lttng_live);
2083 lttng_live = NULL;
2084 end:
2085 g_free(validation_error);
2086
2087 *component = lttng_live;
2088 return status;
2089 }
2090
2091 bt_component_class_initialize_method_status
2092 lttng_live_component_init(bt_self_component_source *self_comp_src,
2093 bt_self_component_source_configuration *, const bt_value *params, void *)
2094 {
2095 try {
2096 struct lttng_live_component *lttng_live;
2097 bt_component_class_initialize_method_status ret;
2098 bt_self_component *self_comp = bt_self_component_source_as_self_component(self_comp_src);
2099 bt_self_component_add_port_status add_port_status;
2100
2101 ret = lttng_live_component_create(params, self_comp_src, &lttng_live);
2102 if (ret != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
2103 goto error;
2104 }
2105
2106 add_port_status =
2107 bt_self_component_source_add_output_port(self_comp_src, "out", NULL, NULL);
2108 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
2109 ret = (bt_component_class_initialize_method_status) add_port_status;
2110 goto end;
2111 }
2112
2113 bt_self_component_set_data(self_comp, lttng_live);
2114 goto end;
2115
2116 error:
2117 lttng_live_component_destroy_data(lttng_live);
2118 lttng_live = NULL;
2119 end:
2120 return ret;
2121 } catch (const std::bad_alloc&) {
2122 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2123 } catch (const bt2::Error&) {
2124 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
2125 }
2126 }
This page took 0.070317 seconds and 3 git commands to generate.