85955bc7af178da62640250e322eb2189d801b77
[babeltrace.git] / src / plugins / ctf / lttng-live / lttng-live.cpp
1 /*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
5 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
6 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
7 *
8 * Babeltrace CTF LTTng-live Client Component
9 */
10
11 #include <glib.h>
12 #include <unistd.h>
13
14 #include "common/assert.h"
15 #include "cpp-common/bt2c/fmt.hpp"
16 #include "cpp-common/bt2c/glib-up.hpp"
17 #include "cpp-common/bt2c/vector.hpp"
18 #include "cpp-common/bt2s/make-unique.hpp"
19 #include "cpp-common/vendor/fmt/format.h"
20
21 #include "plugins/common/muxing/muxing.h"
22 #include "plugins/common/param-validation/param-validation.h"
23
24 #include "data-stream.hpp"
25 #include "lttng-live.hpp"
26 #include "metadata.hpp"
27
/* Size limit of 256 KiB. NOTE(review): presumably caps a query buffer; the use site is not in this section. */
28 #define MAX_QUERY_SIZE (256 * 1024)
/* Parameter name strings. NOTE(review): presumably the component/query parameter keys; confirm at the use sites. */
29 #define URL_PARAM "url"
30 #define INPUTS_PARAM "inputs"
31 #define SESS_NOT_FOUND_ACTION_PARAM "session-not-found-action"
/* Judging by the names, the accepted values of the `session-not-found-action` parameter (TODO confirm). */
32 #define SESS_NOT_FOUND_ACTION_CONTINUE_STR "continue"
33 #define SESS_NOT_FOUND_ACTION_FAIL_STR "fail"
34 #define SESS_NOT_FOUND_ACTION_END_STR "end"
35
36 void lttng_live_stream_iterator_set_state(struct lttng_live_stream_iterator *stream_iter,
37 enum lttng_live_stream_state new_state)
38 {
39 BT_CPPLOGD_SPEC(stream_iter->logger,
40 "Setting live stream iterator state: viewer-stream-id={}, "
41 "old-state={}, new-state={}",
42 stream_iter->viewer_stream_id, stream_iter->state, new_state);
43
44 stream_iter->state = new_state;
45 }
46
/*
 * Logs, at the debug level, the state and the inactivity timestamps
 * (last one sent and current one) of the live stream iterator
 * `live_stream_iter`.
 *
 * The expansion deliberately does NOT end with a semicolon: the caller
 * provides it (`LTTNG_LIVE_LOGD_STREAM_ITER(iter);`). This keeps the
 * macro usable as a single statement, including as the body of an
 * unbraced `if` followed by an `else`.
 */
#define LTTNG_LIVE_LOGD_STREAM_ITER(live_stream_iter)                                              \
    do {                                                                                           \
        BT_CPPLOGD_SPEC((live_stream_iter)->logger,                                                \
                        "Live stream iterator state={}, "                                          \
                        "last-inact-ts-is-set={}, last-inact-ts-value={}, "                        \
                        "curr-inact-ts={}",                                                        \
                        (live_stream_iter)->state, (live_stream_iter)->last_inactivity_ts.is_set,  \
                        (live_stream_iter)->last_inactivity_ts.value,                              \
                        (live_stream_iter)->current_inactivity_ts);                                \
    } while (0)
57
58 bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter)
59 {
60 bool ret;
61
62 if (!msg_iter) {
63 ret = false;
64 goto end;
65 }
66
67 ret = bt_self_message_iterator_is_interrupted(msg_iter->self_msg_iter);
68
69 end:
70 return ret;
71 }
72
73 static struct lttng_live_trace *
74 lttng_live_session_borrow_trace_by_id(struct lttng_live_session *session, uint64_t trace_id)
75 {
76 uint64_t trace_idx;
77 struct lttng_live_trace *ret_trace = NULL;
78
79 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
80 struct lttng_live_trace *trace =
81 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
82 if (trace->id == trace_id) {
83 ret_trace = trace;
84 goto end;
85 }
86 }
87
88 end:
89 return ret_trace;
90 }
91
92 static void lttng_live_destroy_trace(struct lttng_live_trace *trace)
93 {
94 BT_CPPLOGD_SPEC(trace->logger, "Destroying live trace: trace-id={}", trace->id);
95
96 delete trace;
97 }
98
99 static struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
100 uint64_t trace_id)
101 {
102 BT_CPPLOGD_SPEC(session->logger, "Creating live trace: session-id={}, trace-id={}", session->id,
103 trace_id);
104
105 lttng_live_trace *trace = new lttng_live_trace {session->logger};
106 trace->session = session;
107 trace->id = trace_id;
108 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
109 g_ptr_array_add(session->traces, trace);
110
111 return trace;
112 }
113
114 struct lttng_live_trace *
115 lttng_live_session_borrow_or_create_trace_by_id(struct lttng_live_session *session,
116 uint64_t trace_id)
117 {
118 struct lttng_live_trace *trace;
119
120 trace = lttng_live_session_borrow_trace_by_id(session, trace_id);
121 if (trace) {
122 goto end;
123 }
124
125 /* The session is the owner of the newly created trace. */
126 trace = lttng_live_create_trace(session, trace_id);
127
128 end:
129 return trace;
130 }
131
132 int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, uint64_t session_id,
133 const char *hostname, const char *session_name)
134 {
135 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
136 "Adding live session: "
137 "session-id={}, hostname=\"{}\", session-name=\"{}\"",
138 session_id, hostname, session_name);
139
140 lttng_live_session *session = new lttng_live_session {lttng_live_msg_iter->logger};
141 session->self_comp = lttng_live_msg_iter->self_comp;
142 session->id = session_id;
143 session->traces = g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_trace);
144 BT_ASSERT(session->traces);
145 session->lttng_live_msg_iter = lttng_live_msg_iter;
146 session->new_streams_needed = true;
147 session->hostname = g_string_new(hostname);
148 BT_ASSERT(session->hostname);
149
150 session->session_name = g_string_new(session_name);
151 BT_ASSERT(session->session_name);
152
153 g_ptr_array_add(lttng_live_msg_iter->sessions, session);
154
155 return 0;
156 }
157
158 static void lttng_live_destroy_session(struct lttng_live_session *session)
159 {
160 if (!session) {
161 goto end;
162 }
163
164 BT_CPPLOGD_SPEC(session->logger,
165 "Destroying live session: "
166 "session-id={}, session-name=\"{}\"",
167 session->id, session->session_name->str);
168 if (session->id != -1ULL) {
169 if (lttng_live_session_detach(session)) {
170 if (!lttng_live_graph_is_canceled(session->lttng_live_msg_iter)) {
171 /* Old relayd cannot detach sessions. */
172 BT_CPPLOGD_SPEC(session->logger, "Unable to detach lttng live session {}",
173 session->id);
174 }
175 }
176 session->id = -1ULL;
177 }
178
179 if (session->traces) {
180 g_ptr_array_free(session->traces, TRUE);
181 }
182
183 if (session->hostname) {
184 g_string_free(session->hostname, TRUE);
185 }
186
187 if (session->session_name) {
188 g_string_free(session->session_name, TRUE);
189 }
190
191 delete session;
192
193 end:
194 return;
195 }
196
197 static void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter)
198 {
199 if (!lttng_live_msg_iter) {
200 goto end;
201 }
202
203 if (lttng_live_msg_iter->sessions) {
204 g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE);
205 }
206
207 if (lttng_live_msg_iter->viewer_connection) {
208 live_viewer_connection_destroy(lttng_live_msg_iter->viewer_connection);
209 }
210 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp);
211 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter);
212
213 /* All stream iterators must be destroyed at this point. */
214 BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0);
215 lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false;
216
217 delete lttng_live_msg_iter;
218
219 end:
220 return;
221 }
222
223 void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
224 {
225 struct lttng_live_msg_iter *lttng_live_msg_iter;
226
227 BT_ASSERT(self_msg_iter);
228
229 lttng_live_msg_iter =
230 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_iter);
231 BT_ASSERT(lttng_live_msg_iter);
232 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
233 }
234
235 static enum lttng_live_iterator_status
236 lttng_live_iterator_next_check_stream_state(struct lttng_live_stream_iterator *lttng_live_stream)
237 {
238 switch (lttng_live_stream->state) {
239 case LTTNG_LIVE_STREAM_QUIESCENT:
240 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
241 break;
242 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
243 /* Invalid state. */
244 BT_CPPLOGF_SPEC(lttng_live_stream->logger, "Unexpected stream state \"ACTIVE_NO_DATA\"");
245 bt_common_abort();
246 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
247 /* Invalid state. */
248 BT_CPPLOGF_SPEC(lttng_live_stream->logger, "Unexpected stream state \"QUIESCENT_NO_DATA\"");
249 bt_common_abort();
250 case LTTNG_LIVE_STREAM_EOF:
251 break;
252 }
253 return LTTNG_LIVE_ITERATOR_STATUS_OK;
254 }
255
256 /*
257 * For active no data stream, fetch next index. As a result of that it can
258 * become either:
259 * - quiescent: won't have events for a bit,
260 * - have data: need to get that data and produce the event,
261 * - have no data on this stream at this point: need to retry (AGAIN) or return
262 * EOF.
263 */
264 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
265 struct lttng_live_msg_iter *lttng_live_msg_iter,
266 struct lttng_live_stream_iterator *lttng_live_stream)
267 {
268 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
269 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
270 struct packet_index index;
271
272 if (lttng_live_stream->trace->metadata_stream_state ==
273 LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
274 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
275 "Need to get an update for the metadata stream before proceeding "
276 "further with this stream: stream-name=\"{}\"",
277 lttng_live_stream->name);
278 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
279 goto end;
280 }
281
282 if (lttng_live_stream->trace->session->new_streams_needed) {
283 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
284 "Need to get an update of all streams before proceeding further "
285 "with this stream: stream-name=\"{}\"",
286 lttng_live_stream->name);
287 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
288 goto end;
289 }
290
291 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA &&
292 lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
293 goto end;
294 }
295 ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream, &index);
296 if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
297 goto end;
298 }
299
300 BT_ASSERT_DBG(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
301
302 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
303 uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts.value,
304 curr_inact_ts = lttng_live_stream->current_inactivity_ts;
305
306 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA && last_inact_ts == curr_inact_ts) {
307 /*
308 * Because the stream is in the QUIESCENT_NO_DATA
309 * state, we can assert that the last_inactivity_ts was
310 * set and can be safely used in the `if` above.
311 */
312 BT_ASSERT(lttng_live_stream->last_inactivity_ts.is_set);
313
314 ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
315 LTTNG_LIVE_LOGD_STREAM_ITER(lttng_live_stream);
316 } else {
317 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
318 }
319 goto end;
320 }
321
322 lttng_live_stream->base_offset = index.offset;
323 lttng_live_stream->offset = index.offset;
324 lttng_live_stream->len = index.packet_size / CHAR_BIT;
325
326 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
327 "Setting live stream reading info: stream-name=\"{}\", "
328 "viewer-stream-id={}, stream-base-offset={}, stream-offset={}, stream-len={}",
329 lttng_live_stream->name, lttng_live_stream->viewer_stream_id,
330 lttng_live_stream->base_offset, lttng_live_stream->offset,
331 lttng_live_stream->len);
332
333 end:
334 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
335 ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream);
336 }
337 return ret;
338 }
339
340 /*
341 * Creation of the message requires the ctf trace class to be created
342 * beforehand, but the live protocol gives us all streams (including metadata)
343 * at once. So we split it in three steps: getting streams, getting metadata
344 * (which creates the ctf trace class), and then creating the per-stream
345 * messages.
346 */
347 static enum lttng_live_iterator_status
348 lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
349 struct lttng_live_session *session)
350 {
351 enum lttng_live_iterator_status status;
352 uint64_t trace_idx;
353
354 if (!session->attached) {
355 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Attach to session: session-id={}",
356 session->id);
357 enum lttng_live_viewer_status attach_status =
358 lttng_live_session_attach(session, lttng_live_msg_iter->self_msg_iter);
359 if (attach_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
360 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
361 /*
362 * Clear any causes appended in
363 * `lttng_live_attach_session()` as we want to
364 * return gracefully since the graph was
365 * cancelled.
366 */
367 bt_current_thread_clear_error();
368 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
369 } else {
370 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
371 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
372 "Error attaching to LTTng live session");
373 }
374 goto end;
375 }
376 }
377
378 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
379 "Updating all data streams: "
380 "session-id={}, session-name=\"{}\"",
381 session->id, session->session_name->str);
382
383 status = lttng_live_session_get_new_streams(session, lttng_live_msg_iter->self_msg_iter);
384 switch (status) {
385 case LTTNG_LIVE_ITERATOR_STATUS_OK:
386 break;
387 case LTTNG_LIVE_ITERATOR_STATUS_END:
388 /*
389 * We received a `_END` from the `_get_new_streams()` function,
390 * which means no more data will ever be received from the data
391 * streams of this session. But it's possible that the metadata
392 * is incomplete.
393 * The live protocol guarantees that we receive all the
394 * metadata needed before we receive data streams needing it.
395 * But it's possible to receive metadata NOT needed by
396 * data streams after the session was closed. For example, this
397 * could happen if a new event is registered and the session is
398 * stopped before any tracepoint for that event is actually
399 * fired.
400 */
401 BT_CPPLOGD_SPEC(
402 lttng_live_msg_iter->logger,
403 "Updating streams returned _END status. Override status to _OK in order fetch any remaining metadata:"
404 "session-id={}, session-name=\"{}\"",
405 session->id, session->session_name->str);
406 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
407 break;
408 default:
409 goto end;
410 }
411
412 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
413 "Updating metadata stream for session: "
414 "session-id={}, session-name=\"{}\"",
415 session->id, session->session_name->str);
416
417 trace_idx = 0;
418 while (trace_idx < session->traces->len) {
419 struct lttng_live_trace *trace =
420 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
421
422 status = lttng_live_metadata_update(trace);
423 switch (status) {
424 case LTTNG_LIVE_ITERATOR_STATUS_END:
425 case LTTNG_LIVE_ITERATOR_STATUS_OK:
426 trace_idx++;
427 break;
428 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
429 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
430 goto end;
431 default:
432 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
433 "Error updating trace metadata: "
434 "stream-iter-status={}, trace-id={}",
435 status, trace->id);
436 goto end;
437 }
438 }
439
440 /*
441 * Now that we have the metadata we can initialize the downstream
442 * iterator.
443 */
444 status = lttng_live_lazy_msg_init(session, lttng_live_msg_iter->self_msg_iter);
445
446 end:
447 return status;
448 }
449
450 static void
451 lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
452 {
453 uint64_t session_idx, trace_idx;
454
455 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
456 struct lttng_live_session *session =
457 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
458 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
459 "Force marking session as needing new streams: "
460 "session-id={}",
461 session->id);
462 session->new_streams_needed = true;
463 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
464 struct lttng_live_trace *trace =
465 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
466 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
467 "Force marking trace metadata state as needing an update: "
468 "session-id={}, trace-id={}",
469 session->id, trace->id);
470
471 BT_ASSERT(trace->metadata_stream_state != LTTNG_LIVE_METADATA_STREAM_STATE_CLOSED);
472
473 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
474 }
475 }
476 }
477
478 static enum lttng_live_iterator_status
479 lttng_live_iterator_handle_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
480 {
481 enum lttng_live_iterator_status status;
482 enum lttng_live_viewer_status viewer_status;
483 uint64_t session_idx = 0, nr_sessions_opened = 0;
484 struct lttng_live_session *session;
485 enum session_not_found_action sess_not_found_act =
486 lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
487
488 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
489 "Update data and metadata of all sessions: "
490 "live-msg-iter-addr={}",
491 fmt::ptr(lttng_live_msg_iter));
492 /*
493 * In a remotely distant future, we could add a "new
494 * session" flag to the protocol, which would tell us that we
495 * need to query for new sessions even though we have sessions
496 * currently ongoing.
497 */
498 if (lttng_live_msg_iter->sessions->len == 0) {
499 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
500 BT_CPPLOGD_SPEC(
501 lttng_live_msg_iter->logger,
502 "No session found. Exiting in accordance with the `session-not-found-action` parameter");
503 status = LTTNG_LIVE_ITERATOR_STATUS_END;
504 goto end;
505 } else {
506 BT_CPPLOGD_SPEC(
507 lttng_live_msg_iter->logger,
508 "No session found. Try creating a new one in accordance with the `session-not-found-action` parameter");
509 /*
510 * Retry to create a viewer session for the requested
511 * session name.
512 */
513 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
514 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
515 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
516 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
517 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
518 "Error creating LTTng live viewer session");
519 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
520 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
521 } else {
522 bt_common_abort();
523 }
524 goto end;
525 }
526 }
527 }
528
529 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
530 session =
531 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
532 status = lttng_live_get_session(lttng_live_msg_iter, session);
533 switch (status) {
534 case LTTNG_LIVE_ITERATOR_STATUS_OK:
535 case LTTNG_LIVE_ITERATOR_STATUS_END:
536 /*
537 * A session returned `_END`. Other sessions may still
538 * be active so we override the status and continue
539 * looping if needed.
540 */
541 break;
542 default:
543 goto end;
544 }
545 if (!session->closed) {
546 nr_sessions_opened++;
547 }
548 }
549
550 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE && nr_sessions_opened == 0) {
551 status = LTTNG_LIVE_ITERATOR_STATUS_END;
552 } else {
553 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
554 }
555
556 end:
557 return status;
558 }
559
560 static enum lttng_live_iterator_status
561 emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
562 struct lttng_live_stream_iterator *stream_iter,
563 bt2::ConstMessage::Shared& message, uint64_t timestamp)
564 {
565 BT_ASSERT(stream_iter->trace->clock_class);
566 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
567 "Emitting inactivity message for stream: ctf-stream-id={}, "
568 "viewer-stream-id={}, timestamp={}",
569 stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id,
570 timestamp);
571
572 const auto msg = bt_message_message_iterator_inactivity_create(
573 lttng_live_msg_iter->self_msg_iter, stream_iter->trace->clock_class, timestamp);
574
575 if (!msg) {
576 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
577 "Error emitting message iterator inactivity message");
578 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
579 }
580
581 message = bt2::ConstMessage::Shared::createWithoutRef(msg);
582 return LTTNG_LIVE_ITERATOR_STATUS_OK;
583 }
584
585 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
586 struct lttng_live_msg_iter *lttng_live_msg_iter,
587 struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message)
588 {
589 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
590
591 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
592 return LTTNG_LIVE_ITERATOR_STATUS_OK;
593 }
594
595 /*
596 * Check if we already sent an inactivty message downstream for this
597 * `current_inactivity_ts` value.
598 */
599 if (lttng_live_stream->last_inactivity_ts.is_set &&
600 lttng_live_stream->current_inactivity_ts == lttng_live_stream->last_inactivity_ts.value) {
601 lttng_live_stream_iterator_set_state(lttng_live_stream,
602 LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA);
603
604 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
605 goto end;
606 }
607
608 ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream, message,
609 lttng_live_stream->current_inactivity_ts);
610
611 lttng_live_stream->last_inactivity_ts.value = lttng_live_stream->current_inactivity_ts;
612 lttng_live_stream->last_inactivity_ts.is_set = true;
613 end:
614 return ret;
615 }
616
617 static int live_get_msg_ts_ns(struct lttng_live_msg_iter *lttng_live_msg_iter,
618 const bt_message *msg, int64_t last_msg_ts_ns, int64_t *ts_ns)
619 {
620 const bt_clock_snapshot *clock_snapshot = NULL;
621 int ret = 0;
622
623 BT_ASSERT_DBG(msg);
624 BT_ASSERT_DBG(ts_ns);
625
626 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
627 "Getting message's timestamp: iter-data-addr={}, msg-addr={}, "
628 "last-msg-ts={}",
629 fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns);
630
631 switch (bt_message_get_type(msg)) {
632 case BT_MESSAGE_TYPE_EVENT:
633 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(msg);
634 break;
635 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
636 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg);
637 break;
638 case BT_MESSAGE_TYPE_PACKET_END:
639 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(msg);
640 break;
641 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
642 clock_snapshot =
643 bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg);
644 break;
645 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
646 clock_snapshot =
647 bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg);
648 break;
649 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
650 clock_snapshot = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const(msg);
651 break;
652 default:
653 /* All the other messages have a higher priority */
654 BT_CPPLOGD_STR_SPEC(lttng_live_msg_iter->logger,
655 "Message has no timestamp: using the last message timestamp.");
656 *ts_ns = last_msg_ts_ns;
657 goto end;
658 }
659
660 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
661 if (ret) {
662 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
663 "Cannot get nanoseconds from Epoch of clock snapshot: "
664 "clock-snapshot-addr={}",
665 fmt::ptr(clock_snapshot));
666 goto error;
667 }
668
669 goto end;
670
671 error:
672 ret = -1;
673
674 end:
675 if (ret == 0) {
676 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
677 "Found message's timestamp: iter-data-addr={}, msg-addr={}, "
678 "last-msg-ts={}, ts={}",
679 fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns, *ts_ns);
680 }
681
682 return ret;
683 }
684
685 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
686 struct lttng_live_msg_iter *lttng_live_msg_iter,
687 struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message)
688 {
689 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
690 enum ctf_msg_iter_status status;
691 uint64_t session_idx, trace_idx;
692
693 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
694 struct lttng_live_session *session =
695 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
696
697 if (session->new_streams_needed) {
698 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
699 "Need an update for streams: "
700 "session-id={}",
701 session->id);
702 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
703 goto end;
704 }
705 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
706 struct lttng_live_trace *trace =
707 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
708 if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
709 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
710 "Need an update for metadata stream: "
711 "session-id={}, trace-id={}",
712 session->id, trace->id);
713 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
714 goto end;
715 }
716 }
717 }
718
719 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
720 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
721 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
722 "Invalid state of live stream iterator"
723 "stream-iter-status={}",
724 lttng_live_stream->state);
725 goto end;
726 }
727
728 const bt_message *msg;
729 status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), &msg);
730 switch (status) {
731 case CTF_MSG_ITER_STATUS_EOF:
732 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
733 break;
734 case CTF_MSG_ITER_STATUS_OK:
735 message = bt2::ConstMessage::Shared::createWithoutRef(msg);
736 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
737 break;
738 case CTF_MSG_ITER_STATUS_AGAIN:
739 /*
740 * Continue immediately (end of packet). The next
741 * get_index may return AGAIN to delay the following
742 * attempt.
743 */
744 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
745 break;
746 case CTF_MSG_ITER_STATUS_ERROR:
747 default:
748 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
749 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
750 "CTF message iterator failed to get next message: "
751 "msg-iter={}, msg-iter-status={}",
752 fmt::ptr(lttng_live_stream->msg_iter), status);
753 break;
754 }
755
756 end:
757 return ret;
758 }
759
760 static enum lttng_live_iterator_status
761 lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
762 struct lttng_live_stream_iterator *stream_iter,
763 bt2::ConstMessage::Shared& curr_msg)
764 {
765 enum lttng_live_iterator_status live_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
766
767 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
768 "Closing live stream iterator: stream-name=\"{}\", "
769 "viewer-stream-id={}",
770 stream_iter->name, stream_iter->viewer_stream_id);
771
772 /*
773 * The viewer has hung up on us so we are closing the stream. The
774 * `ctf_msg_iter` should simply realize that it needs to close the
775 * stream properly by emitting the necessary stream end message.
776 */
777 const bt_message *msg;
778 enum ctf_msg_iter_status status =
779 ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), &msg);
780
781 if (status == CTF_MSG_ITER_STATUS_ERROR) {
782 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
783 "Error getting the next message from CTF message iterator");
784 live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
785 goto end;
786 } else if (status == CTF_MSG_ITER_STATUS_EOF) {
787 BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger,
788 "Reached the end of the live stream iterator.");
789 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
790 goto end;
791 }
792
793 BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
794
795 curr_msg = bt2::ConstMessage::Shared::createWithoutRef(msg);
796
797 end:
798 return live_status;
799 }
800
801 /*
802 * helper function:
803 * handle_no_data_streams()
804 * retry:
805 * - for each ACTIVE_NO_DATA stream:
806 * - query relayd for stream data, or quiescence info.
807 * - if need metadata, get metadata, goto retry.
808 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
809 * - if quiescent, move to QUIESCENT streams
810 * - if fetched data, move to ACTIVE_DATA streams
811 * (at this point each stream either has data, or is quiescent)
812 *
813 *
814 * iterator_next:
815 * handle_new_streams_and_metadata()
816 * - query relayd for known streams, add them as ACTIVE_NO_DATA
817 * - query relayd for metadata
818 *
819 * call handle_active_no_data_streams()
820 *
821 * handle_quiescent_streams()
822 * - if at least one stream is ACTIVE_DATA:
823 * - peek stream event with lowest timestamp -> next_ts
824 * - for each quiescent stream
825 * - if next_ts >= quiescent end
826 * - set state to ACTIVE_NO_DATA
827 * - else
828 * - for each quiescent stream
829 * - set state to ACTIVE_NO_DATA
830 *
831 * call handle_active_no_data_streams()
832 *
833 * handle_active_data_streams()
834 * - if at least one stream is ACTIVE_DATA:
835 * - get stream event with lowest timestamp from heap
836 * - make that stream event the current message.
837 * - move this stream heap position to its next event
838 * - if we need to fetch data from relayd, move
839 * stream to ACTIVE_NO_DATA.
840 * - return OK
841 * - return AGAIN
842 *
843 * end criterion: ctrl-c on client. If relayd exits or the session
844 * closes on the relay daemon side, we keep on waiting for streams.
845 * Eventually handle --end timestamp (also an end criterion).
846 *
847 * When disconnected from relayd: try to re-connect endlessly.
848 */
849 static enum lttng_live_iterator_status
850 lttng_live_iterator_next_msg_on_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
851 struct lttng_live_stream_iterator *stream_iter,
852 bt2::ConstMessage::Shared& curr_msg)
853 {
854 enum lttng_live_iterator_status live_status;
855
856 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
857 "Advancing live stream iterator until next message if possible: "
858 "stream-name=\"{}\", viewer-stream-id={}",
859 stream_iter->name, stream_iter->viewer_stream_id);
860
861 if (stream_iter->has_stream_hung_up) {
862 /*
863 * The stream has hung up and the stream was properly closed
864 * during the last call to the current function. Return _END
865 * status now so that this stream iterator is removed for the
866 * stream iterator list.
867 */
868 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
869 goto end;
870 }
871
872 retry:
873 LTTNG_LIVE_LOGD_STREAM_ITER(stream_iter);
874
875 /*
876 * Make sure we have the most recent metadata and possibly some new
877 * streams.
878 */
879 live_status = lttng_live_iterator_handle_new_streams_and_metadata(lttng_live_msg_iter);
880 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
881 goto end;
882 }
883
884 live_status =
885 lttng_live_iterator_next_handle_one_no_data_stream(lttng_live_msg_iter, stream_iter);
886 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
887 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
888 /*
889 * We overwrite `live_status` since `curr_msg` is
890 * likely set to a valid message in this function.
891 */
892 live_status =
893 lttng_live_iterator_close_stream(lttng_live_msg_iter, stream_iter, curr_msg);
894 }
895 goto end;
896 }
897
898 live_status = lttng_live_iterator_next_handle_one_quiescent_stream(lttng_live_msg_iter,
899 stream_iter, curr_msg);
900 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
901 BT_ASSERT(!curr_msg);
902 goto end;
903 }
904 if (curr_msg) {
905 goto end;
906 }
907 live_status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live_msg_iter,
908 stream_iter, curr_msg);
909 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
910 BT_ASSERT(!curr_msg);
911 }
912
913 end:
914 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) {
915 BT_CPPLOGD_SPEC(
916 lttng_live_msg_iter->logger,
917 "Ask the relay daemon for an updated view of the data and metadata streams");
918 goto retry;
919 }
920
921 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
922 "Returning from advancing live stream iterator: status={}, "
923 "stream-name=\"{}\", viewer-stream-id={}",
924 live_status, stream_iter->name, stream_iter->viewer_stream_id);
925
926 return live_status;
927 }
928
929 static bool is_discarded_packet_or_event_message(const bt2::ConstMessage msg)
930 {
931 return msg.type() == bt2::MessageType::DiscardedEvents ||
932 msg.type() == bt2::MessageType::DiscardedPackets;
933 }
934
935 static enum lttng_live_iterator_status
936 adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream *stream,
937 const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out,
938 uint64_t new_begin_ts)
939 {
940 enum bt_property_availability availability;
941 const bt_clock_snapshot *clock_snapshot;
942 uint64_t end_ts;
943 uint64_t count;
944
945 clock_snapshot = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg_in);
946 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
947
948 availability = bt_message_discarded_packets_get_count(msg_in, &count);
949 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
950
951 const auto msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
952 iter, stream, new_begin_ts, end_ts);
953
954 if (!msg) {
955 return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
956 }
957
958 bt_message_discarded_packets_set_count(msg, count);
959 msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
960 return LTTNG_LIVE_ITERATOR_STATUS_OK;
961 }
962
963 static enum lttng_live_iterator_status
964 adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream *stream,
965 const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out,
966 uint64_t new_begin_ts)
967 {
968 enum bt_property_availability availability;
969 const bt_clock_snapshot *clock_snapshot;
970 uint64_t end_ts;
971 uint64_t count;
972
973 clock_snapshot = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg_in);
974 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
975
976 availability = bt_message_discarded_events_get_count(msg_in, &count);
977 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
978
979 const auto msg = bt_message_discarded_events_create_with_default_clock_snapshots(
980 iter, stream, new_begin_ts, end_ts);
981
982 if (!msg) {
983 return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
984 }
985
986 bt_message_discarded_events_set_count(msg, count);
987 msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
988 return LTTNG_LIVE_ITERATOR_STATUS_OK;
989 }
990
991 static enum lttng_live_iterator_status
992 handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
993 struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns,
994 const bt2::ConstMessage& late_msg)
995 {
996 const bt_clock_class *clock_class;
997 const bt_stream_class *stream_class;
998 enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status;
999 int64_t last_inactivity_ts_ns;
1000 enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1001 enum lttng_live_iterator_status adjust_status;
1002 bt2::ConstMessage::Shared adjusted_message;
1003
1004 /*
1005 * The timestamp of the current message is before the last message sent
1006 * by this component. We CANNOT send it as is.
1007 *
1008 * The only expected scenario in which that could happen is the
1009 * following, everything else is a bug in this component, relay daemon,
1010 * or CTF parser.
1011 *
1012 * Expected scenario: The CTF message iterator emitted discarded
1013 * packets and discarded events with synthesized beginning and end
1014 * timestamps from the bounds of the last known packet and the newly
1015 * decoded packet header. The CTF message iterator is not aware of
1016 * stream inactivity beacons. Hence, we have to adjust the beginning
1017 * timestamp of those types of messages if a stream signalled its
1018 * inactivity up until _after_ the last known packet's beginning
1019 * timestamp.
1020 *
1021 * Otherwise, the monotonicity guarantee of message timestamps would
1022 * not be preserved.
1023 *
1024 * In short, the only scenario in which it's okay and fixable to
1025 * received a late message is when:
1026 * 1. the late message is a discarded packets or discarded events
1027 * message,
1028 * 2. this stream produced an inactivity message downstream, and
1029 * 3. the timestamp of the late message is within the inactivity
1030 * timespan we sent downstream through the inactivity message.
1031 */
1032
1033 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1034 "Handling late message on live stream iterator: "
1035 "stream-name=\"{}\", viewer-stream-id={}",
1036 stream_iter->name, stream_iter->viewer_stream_id);
1037
1038 if (!stream_iter->last_inactivity_ts.is_set) {
1039 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1040 "Invalid live stream state: "
1041 "have a late message when no inactivity message "
1042 "was ever sent for that stream.");
1043 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1044 goto end;
1045 }
1046
1047 if (!is_discarded_packet_or_event_message(late_msg)) {
1048 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1049 "Invalid live stream state: "
1050 "have a late message that is not a packet discarded or "
1051 "event discarded message: late-msg-type={}",
1052 late_msg.type());
1053 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1054 goto end;
1055 }
1056
1057 stream_class = bt_stream_borrow_class_const(stream_iter->stream->libObjPtr());
1058 clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class);
1059
1060 ts_ns_status = bt_clock_class_cycles_to_ns_from_origin(
1061 clock_class, stream_iter->last_inactivity_ts.value, &last_inactivity_ts_ns);
1062 if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) {
1063 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1064 "Error converting last "
1065 "inactivity message timestamp to nanoseconds");
1066 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1067 goto end;
1068 }
1069
1070 if (last_inactivity_ts_ns <= late_msg_ts_ns) {
1071 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1072 "Invalid live stream state: "
1073 "have a late message that is none included in a stream "
1074 "inactivity timespan: last-inactivity-ts-ns={}, "
1075 "late-msg-ts-ns={}",
1076 last_inactivity_ts_ns, late_msg_ts_ns);
1077 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1078 goto end;
1079 }
1080
1081 /*
1082 * We now know that it's okay for this message to be late, we can now
1083 * adjust its timestamp to ensure monotonicity.
1084 */
1085 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1086 "Adjusting the timestamp of late message: late-msg-type={}, "
1087 "msg-new-ts-ns={}",
1088 late_msg.type(), stream_iter->last_inactivity_ts.value);
1089 switch (late_msg.type()) {
1090 case bt2::MessageType::DiscardedEvents:
1091 adjust_status = adjust_discarded_events_message(
1092 lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(),
1093 late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
1094 break;
1095 case bt2::MessageType::DiscardedPackets:
1096 adjust_status = adjust_discarded_packets_message(
1097 lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(),
1098 late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
1099 break;
1100 default:
1101 bt_common_abort();
1102 }
1103
1104 if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1105 stream_iter_status = adjust_status;
1106 goto end;
1107 }
1108
1109 BT_ASSERT_DBG(adjusted_message);
1110 stream_iter->current_msg = adjusted_message;
1111 stream_iter->current_msg_ts_ns = last_inactivity_ts_ns;
1112
1113 end:
1114 return stream_iter_status;
1115 }
1116
1117 static enum lttng_live_iterator_status
1118 next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
1119 struct lttng_live_trace *live_trace,
1120 struct lttng_live_stream_iterator **youngest_trace_stream_iter)
1121 {
1122 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
1123 enum lttng_live_iterator_status stream_iter_status;
1124 int64_t youngest_candidate_msg_ts = INT64_MAX;
1125 uint64_t stream_iter_idx;
1126
1127 BT_ASSERT_DBG(live_trace);
1128
1129 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1130 "Finding the next stream iterator for trace: "
1131 "trace-id={}",
1132 live_trace->id);
1133 /*
1134 * Update the current message of every stream iterators of this trace.
1135 * The current msg of every stream must have a timestamp equal or
1136 * larger than the last message returned by this iterator. We must
1137 * ensure monotonicity.
1138 */
1139 stream_iter_idx = 0;
1140 while (stream_iter_idx < live_trace->stream_iterators.size()) {
1141 bool stream_iter_is_ended = false;
1142 lttng_live_stream_iterator *stream_iter =
1143 live_trace->stream_iterators[stream_iter_idx].get();
1144
1145 /*
1146 * If there is no current message for this stream, go fetch
1147 * one.
1148 */
1149 while (!stream_iter->current_msg) {
1150 bt2::ConstMessage::Shared msg;
1151 int64_t curr_msg_ts_ns = INT64_MAX;
1152
1153 stream_iter_status =
1154 lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, msg);
1155
1156 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1157 stream_iter_is_ended = true;
1158 break;
1159 }
1160
1161 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1162 goto end;
1163 }
1164
1165 BT_ASSERT_DBG(msg);
1166
1167 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1168 "Live stream iterator returned message: msg-type={}, "
1169 "stream-name=\"{}\", viewer-stream-id={}",
1170 msg->type(), stream_iter->name, stream_iter->viewer_stream_id);
1171
1172 /*
1173 * Get the timestamp in nanoseconds from origin of this
1174 * message.
1175 */
1176 live_get_msg_ts_ns(lttng_live_msg_iter, msg->libObjPtr(),
1177 lttng_live_msg_iter->last_msg_ts_ns, &curr_msg_ts_ns);
1178
1179 /*
1180 * Check if the message of the current live stream
1181 * iterator occurred at the exact same time or after the
1182 * last message returned by this component's message
1183 * iterator. If not, we need to handle it with care.
1184 */
1185 if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
1186 stream_iter->current_msg = std::move(msg);
1187 stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
1188 } else {
1189 /*
1190 * We received a message from the past. This
1191 * may be fixable but it can also be an error.
1192 */
1193 stream_iter_status =
1194 handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, *msg);
1195 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1196 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1197 "Late message could not be handled correctly: "
1198 "lttng-live-msg-iter-addr={}, "
1199 "stream-name=\"{}\", "
1200 "curr-msg-ts={}, last-msg-ts={}",
1201 fmt::ptr(lttng_live_msg_iter), stream_iter->name,
1202 curr_msg_ts_ns,
1203 lttng_live_msg_iter->last_msg_ts_ns);
1204 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1205 goto end;
1206 }
1207 }
1208 }
1209
1210 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1211
1212 if (!stream_iter_is_ended) {
1213 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1214 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1215 /*
1216 * Update the current best candidate message
1217 * for the stream iterator of this live trace
1218 * to be forwarded downstream.
1219 */
1220 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1221 youngest_candidate_stream_iter = stream_iter;
1222 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1223 /*
1224 * Order the messages in an arbitrary but
1225 * deterministic way.
1226 */
1227 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1228 int ret = common_muxing_compare_messages(
1229 stream_iter->current_msg->libObjPtr(),
1230 youngest_candidate_stream_iter->current_msg->libObjPtr());
1231 if (ret < 0) {
1232 /*
1233 * The `youngest_candidate_stream_iter->current_msg`
1234 * should go first. Update the next
1235 * iterator and the current timestamp.
1236 */
1237 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1238 youngest_candidate_stream_iter = stream_iter;
1239 } else if (ret == 0) {
1240 /*
1241 * Unable to pick which one should go
1242 * first.
1243 */
1244 BT_CPPLOGW_SPEC(
1245 lttng_live_msg_iter->logger,
1246 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1247 "stream-iter-addr={}"
1248 "stream-iter-addr={}",
1249 fmt::ptr(stream_iter), fmt::ptr(youngest_candidate_stream_iter));
1250 }
1251 }
1252
1253 stream_iter_idx++;
1254 } else {
1255 /*
1256 * The live stream iterator has ended. That
1257 * iterator is removed from the array, but
1258 * there is no need to increment
1259 * stream_iter_idx as
1260 * g_ptr_array_remove_index_fast replaces the
1261 * removed element with the array's last
1262 * element.
1263 */
1264 bt2c::vectorFastRemove(live_trace->stream_iterators, stream_iter_idx);
1265 }
1266 }
1267
1268 if (youngest_candidate_stream_iter) {
1269 *youngest_trace_stream_iter = youngest_candidate_stream_iter;
1270 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1271 } else {
1272 /*
1273 * The only case where we don't have a candidate for this trace
1274 * is if we reached the end of all the iterators.
1275 */
1276 BT_ASSERT(live_trace->stream_iterators.empty());
1277 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1278 }
1279
1280 end:
1281 return stream_iter_status;
1282 }
1283
1284 static enum lttng_live_iterator_status
1285 next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
1286 struct lttng_live_session *session,
1287 struct lttng_live_stream_iterator **youngest_session_stream_iter)
1288 {
1289 enum lttng_live_iterator_status stream_iter_status;
1290 uint64_t trace_idx = 0;
1291 int64_t youngest_candidate_msg_ts = INT64_MAX;
1292 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
1293
1294 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1295 "Finding the next stream iterator for session: "
1296 "session-id={}",
1297 session->id);
1298 /*
1299 * Make sure we are attached to the session and look for new streams
1300 * and metadata.
1301 */
1302 stream_iter_status = lttng_live_get_session(lttng_live_msg_iter, session);
1303 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
1304 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE &&
1305 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) {
1306 goto end;
1307 }
1308
1309 BT_ASSERT_DBG(session->traces);
1310
1311 while (trace_idx < session->traces->len) {
1312 bool trace_is_ended = false;
1313 struct lttng_live_stream_iterator *stream_iter;
1314 struct lttng_live_trace *trace =
1315 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
1316
1317 stream_iter_status =
1318 next_stream_iterator_for_trace(lttng_live_msg_iter, trace, &stream_iter);
1319 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1320 /*
1321 * All the live stream iterators for this trace are
1322 * ENDed. Remove the trace from this session.
1323 */
1324 trace_is_ended = true;
1325 } else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1326 goto end;
1327 }
1328
1329 if (!trace_is_ended) {
1330 BT_ASSERT_DBG(stream_iter);
1331
1332 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1333 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1334 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1335 youngest_candidate_stream_iter = stream_iter;
1336 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1337 /*
1338 * Order the messages in an arbitrary but
1339 * deterministic way.
1340 */
1341 int ret = common_muxing_compare_messages(
1342 stream_iter->current_msg->libObjPtr(),
1343 youngest_candidate_stream_iter->current_msg->libObjPtr());
1344 if (ret < 0) {
1345 /*
1346 * The `youngest_candidate_stream_iter->current_msg`
1347 * should go first. Update the next iterator
1348 * and the current timestamp.
1349 */
1350 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1351 youngest_candidate_stream_iter = stream_iter;
1352 } else if (ret == 0) {
1353 /* Unable to pick which one should go first. */
1354 BT_CPPLOGW_SPEC(
1355 lttng_live_msg_iter->logger,
1356 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1357 "stream-iter-addr={}"
1358 "youngest-candidate-stream-iter-addr={}",
1359 fmt::ptr(stream_iter), fmt::ptr(youngest_candidate_stream_iter));
1360 }
1361 }
1362 trace_idx++;
1363 } else {
1364 /*
1365 * trace_idx is not incremented since
1366 * g_ptr_array_remove_index_fast replaces the
1367 * element at trace_idx with the array's last element.
1368 */
1369 g_ptr_array_remove_index_fast(session->traces, trace_idx);
1370 }
1371 }
1372 if (youngest_candidate_stream_iter) {
1373 *youngest_session_stream_iter = youngest_candidate_stream_iter;
1374 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1375 } else {
1376 /*
1377 * The only cases where we don't have a candidate for this
1378 * trace is:
1379 * 1. if we reached the end of all the iterators of all the
1380 * traces of this session,
1381 * 2. if we never had live stream iterator in the first place.
1382 *
1383 * In either cases, we return END.
1384 */
1385 BT_ASSERT(session->traces->len == 0);
1386 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1387 }
1388 end:
1389 return stream_iter_status;
1390 }
1391
1392 static inline void put_messages(bt_message_array_const msgs, uint64_t count)
1393 {
1394 uint64_t i;
1395
1396 for (i = 0; i < count; i++) {
1397 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
1398 }
1399 }
1400
1401 bt_message_iterator_class_next_method_status
1402 lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array_const msgs,
1403 uint64_t capacity, uint64_t *count)
1404 {
1405 try {
1406 bt_message_iterator_class_next_method_status status;
1407 enum lttng_live_viewer_status viewer_status;
1408 struct lttng_live_msg_iter *lttng_live_msg_iter =
1409 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_it);
1410 struct lttng_live_component *lttng_live = lttng_live_msg_iter->lttng_live_comp;
1411 enum lttng_live_iterator_status stream_iter_status;
1412 uint64_t session_idx;
1413
1414 *count = 0;
1415
1416 BT_ASSERT_DBG(lttng_live_msg_iter);
1417
1418 if (G_UNLIKELY(lttng_live_msg_iter->was_interrupted)) {
1419 /*
1420 * The iterator was interrupted in a previous call to the
1421 * `_next()` method. We currently do not support generating
1422 * messages after such event. The babeltrace2 CLI should never
1423 * be running the graph after being interrupted. So this check
1424 * is to prevent other graph users from using this live
1425 * iterator in an messed up internal state.
1426 */
1427 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1428 BT_CPPLOGE_APPEND_CAUSE_SPEC(
1429 lttng_live_msg_iter->logger,
1430 "Message iterator was interrupted during a previous call to the `next()` and currently does not support continuing after such event.");
1431 goto end;
1432 }
1433
1434 /*
1435 * Clear all the invalid message reference that might be left over in
1436 * the output array.
1437 */
1438 memset(msgs, 0, capacity * sizeof(*msgs));
1439
1440 /*
1441 * If no session are exposed on the relay found at the url provided by
1442 * the user, session count will be 0. In this case, we return status
1443 * end to return gracefully.
1444 */
1445 if (lttng_live_msg_iter->sessions->len == 0) {
1446 if (lttng_live->params.sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
1447 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1448 goto end;
1449 } else {
1450 /*
1451 * The are no more active session for this session
1452 * name. Retry to create a viewer session for the
1453 * requested session name.
1454 */
1455 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
1456 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1457 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1458 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1459 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1460 "Error creating LTTng live viewer session");
1461 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1462 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1463 } else {
1464 bt_common_abort();
1465 }
1466 goto end;
1467 }
1468 }
1469 }
1470
1471 if (lttng_live_msg_iter->active_stream_iter == 0) {
1472 lttng_live_force_new_streams_and_metadata(lttng_live_msg_iter);
1473 }
1474
1475 /*
1476 * Here the muxing of message is done.
1477 *
1478 * We need to iterate over all the streams of all the traces of all the
1479 * viewer sessions in order to get the message with the smallest
1480 * timestamp. In this case, a session is a viewer session and there is
1481 * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
1482 * kernel). Each viewer session can have multiple traces, for example,
1483 * 64bit UST viewer sessions could have multiple per-pid traces.
1484 *
1485 * We iterate over the streams of each traces to update and see what is
1486 * their next message's timestamp. From those timestamps, we select the
1487 * message with the smallest timestamp as the best candidate message
1488 * for that trace and do the same thing across all the sessions.
1489 *
1490 * We then compare the timestamp of best candidate message of all the
1491 * sessions to pick the message with the smallest timestamp and we
1492 * return it.
1493 */
1494 while (*count < capacity) {
1495 struct lttng_live_stream_iterator *youngest_stream_iter = NULL,
1496 *candidate_stream_iter = NULL;
1497 int64_t youngest_msg_ts_ns = INT64_MAX;
1498
1499 BT_ASSERT_DBG(lttng_live_msg_iter->sessions);
1500 session_idx = 0;
1501 while (session_idx < lttng_live_msg_iter->sessions->len) {
1502 struct lttng_live_session *session = (lttng_live_session *) g_ptr_array_index(
1503 lttng_live_msg_iter->sessions, session_idx);
1504
1505 /* Find the best candidate message to send downstream. */
1506 stream_iter_status = next_stream_iterator_for_session(lttng_live_msg_iter, session,
1507 &candidate_stream_iter);
1508
1509 /* If we receive an END status, it means that either:
1510 * - Those traces never had active streams (UST with no
1511 * data produced yet),
1512 * - All live stream iterators have ENDed.
1513 */
1514 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1515 if (session->closed && session->traces->len == 0) {
1516 /*
1517 * Remove the session from the list.
1518 * session_idx is not modified since
1519 * g_ptr_array_remove_index_fast
1520 * replaces the the removed element with
1521 * the array's last element.
1522 */
1523 g_ptr_array_remove_index_fast(lttng_live_msg_iter->sessions, session_idx);
1524 } else {
1525 session_idx++;
1526 }
1527 continue;
1528 }
1529
1530 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1531 goto return_status;
1532 }
1533
1534 if (G_UNLIKELY(youngest_stream_iter == NULL) ||
1535 candidate_stream_iter->current_msg_ts_ns < youngest_msg_ts_ns) {
1536 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1537 youngest_stream_iter = candidate_stream_iter;
1538 } else if (candidate_stream_iter->current_msg_ts_ns == youngest_msg_ts_ns) {
1539 /*
1540 * The currently selected message to be sent
1541 * downstream next has the exact same timestamp
1542 * that of the current candidate message. We
1543 * must break the tie in a predictable manner.
1544 */
1545 BT_CPPLOGD_STR_SPEC(
1546 lttng_live_msg_iter->logger,
1547 "Two of the next message candidates have the same timestamps, pick one deterministically.");
1548 /*
1549 * Order the messages in an arbitrary but
1550 * deterministic way.
1551 */
1552 int ret = common_muxing_compare_messages(
1553 candidate_stream_iter->current_msg->libObjPtr(),
1554 youngest_stream_iter->current_msg->libObjPtr());
1555 if (ret < 0) {
1556 /*
1557 * The `candidate_stream_iter->current_msg`
1558 * should go first. Update the next
1559 * iterator and the current timestamp.
1560 */
1561 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1562 youngest_stream_iter = candidate_stream_iter;
1563 } else if (ret == 0) {
1564 /* Unable to pick which one should go first. */
1565 BT_CPPLOGW_SPEC(
1566 lttng_live_msg_iter->logger,
1567 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1568 "next-stream-iter-addr={}"
1569 "candidate-stream-iter-addr={}",
1570 fmt::ptr(youngest_stream_iter), fmt::ptr(candidate_stream_iter));
1571 }
1572 }
1573
1574 session_idx++;
1575 }
1576
1577 if (!youngest_stream_iter) {
1578 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1579 goto return_status;
1580 }
1581
1582 BT_ASSERT_DBG(youngest_stream_iter->current_msg);
1583 /* Ensure monotonicity. */
1584 BT_ASSERT_DBG(lttng_live_msg_iter->last_msg_ts_ns <=
1585 youngest_stream_iter->current_msg_ts_ns);
1586
1587 /*
1588 * Insert the next message to the message batch. This will set
1589 * stream iterator current message to NULL so that next time
1590 * we fetch the next message of that stream iterator
1591 */
1592 msgs[*count] = youngest_stream_iter->current_msg.release().libObjPtr();
1593 (*count)++;
1594
1595 /* Update the last timestamp in nanoseconds sent downstream. */
1596 lttng_live_msg_iter->last_msg_ts_ns = youngest_msg_ts_ns;
1597 youngest_stream_iter->current_msg_ts_ns = INT64_MAX;
1598
1599 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1600 }
1601
1602 return_status:
1603 switch (stream_iter_status) {
1604 case LTTNG_LIVE_ITERATOR_STATUS_OK:
1605 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
1606 /*
1607 * If we gathered messages, return _OK even if the graph was
1608 * interrupted. This allows for the components downstream to at
1609 * least get the those messages. If the graph was indeed
1610 * interrupted there should not be another _next() call as the
1611 * application will tear down the graph. This component class
1612 * doesn't support restarting after an interruption.
1613 */
1614 if (*count > 0) {
1615 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
1616 } else {
1617 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1618 }
1619 break;
1620 case LTTNG_LIVE_ITERATOR_STATUS_END:
1621 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1622 break;
1623 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
1624 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1625 "Memory error preparing the next batch of messages: "
1626 "live-iter-status={}",
1627 stream_iter_status);
1628 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
1629 break;
1630 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
1631 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
1632 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
1633 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1634 "Error preparing the next batch of messages: "
1635 "live-iter-status={}",
1636 stream_iter_status);
1637
1638 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1639 /* Put all existing messages on error. */
1640 put_messages(msgs, *count);
1641 break;
1642 default:
1643 bt_common_abort();
1644 }
1645
1646 end:
1647 return status;
1648 } catch (const std::bad_alloc&) {
1649 return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
1650 } catch (const bt2::Error&) {
1651 return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1652 }
1653 }
1654
1655 static struct lttng_live_msg_iter *
1656 lttng_live_msg_iter_create(struct lttng_live_component *lttng_live_comp,
1657 bt_self_message_iterator *self_msg_it)
1658 {
1659 lttng_live_msg_iter *msg_iter = new lttng_live_msg_iter {lttng_live_comp->logger};
1660 msg_iter->self_comp = lttng_live_comp->self_comp;
1661 msg_iter->lttng_live_comp = lttng_live_comp;
1662 msg_iter->self_msg_iter = self_msg_it;
1663
1664 msg_iter->active_stream_iter = 0;
1665 msg_iter->last_msg_ts_ns = INT64_MIN;
1666 msg_iter->was_interrupted = false;
1667
1668 msg_iter->sessions =
1669 g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_session);
1670 BT_ASSERT(msg_iter->sessions);
1671
1672 return msg_iter;
1673 }
1674
1675 bt_message_iterator_class_initialize_method_status
1676 lttng_live_msg_iter_init(bt_self_message_iterator *self_msg_it,
1677 bt_self_message_iterator_configuration *, bt_self_component_port_output *)
1678 {
1679 try {
1680 bt_message_iterator_class_initialize_method_status status;
1681 struct lttng_live_component *lttng_live;
1682 struct lttng_live_msg_iter *lttng_live_msg_iter;
1683 enum lttng_live_viewer_status viewer_status;
1684 bt_self_component *self_comp = bt_self_message_iterator_borrow_component(self_msg_it);
1685
1686 lttng_live = (lttng_live_component *) bt_self_component_get_data(self_comp);
1687
1688 /* There can be only one downstream iterator at the same time. */
1689 BT_ASSERT(!lttng_live->has_msg_iter);
1690 lttng_live->has_msg_iter = true;
1691
1692 lttng_live_msg_iter = lttng_live_msg_iter_create(lttng_live, self_msg_it);
1693 if (!lttng_live_msg_iter) {
1694 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live->logger,
1695 "Failed to create lttng_live_msg_iter");
1696 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1697 goto error;
1698 }
1699
1700 viewer_status = live_viewer_connection_create(
1701 lttng_live->params.url.c_str(), false, lttng_live_msg_iter, lttng_live_msg_iter->logger,
1702 &lttng_live_msg_iter->viewer_connection);
1703 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1704 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1705 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1706 "Failed to create viewer connection");
1707 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1708 /*
1709 * Interruption in the _iter_init() method is not
1710 * supported. Return an error.
1711 */
1712 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1713 "Interrupted while creating viewer connection");
1714 }
1715
1716 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1717 goto error;
1718 }
1719
1720 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
1721 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1722 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1723 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1724 "Failed to create viewer session");
1725 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1726 /*
1727 * Interruption in the _iter_init() method is not
1728 * supported. Return an error.
1729 */
1730 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1731 "Interrupted when creating viewer session");
1732 }
1733
1734 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1735 goto error;
1736 }
1737
1738 if (lttng_live_msg_iter->sessions->len == 0) {
1739 switch (lttng_live->params.sess_not_found_act) {
1740 case SESSION_NOT_FOUND_ACTION_CONTINUE:
1741 BT_CPPLOGI_SPEC(
1742 lttng_live_msg_iter->logger,
1743 "Unable to connect to the requested live viewer session. "
1744 "Keep trying to connect because of {}=\"{}\" component parameter: url=\"{}\"",
1745 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR,
1746 lttng_live->params.url);
1747 break;
1748 case SESSION_NOT_FOUND_ACTION_FAIL:
1749 BT_CPPLOGE_APPEND_CAUSE_SPEC(
1750 lttng_live_msg_iter->logger,
1751 "Unable to connect to the requested live viewer session. "
1752 "Fail the message iterator initialization because of {}=\"{}\" "
1753 "component parameter: url =\"{}\"",
1754 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_FAIL_STR,
1755 lttng_live->params.url);
1756 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1757 goto error;
1758 case SESSION_NOT_FOUND_ACTION_END:
1759 BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger,
1760 "Unable to connect to the requested live viewer session. "
1761 "End gracefully at the first _next() call because of {}=\"{}\""
1762 " component parameter: url=\"{}\"",
1763 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_END_STR,
1764 lttng_live->params.url);
1765 break;
1766 default:
1767 bt_common_abort();
1768 }
1769 }
1770
1771 bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter);
1772 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK;
1773 goto end;
1774
1775 error:
1776 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
1777 end:
1778 return status;
1779 } catch (const std::bad_alloc&) {
1780 return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1781 } catch (const bt2::Error&) {
1782 return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1783 }
1784 }
1785
1786 static struct bt_param_validation_map_value_entry_descr list_sessions_params[] = {
1787 {URL_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
1788 bt_param_validation_value_descr::makeString()},
1789 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
1790
1791 static bt_component_class_query_method_status
1792 lttng_live_query_list_sessions(const bt_value *params, const bt_value **result,
1793 const bt2c::Logger& logger)
1794 {
1795 bt_component_class_query_method_status status;
1796 const bt_value *url_value = NULL;
1797 const char *url;
1798 struct live_viewer_connection *viewer_connection = NULL;
1799 enum lttng_live_viewer_status viewer_status;
1800 enum bt_param_validation_status validation_status;
1801 gchar *validate_error = NULL;
1802
1803 validation_status = bt_param_validation_validate(params, list_sessions_params, &validate_error);
1804 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
1805 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1806 goto error;
1807 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
1808 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1809 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "{}", validate_error);
1810 goto error;
1811 }
1812
1813 url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
1814 url = bt_value_string_get(url_value);
1815
1816 viewer_status = live_viewer_connection_create(url, true, NULL, logger, &viewer_connection);
1817 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1818 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1819 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Failed to create viewer connection");
1820 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1821 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1822 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
1823 } else {
1824 bt_common_abort();
1825 }
1826 goto error;
1827 }
1828
1829 status = live_viewer_connection_list_sessions(viewer_connection, result);
1830 if (status != BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK) {
1831 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Failed to list viewer sessions");
1832 goto error;
1833 }
1834
1835 goto end;
1836
1837 error:
1838 BT_VALUE_PUT_REF_AND_RESET(*result);
1839
1840 if (status >= 0) {
1841 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1842 }
1843
1844 end:
1845 if (viewer_connection) {
1846 live_viewer_connection_destroy(viewer_connection);
1847 }
1848
1849 g_free(validate_error);
1850
1851 return status;
1852 }
1853
1854 static bt_component_class_query_method_status
1855 lttng_live_query_support_info(const bt_value *params, const bt_value **result,
1856 const bt2c::Logger& logger)
1857 {
1858 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
1859 const bt_value *input_type_value;
1860 const bt_value *input_value;
1861 double weight = 0;
1862 struct bt_common_lttng_live_url_parts parts = {};
1863
1864 /* Used by the logging macros */
1865 __attribute__((unused)) bt_self_component *self_comp = NULL;
1866
1867 *result = NULL;
1868 input_type_value = bt_value_map_borrow_entry_value_const(params, "type");
1869 if (!input_type_value) {
1870 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Missing expected `type` parameter.");
1871 goto error;
1872 }
1873
1874 if (!bt_value_is_string(input_type_value)) {
1875 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "`type` parameter is not a string value.");
1876 goto error;
1877 }
1878
1879 if (strcmp(bt_value_string_get(input_type_value), "string") != 0) {
1880 /* We don't handle file system paths */
1881 goto create_result;
1882 }
1883
1884 input_value = bt_value_map_borrow_entry_value_const(params, "input");
1885 if (!input_value) {
1886 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Missing expected `input` parameter.");
1887 goto error;
1888 }
1889
1890 if (!bt_value_is_string(input_value)) {
1891 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "`input` parameter is not a string value.");
1892 goto error;
1893 }
1894
1895 parts = bt_common_parse_lttng_live_url(bt_value_string_get(input_value), NULL, 0);
1896 if (parts.session_name) {
1897 /*
1898 * Looks pretty much like an LTTng live URL: we got the
1899 * session name part, which forms a complete URL.
1900 */
1901 weight = .75;
1902 }
1903
1904 create_result:
1905 *result = bt_value_real_create_init(weight);
1906 if (!*result) {
1907 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1908 goto error;
1909 }
1910
1911 goto end;
1912
1913 error:
1914 if (status >= 0) {
1915 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1916 }
1917
1918 BT_ASSERT(!*result);
1919
1920 end:
1921 bt_common_destroy_lttng_live_url_parts(&parts);
1922 return status;
1923 }
1924
1925 bt_component_class_query_method_status lttng_live_query(bt_self_component_class_source *comp_class,
1926 bt_private_query_executor *priv_query_exec,
1927 const char *object, const bt_value *params,
1928 __attribute__((unused)) void *method_data,
1929 const bt_value **result)
1930 {
1931 try {
1932 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
1933 bt2c::Logger logger {bt2::SelfComponentClass {comp_class},
1934 bt2::PrivateQueryExecutor {priv_query_exec},
1935 "PLUGIN/SRC.CTF.LTTNG-LIVE/QUERY"};
1936
1937 if (strcmp(object, "sessions") == 0) {
1938 status = lttng_live_query_list_sessions(params, result, logger);
1939 } else if (strcmp(object, "babeltrace.support-info") == 0) {
1940 status = lttng_live_query_support_info(params, result, logger);
1941 } else {
1942 BT_CPPLOGI_SPEC(logger, "Unknown query object `{}`", object);
1943 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT;
1944 goto end;
1945 }
1946
1947 end:
1948 return status;
1949 } catch (const std::bad_alloc&) {
1950 return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1951 } catch (const bt2::Error&) {
1952 return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1953 }
1954 }
1955
1956 void lttng_live_component_finalize(bt_self_component_source *component)
1957 {
1958 lttng_live_component::UP {static_cast<lttng_live_component *>(
1959 bt_self_component_get_data(bt_self_component_source_as_self_component(component)))};
1960 }
1961
1962 static enum session_not_found_action
1963 parse_session_not_found_action_param(const bt_value *no_session_param)
1964 {
1965 enum session_not_found_action action;
1966 const char *no_session_act_str = bt_value_string_get(no_session_param);
1967
1968 if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_CONTINUE_STR) == 0) {
1969 action = SESSION_NOT_FOUND_ACTION_CONTINUE;
1970 } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_FAIL_STR) == 0) {
1971 action = SESSION_NOT_FOUND_ACTION_FAIL;
1972 } else {
1973 BT_ASSERT(strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0);
1974 action = SESSION_NOT_FOUND_ACTION_END;
1975 }
1976
1977 return action;
1978 }
1979
1980 static bt_param_validation_value_descr inputs_elem_descr =
1981 bt_param_validation_value_descr::makeString();
1982
1983 static const char *sess_not_found_action_choices[] = {
1984 SESS_NOT_FOUND_ACTION_CONTINUE_STR,
1985 SESS_NOT_FOUND_ACTION_FAIL_STR,
1986 SESS_NOT_FOUND_ACTION_END_STR,
1987 };
1988
1989 static struct bt_param_validation_map_value_entry_descr params_descr[] = {
1990 {INPUTS_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
1991 bt_param_validation_value_descr::makeArray(1, 1, inputs_elem_descr)},
1992 {SESS_NOT_FOUND_ACTION_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
1993 bt_param_validation_value_descr::makeString(sess_not_found_action_choices)},
1994 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
1995
1996 static bt_component_class_initialize_method_status
1997 lttng_live_component_create(const bt_value *params, bt_self_component_source *self_comp,
1998 lttng_live_component::UP& component)
1999 {
2000 const bt_value *inputs_value;
2001 const bt_value *url_value;
2002 const bt_value *value;
2003 enum bt_param_validation_status validation_status;
2004 gchar *validation_error = NULL;
2005 bt2c::Logger logger {bt2::SelfSourceComponent {self_comp}, "PLUGIN/SRC.CTF.LTTNG-LIVE/COMP"};
2006
2007 validation_status = bt_param_validation_validate(params, params_descr, &validation_error);
2008 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
2009 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2010 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
2011 bt2c::GCharUP errorFreer {validation_error};
2012 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "{}", validation_error);
2013 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
2014 }
2015
2016 auto lttng_live = bt2s::make_unique<lttng_live_component>(std::move(logger));
2017 lttng_live->self_comp = bt_self_component_source_as_self_component(self_comp);
2018 lttng_live->max_query_size = MAX_QUERY_SIZE;
2019 lttng_live->has_msg_iter = false;
2020
2021 inputs_value = bt_value_map_borrow_entry_value_const(params, INPUTS_PARAM);
2022 url_value = bt_value_array_borrow_element_by_index_const(inputs_value, 0);
2023 lttng_live->params.url = bt_value_string_get(url_value);
2024
2025 value = bt_value_map_borrow_entry_value_const(params, SESS_NOT_FOUND_ACTION_PARAM);
2026 if (value) {
2027 lttng_live->params.sess_not_found_act = parse_session_not_found_action_param(value);
2028 } else {
2029 BT_CPPLOGI_SPEC(lttng_live->logger,
2030 "Optional `{}` parameter is missing: defaulting to `{}`.",
2031 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR);
2032 lttng_live->params.sess_not_found_act = SESSION_NOT_FOUND_ACTION_CONTINUE;
2033 }
2034
2035 component = std::move(lttng_live);
2036 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
2037 }
2038
2039 bt_component_class_initialize_method_status
2040 lttng_live_component_init(bt_self_component_source *self_comp_src,
2041 bt_self_component_source_configuration *, const bt_value *params, void *)
2042 {
2043 try {
2044 lttng_live_component::UP lttng_live;
2045 bt_component_class_initialize_method_status ret;
2046 bt_self_component *self_comp = bt_self_component_source_as_self_component(self_comp_src);
2047 bt_self_component_add_port_status add_port_status;
2048
2049 ret = lttng_live_component_create(params, self_comp_src, lttng_live);
2050 if (ret != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
2051 return ret;
2052 }
2053
2054 add_port_status =
2055 bt_self_component_source_add_output_port(self_comp_src, "out", NULL, NULL);
2056 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
2057 ret = (bt_component_class_initialize_method_status) add_port_status;
2058 return ret;
2059 }
2060
2061 bt_self_component_set_data(self_comp, lttng_live.release());
2062
2063 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
2064 } catch (const std::bad_alloc&) {
2065 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2066 } catch (const bt2::Error&) {
2067 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
2068 }
2069 }
This page took 0.07001 seconds and 3 git commands to generate.