src.ctf.lttng-live: introduce lttng_live_session::UP and use it
[babeltrace.git] / src / plugins / ctf / lttng-live / lttng-live.cpp
1 /*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
5 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
6 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
7 *
8 * Babeltrace CTF LTTng-live Client Component
9 */
10
11 #include <glib.h>
12 #include <unistd.h>
13
14 #include "common/assert.h"
15 #include "cpp-common/bt2c/fmt.hpp"
16 #include "cpp-common/bt2c/glib-up.hpp"
17 #include "cpp-common/bt2c/vector.hpp"
18 #include "cpp-common/bt2s/make-unique.hpp"
19 #include "cpp-common/vendor/fmt/format.h"
20
21 #include "plugins/common/muxing/muxing.h"
22 #include "plugins/common/param-validation/param-validation.h"
23
24 #include "data-stream.hpp"
25 #include "lttng-live.hpp"
26 #include "metadata.hpp"
27
/* Upper bound for a query: 256 * 1024 (NOTE(review): presumably bytes; confirm at the use site). */
#define MAX_QUERY_SIZE (256 * 1024)

/* Parameter name strings (NOTE(review): presumably component/query parameter keys; confirm). */
#define URL_PARAM                   "url"
#define INPUTS_PARAM                "inputs"
#define SESS_NOT_FOUND_ACTION_PARAM "session-not-found-action"

/* Accepted string values for the `session-not-found-action` parameter. */
#define SESS_NOT_FOUND_ACTION_CONTINUE_STR "continue"
#define SESS_NOT_FOUND_ACTION_FAIL_STR     "fail"
#define SESS_NOT_FOUND_ACTION_END_STR      "end"
35
36 void lttng_live_stream_iterator_set_state(struct lttng_live_stream_iterator *stream_iter,
37 enum lttng_live_stream_state new_state)
38 {
39 BT_CPPLOGD_SPEC(stream_iter->logger,
40 "Setting live stream iterator state: viewer-stream-id={}, "
41 "old-state={}, new-state={}",
42 stream_iter->viewer_stream_id, stream_iter->state, new_state);
43
44 stream_iter->state = new_state;
45 }
46
/*
 * Logs, at the debug level and using the logger of `live_stream_iter`,
 * the state of this live stream iterator along with its last and
 * current inactivity timestamps.
 *
 * This expands to a `do { ... } while (0)` statement WITHOUT a trailing
 * semicolon: the call site provides it. This keeps the expansion a
 * single statement so that the macro remains safe to use, for example,
 * as the unbraced body of an `if` statement which has an `else` branch.
 */
#define LTTNG_LIVE_LOGD_STREAM_ITER(live_stream_iter)                                              \
    do {                                                                                           \
        BT_CPPLOGD_SPEC((live_stream_iter)->logger,                                                \
                        "Live stream iterator state={}, "                                          \
                        "last-inact-ts-is-set={}, last-inact-ts-value={}, "                        \
                        "curr-inact-ts={}",                                                        \
                        (live_stream_iter)->state, (live_stream_iter)->last_inactivity_ts.is_set,  \
                        (live_stream_iter)->last_inactivity_ts.value,                              \
                        (live_stream_iter)->current_inactivity_ts);                                \
    } while (0)
57
58 bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter)
59 {
60 bool ret;
61
62 if (!msg_iter) {
63 ret = false;
64 goto end;
65 }
66
67 ret = bt_self_message_iterator_is_interrupted(msg_iter->self_msg_iter);
68
69 end:
70 return ret;
71 }
72
73 static struct lttng_live_trace *
74 lttng_live_session_borrow_trace_by_id(struct lttng_live_session *session, uint64_t trace_id)
75 {
76 for (lttng_live_trace::UP& trace : session->traces) {
77 if (trace->id == trace_id) {
78 return trace.get();
79 }
80 }
81
82 return nullptr;
83 }
84
85 static struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
86 uint64_t trace_id)
87 {
88 BT_CPPLOGD_SPEC(session->logger, "Creating live trace: session-id={}, trace-id={}", session->id,
89 trace_id);
90
91 auto trace = bt2s::make_unique<lttng_live_trace>(session->logger);
92
93 trace->session = session;
94 trace->id = trace_id;
95 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
96
97 const auto ret = trace.get();
98 session->traces.emplace_back(std::move(trace));
99 return ret;
100 }
101
102 struct lttng_live_trace *
103 lttng_live_session_borrow_or_create_trace_by_id(struct lttng_live_session *session,
104 uint64_t trace_id)
105 {
106 struct lttng_live_trace *trace;
107
108 trace = lttng_live_session_borrow_trace_by_id(session, trace_id);
109 if (trace) {
110 goto end;
111 }
112
113 /* The session is the owner of the newly created trace. */
114 trace = lttng_live_create_trace(session, trace_id);
115
116 end:
117 return trace;
118 }
119
120 int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, uint64_t session_id,
121 const char *hostname, const char *session_name)
122 {
123 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
124 "Adding live session: "
125 "session-id={}, hostname=\"{}\", session-name=\"{}\"",
126 session_id, hostname, session_name);
127
128 auto session = bt2s::make_unique<lttng_live_session>(lttng_live_msg_iter->logger);
129
130 session->self_comp = lttng_live_msg_iter->self_comp;
131 session->id = session_id;
132 session->lttng_live_msg_iter = lttng_live_msg_iter;
133 session->new_streams_needed = true;
134 session->hostname = hostname;
135 session->session_name = session_name;
136
137 g_ptr_array_add(lttng_live_msg_iter->sessions, session.release());
138
139 return 0;
140 }
141
142 lttng_live_session::~lttng_live_session()
143 {
144 BT_CPPLOGD_SPEC(this->logger, "Destroying live session: session-id={}, session-name=\"{}\"",
145 this->id, this->session_name);
146
147 if (this->id != -1ULL) {
148 if (lttng_live_session_detach(this)) {
149 if (!lttng_live_graph_is_canceled(this->lttng_live_msg_iter)) {
150 /* Old relayd cannot detach sessions. */
151 BT_CPPLOGD_SPEC(this->logger, "Unable to detach lttng live session {}", this->id);
152 }
153 }
154
155 this->id = -1ULL;
156 }
157 }
158
/*
 * Deletes `session`, which runs the `lttng_live_session` destructor.
 *
 * `session` may be null, in which case this function does nothing.
 */
static void lttng_live_destroy_session(struct lttng_live_session *session)
{
    delete session;
}
163
164 static void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter)
165 {
166 if (!lttng_live_msg_iter) {
167 goto end;
168 }
169
170 if (lttng_live_msg_iter->sessions) {
171 g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE);
172 }
173
174 if (lttng_live_msg_iter->viewer_connection) {
175 live_viewer_connection_destroy(lttng_live_msg_iter->viewer_connection);
176 }
177 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp);
178 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter);
179
180 /* All stream iterators must be destroyed at this point. */
181 BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0);
182 lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false;
183
184 delete lttng_live_msg_iter;
185
186 end:
187 return;
188 }
189
190 void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
191 {
192 struct lttng_live_msg_iter *lttng_live_msg_iter;
193
194 BT_ASSERT(self_msg_iter);
195
196 lttng_live_msg_iter =
197 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_iter);
198 BT_ASSERT(lttng_live_msg_iter);
199 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
200 }
201
202 static enum lttng_live_iterator_status
203 lttng_live_iterator_next_check_stream_state(struct lttng_live_stream_iterator *lttng_live_stream)
204 {
205 switch (lttng_live_stream->state) {
206 case LTTNG_LIVE_STREAM_QUIESCENT:
207 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
208 break;
209 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
210 /* Invalid state. */
211 BT_CPPLOGF_SPEC(lttng_live_stream->logger, "Unexpected stream state \"ACTIVE_NO_DATA\"");
212 bt_common_abort();
213 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
214 /* Invalid state. */
215 BT_CPPLOGF_SPEC(lttng_live_stream->logger, "Unexpected stream state \"QUIESCENT_NO_DATA\"");
216 bt_common_abort();
217 case LTTNG_LIVE_STREAM_EOF:
218 break;
219 }
220 return LTTNG_LIVE_ITERATOR_STATUS_OK;
221 }
222
223 /*
224 * For active no data stream, fetch next index. As a result of that it can
225 * become either:
226 * - quiescent: won't have events for a bit,
227 * - have data: need to get that data and produce the event,
228 * - have no data on this stream at this point: need to retry (AGAIN) or return
229 * EOF.
230 */
231 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
232 struct lttng_live_msg_iter *lttng_live_msg_iter,
233 struct lttng_live_stream_iterator *lttng_live_stream)
234 {
235 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
236 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
237 struct packet_index index;
238
239 if (lttng_live_stream->trace->metadata_stream_state ==
240 LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
241 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
242 "Need to get an update for the metadata stream before proceeding "
243 "further with this stream: stream-name=\"{}\"",
244 lttng_live_stream->name);
245 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
246 goto end;
247 }
248
249 if (lttng_live_stream->trace->session->new_streams_needed) {
250 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
251 "Need to get an update of all streams before proceeding further "
252 "with this stream: stream-name=\"{}\"",
253 lttng_live_stream->name);
254 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
255 goto end;
256 }
257
258 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA &&
259 lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
260 goto end;
261 }
262 ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream, &index);
263 if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
264 goto end;
265 }
266
267 BT_ASSERT_DBG(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
268
269 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
270 uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts.value,
271 curr_inact_ts = lttng_live_stream->current_inactivity_ts;
272
273 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA && last_inact_ts == curr_inact_ts) {
274 /*
275 * Because the stream is in the QUIESCENT_NO_DATA
276 * state, we can assert that the last_inactivity_ts was
277 * set and can be safely used in the `if` above.
278 */
279 BT_ASSERT(lttng_live_stream->last_inactivity_ts.is_set);
280
281 ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
282 LTTNG_LIVE_LOGD_STREAM_ITER(lttng_live_stream);
283 } else {
284 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
285 }
286 goto end;
287 }
288
289 lttng_live_stream->base_offset = index.offset;
290 lttng_live_stream->offset = index.offset;
291 lttng_live_stream->len = index.packet_size / CHAR_BIT;
292
293 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
294 "Setting live stream reading info: stream-name=\"{}\", "
295 "viewer-stream-id={}, stream-base-offset={}, stream-offset={}, stream-len={}",
296 lttng_live_stream->name, lttng_live_stream->viewer_stream_id,
297 lttng_live_stream->base_offset, lttng_live_stream->offset,
298 lttng_live_stream->len);
299
300 end:
301 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
302 ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream);
303 }
304 return ret;
305 }
306
307 /*
308 * Creation of the message requires the ctf trace class to be created
309 * beforehand, but the live protocol gives us all streams (including metadata)
310 * at once. So we split it in three steps: getting streams, getting metadata
311 * (which creates the ctf trace class), and then creating the per-stream
312 * messages.
313 */
314 static enum lttng_live_iterator_status
315 lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
316 struct lttng_live_session *session)
317 {
318 enum lttng_live_iterator_status status;
319
320 if (!session->attached) {
321 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Attach to session: session-id={}",
322 session->id);
323 enum lttng_live_viewer_status attach_status =
324 lttng_live_session_attach(session, lttng_live_msg_iter->self_msg_iter);
325 if (attach_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
326 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
327 /*
328 * Clear any causes appended in
329 * `lttng_live_attach_session()` as we want to
330 * return gracefully since the graph was
331 * cancelled.
332 */
333 bt_current_thread_clear_error();
334 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
335 } else {
336 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
337 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
338 "Error attaching to LTTng live session");
339 }
340 goto end;
341 }
342 }
343
344 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
345 "Updating all data streams: session-id={}, session-name=\"{}\"", session->id,
346 session->session_name);
347
348 status = lttng_live_session_get_new_streams(session, lttng_live_msg_iter->self_msg_iter);
349 switch (status) {
350 case LTTNG_LIVE_ITERATOR_STATUS_OK:
351 break;
352 case LTTNG_LIVE_ITERATOR_STATUS_END:
353 /*
354 * We received a `_END` from the `_get_new_streams()` function,
355 * which means no more data will ever be received from the data
356 * streams of this session. But it's possible that the metadata
357 * is incomplete.
358 * The live protocol guarantees that we receive all the
359 * metadata needed before we receive data streams needing it.
360 * But it's possible to receive metadata NOT needed by
361 * data streams after the session was closed. For example, this
362 * could happen if a new event is registered and the session is
363 * stopped before any tracepoint for that event is actually
364 * fired.
365 */
366 BT_CPPLOGD_SPEC(
367 lttng_live_msg_iter->logger,
368 "Updating streams returned _END status. Override status to _OK in order fetch any remaining metadata:"
369 "session-id={}, session-name=\"{}\"",
370 session->id, session->session_name);
371 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
372 break;
373 default:
374 goto end;
375 }
376
377 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
378 "Updating metadata stream for session: session-id={}, session-name=\"{}\"",
379 session->id, session->session_name);
380
381 for (lttng_live_trace::UP& trace : session->traces) {
382 status = lttng_live_metadata_update(trace.get());
383 switch (status) {
384 case LTTNG_LIVE_ITERATOR_STATUS_END:
385 case LTTNG_LIVE_ITERATOR_STATUS_OK:
386 break;
387 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
388 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
389 goto end;
390 default:
391 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
392 "Error updating trace metadata: "
393 "stream-iter-status={}, trace-id={}",
394 status, trace->id);
395 goto end;
396 }
397 }
398
399 /*
400 * Now that we have the metadata we can initialize the downstream
401 * iterator.
402 */
403 status = lttng_live_lazy_msg_init(session, lttng_live_msg_iter->self_msg_iter);
404
405 end:
406 return status;
407 }
408
409 static void
410 lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
411 {
412 uint64_t session_idx;
413
414 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
415 struct lttng_live_session *session =
416 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
417 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
418 "Force marking session as needing new streams: "
419 "session-id={}",
420 session->id);
421 session->new_streams_needed = true;
422 for (lttng_live_trace::UP& trace : session->traces) {
423 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
424 "Force marking trace metadata state as needing an update: "
425 "session-id={}, trace-id={}",
426 session->id, trace->id);
427
428 BT_ASSERT(trace->metadata_stream_state != LTTNG_LIVE_METADATA_STREAM_STATE_CLOSED);
429
430 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
431 }
432 }
433 }
434
435 static enum lttng_live_iterator_status
436 lttng_live_iterator_handle_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
437 {
438 enum lttng_live_iterator_status status;
439 enum lttng_live_viewer_status viewer_status;
440 uint64_t session_idx = 0, nr_sessions_opened = 0;
441 struct lttng_live_session *session;
442 enum session_not_found_action sess_not_found_act =
443 lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
444
445 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
446 "Update data and metadata of all sessions: "
447 "live-msg-iter-addr={}",
448 fmt::ptr(lttng_live_msg_iter));
449 /*
450 * In a remotely distant future, we could add a "new
451 * session" flag to the protocol, which would tell us that we
452 * need to query for new sessions even though we have sessions
453 * currently ongoing.
454 */
455 if (lttng_live_msg_iter->sessions->len == 0) {
456 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
457 BT_CPPLOGD_SPEC(
458 lttng_live_msg_iter->logger,
459 "No session found. Exiting in accordance with the `session-not-found-action` parameter");
460 status = LTTNG_LIVE_ITERATOR_STATUS_END;
461 goto end;
462 } else {
463 BT_CPPLOGD_SPEC(
464 lttng_live_msg_iter->logger,
465 "No session found. Try creating a new one in accordance with the `session-not-found-action` parameter");
466 /*
467 * Retry to create a viewer session for the requested
468 * session name.
469 */
470 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
471 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
472 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
473 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
474 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
475 "Error creating LTTng live viewer session");
476 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
477 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
478 } else {
479 bt_common_abort();
480 }
481 goto end;
482 }
483 }
484 }
485
486 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
487 session =
488 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
489 status = lttng_live_get_session(lttng_live_msg_iter, session);
490 switch (status) {
491 case LTTNG_LIVE_ITERATOR_STATUS_OK:
492 case LTTNG_LIVE_ITERATOR_STATUS_END:
493 /*
494 * A session returned `_END`. Other sessions may still
495 * be active so we override the status and continue
496 * looping if needed.
497 */
498 break;
499 default:
500 goto end;
501 }
502 if (!session->closed) {
503 nr_sessions_opened++;
504 }
505 }
506
507 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE && nr_sessions_opened == 0) {
508 status = LTTNG_LIVE_ITERATOR_STATUS_END;
509 } else {
510 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
511 }
512
513 end:
514 return status;
515 }
516
517 static enum lttng_live_iterator_status
518 emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
519 struct lttng_live_stream_iterator *stream_iter,
520 bt2::ConstMessage::Shared& message, uint64_t timestamp)
521 {
522 BT_ASSERT(stream_iter->trace->clock_class);
523 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
524 "Emitting inactivity message for stream: ctf-stream-id={}, "
525 "viewer-stream-id={}, timestamp={}",
526 stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id,
527 timestamp);
528
529 const auto msg = bt_message_message_iterator_inactivity_create(
530 lttng_live_msg_iter->self_msg_iter, stream_iter->trace->clock_class, timestamp);
531
532 if (!msg) {
533 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
534 "Error emitting message iterator inactivity message");
535 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
536 }
537
538 message = bt2::ConstMessage::Shared::createWithoutRef(msg);
539 return LTTNG_LIVE_ITERATOR_STATUS_OK;
540 }
541
542 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
543 struct lttng_live_msg_iter *lttng_live_msg_iter,
544 struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message)
545 {
546 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
547
548 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
549 return LTTNG_LIVE_ITERATOR_STATUS_OK;
550 }
551
552 /*
553 * Check if we already sent an inactivty message downstream for this
554 * `current_inactivity_ts` value.
555 */
556 if (lttng_live_stream->last_inactivity_ts.is_set &&
557 lttng_live_stream->current_inactivity_ts == lttng_live_stream->last_inactivity_ts.value) {
558 lttng_live_stream_iterator_set_state(lttng_live_stream,
559 LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA);
560
561 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
562 goto end;
563 }
564
565 ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream, message,
566 lttng_live_stream->current_inactivity_ts);
567
568 lttng_live_stream->last_inactivity_ts.value = lttng_live_stream->current_inactivity_ts;
569 lttng_live_stream->last_inactivity_ts.is_set = true;
570 end:
571 return ret;
572 }
573
574 static int live_get_msg_ts_ns(struct lttng_live_msg_iter *lttng_live_msg_iter,
575 const bt_message *msg, int64_t last_msg_ts_ns, int64_t *ts_ns)
576 {
577 const bt_clock_snapshot *clock_snapshot = NULL;
578 int ret = 0;
579
580 BT_ASSERT_DBG(msg);
581 BT_ASSERT_DBG(ts_ns);
582
583 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
584 "Getting message's timestamp: iter-data-addr={}, msg-addr={}, "
585 "last-msg-ts={}",
586 fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns);
587
588 switch (bt_message_get_type(msg)) {
589 case BT_MESSAGE_TYPE_EVENT:
590 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(msg);
591 break;
592 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
593 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg);
594 break;
595 case BT_MESSAGE_TYPE_PACKET_END:
596 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(msg);
597 break;
598 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
599 clock_snapshot =
600 bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg);
601 break;
602 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
603 clock_snapshot =
604 bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg);
605 break;
606 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
607 clock_snapshot = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const(msg);
608 break;
609 default:
610 /* All the other messages have a higher priority */
611 BT_CPPLOGD_STR_SPEC(lttng_live_msg_iter->logger,
612 "Message has no timestamp: using the last message timestamp.");
613 *ts_ns = last_msg_ts_ns;
614 goto end;
615 }
616
617 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
618 if (ret) {
619 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
620 "Cannot get nanoseconds from Epoch of clock snapshot: "
621 "clock-snapshot-addr={}",
622 fmt::ptr(clock_snapshot));
623 goto error;
624 }
625
626 goto end;
627
628 error:
629 ret = -1;
630
631 end:
632 if (ret == 0) {
633 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
634 "Found message's timestamp: iter-data-addr={}, msg-addr={}, "
635 "last-msg-ts={}, ts={}",
636 fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns, *ts_ns);
637 }
638
639 return ret;
640 }
641
642 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
643 struct lttng_live_msg_iter *lttng_live_msg_iter,
644 struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message)
645 {
646 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
647 enum ctf_msg_iter_status status;
648 uint64_t session_idx;
649
650 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
651 struct lttng_live_session *session =
652 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
653
654 if (session->new_streams_needed) {
655 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
656 "Need an update for streams: "
657 "session-id={}",
658 session->id);
659 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
660 goto end;
661 }
662 for (lttng_live_trace::UP& trace : session->traces) {
663 if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
664 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
665 "Need an update for metadata stream: "
666 "session-id={}, trace-id={}",
667 session->id, trace->id);
668 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
669 goto end;
670 }
671 }
672 }
673
674 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
675 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
676 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
677 "Invalid state of live stream iterator"
678 "stream-iter-status={}",
679 lttng_live_stream->state);
680 goto end;
681 }
682
683 const bt_message *msg;
684 status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), &msg);
685 switch (status) {
686 case CTF_MSG_ITER_STATUS_EOF:
687 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
688 break;
689 case CTF_MSG_ITER_STATUS_OK:
690 message = bt2::ConstMessage::Shared::createWithoutRef(msg);
691 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
692 break;
693 case CTF_MSG_ITER_STATUS_AGAIN:
694 /*
695 * Continue immediately (end of packet). The next
696 * get_index may return AGAIN to delay the following
697 * attempt.
698 */
699 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
700 break;
701 case CTF_MSG_ITER_STATUS_ERROR:
702 default:
703 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
704 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
705 "CTF message iterator failed to get next message: "
706 "msg-iter={}, msg-iter-status={}",
707 fmt::ptr(lttng_live_stream->msg_iter), status);
708 break;
709 }
710
711 end:
712 return ret;
713 }
714
715 static enum lttng_live_iterator_status
716 lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
717 struct lttng_live_stream_iterator *stream_iter,
718 bt2::ConstMessage::Shared& curr_msg)
719 {
720 enum lttng_live_iterator_status live_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
721
722 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
723 "Closing live stream iterator: stream-name=\"{}\", "
724 "viewer-stream-id={}",
725 stream_iter->name, stream_iter->viewer_stream_id);
726
727 /*
728 * The viewer has hung up on us so we are closing the stream. The
729 * `ctf_msg_iter` should simply realize that it needs to close the
730 * stream properly by emitting the necessary stream end message.
731 */
732 const bt_message *msg;
733 enum ctf_msg_iter_status status =
734 ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), &msg);
735
736 if (status == CTF_MSG_ITER_STATUS_ERROR) {
737 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
738 "Error getting the next message from CTF message iterator");
739 live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
740 goto end;
741 } else if (status == CTF_MSG_ITER_STATUS_EOF) {
742 BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger,
743 "Reached the end of the live stream iterator.");
744 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
745 goto end;
746 }
747
748 BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
749
750 curr_msg = bt2::ConstMessage::Shared::createWithoutRef(msg);
751
752 end:
753 return live_status;
754 }
755
756 /*
757 * helper function:
758 * handle_no_data_streams()
759 * retry:
760 * - for each ACTIVE_NO_DATA stream:
761 * - query relayd for stream data, or quiescence info.
762 * - if need metadata, get metadata, goto retry.
763 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
764 * - if quiescent, move to QUIESCENT streams
765 * - if fetched data, move to ACTIVE_DATA streams
766 * (at this point each stream either has data, or is quiescent)
767 *
768 *
769 * iterator_next:
770 * handle_new_streams_and_metadata()
771 * - query relayd for known streams, add them as ACTIVE_NO_DATA
772 * - query relayd for metadata
773 *
774 * call handle_active_no_data_streams()
775 *
776 * handle_quiescent_streams()
777 * - if at least one stream is ACTIVE_DATA:
778 * - peek stream event with lowest timestamp -> next_ts
779 * - for each quiescent stream
780 * - if next_ts >= quiescent end
781 * - set state to ACTIVE_NO_DATA
782 * - else
783 * - for each quiescent stream
784 * - set state to ACTIVE_NO_DATA
785 *
786 * call handle_active_no_data_streams()
787 *
788 * handle_active_data_streams()
789 * - if at least one stream is ACTIVE_DATA:
790 * - get stream event with lowest timestamp from heap
791 * - make that stream event the current message.
792 * - move this stream heap position to its next event
793 * - if we need to fetch data from relayd, move
794 * stream to ACTIVE_NO_DATA.
795 * - return OK
796 * - return AGAIN
797 *
798 * end criterion: ctrl-c on client. If relayd exits or the session
799 * closes on the relay daemon side, we keep on waiting for streams.
800 * Eventually handle --end timestamp (also an end criterion).
801 *
802 * When disconnected from relayd: try to re-connect endlessly.
803 */
804 static enum lttng_live_iterator_status
805 lttng_live_iterator_next_msg_on_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
806 struct lttng_live_stream_iterator *stream_iter,
807 bt2::ConstMessage::Shared& curr_msg)
808 {
809 enum lttng_live_iterator_status live_status;
810
811 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
812 "Advancing live stream iterator until next message if possible: "
813 "stream-name=\"{}\", viewer-stream-id={}",
814 stream_iter->name, stream_iter->viewer_stream_id);
815
816 if (stream_iter->has_stream_hung_up) {
817 /*
818 * The stream has hung up and the stream was properly closed
819 * during the last call to the current function. Return _END
820 * status now so that this stream iterator is removed for the
821 * stream iterator list.
822 */
823 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
824 goto end;
825 }
826
827 retry:
828 LTTNG_LIVE_LOGD_STREAM_ITER(stream_iter);
829
830 /*
831 * Make sure we have the most recent metadata and possibly some new
832 * streams.
833 */
834 live_status = lttng_live_iterator_handle_new_streams_and_metadata(lttng_live_msg_iter);
835 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
836 goto end;
837 }
838
839 live_status =
840 lttng_live_iterator_next_handle_one_no_data_stream(lttng_live_msg_iter, stream_iter);
841 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
842 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
843 /*
844 * We overwrite `live_status` since `curr_msg` is
845 * likely set to a valid message in this function.
846 */
847 live_status =
848 lttng_live_iterator_close_stream(lttng_live_msg_iter, stream_iter, curr_msg);
849 }
850 goto end;
851 }
852
853 live_status = lttng_live_iterator_next_handle_one_quiescent_stream(lttng_live_msg_iter,
854 stream_iter, curr_msg);
855 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
856 BT_ASSERT(!curr_msg);
857 goto end;
858 }
859 if (curr_msg) {
860 goto end;
861 }
862 live_status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live_msg_iter,
863 stream_iter, curr_msg);
864 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
865 BT_ASSERT(!curr_msg);
866 }
867
868 end:
869 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) {
870 BT_CPPLOGD_SPEC(
871 lttng_live_msg_iter->logger,
872 "Ask the relay daemon for an updated view of the data and metadata streams");
873 goto retry;
874 }
875
876 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
877 "Returning from advancing live stream iterator: status={}, "
878 "stream-name=\"{}\", viewer-stream-id={}",
879 live_status, stream_iter->name, stream_iter->viewer_stream_id);
880
881 return live_status;
882 }
883
884 static bool is_discarded_packet_or_event_message(const bt2::ConstMessage msg)
885 {
886 return msg.type() == bt2::MessageType::DiscardedEvents ||
887 msg.type() == bt2::MessageType::DiscardedPackets;
888 }
889
890 static enum lttng_live_iterator_status
891 adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream *stream,
892 const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out,
893 uint64_t new_begin_ts)
894 {
895 enum bt_property_availability availability;
896 const bt_clock_snapshot *clock_snapshot;
897 uint64_t end_ts;
898 uint64_t count;
899
900 clock_snapshot = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg_in);
901 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
902
903 availability = bt_message_discarded_packets_get_count(msg_in, &count);
904 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
905
906 const auto msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
907 iter, stream, new_begin_ts, end_ts);
908
909 if (!msg) {
910 return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
911 }
912
913 bt_message_discarded_packets_set_count(msg, count);
914 msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
915 return LTTNG_LIVE_ITERATOR_STATUS_OK;
916 }
917
918 static enum lttng_live_iterator_status
919 adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream *stream,
920 const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out,
921 uint64_t new_begin_ts)
922 {
923 enum bt_property_availability availability;
924 const bt_clock_snapshot *clock_snapshot;
925 uint64_t end_ts;
926 uint64_t count;
927
928 clock_snapshot = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg_in);
929 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
930
931 availability = bt_message_discarded_events_get_count(msg_in, &count);
932 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
933
934 const auto msg = bt_message_discarded_events_create_with_default_clock_snapshots(
935 iter, stream, new_begin_ts, end_ts);
936
937 if (!msg) {
938 return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
939 }
940
941 bt_message_discarded_events_set_count(msg, count);
942 msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
943 return LTTNG_LIVE_ITERATOR_STATUS_OK;
944 }
945
946 static enum lttng_live_iterator_status
947 handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
948 struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns,
949 const bt2::ConstMessage& late_msg)
950 {
951 const bt_clock_class *clock_class;
952 const bt_stream_class *stream_class;
953 enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status;
954 int64_t last_inactivity_ts_ns;
955 enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
956 enum lttng_live_iterator_status adjust_status;
957 bt2::ConstMessage::Shared adjusted_message;
958
959 /*
960 * The timestamp of the current message is before the last message sent
961 * by this component. We CANNOT send it as is.
962 *
963 * The only expected scenario in which that could happen is the
964 * following, everything else is a bug in this component, relay daemon,
965 * or CTF parser.
966 *
967 * Expected scenario: The CTF message iterator emitted discarded
968 * packets and discarded events with synthesized beginning and end
969 * timestamps from the bounds of the last known packet and the newly
970 * decoded packet header. The CTF message iterator is not aware of
971 * stream inactivity beacons. Hence, we have to adjust the beginning
972 * timestamp of those types of messages if a stream signalled its
973 * inactivity up until _after_ the last known packet's beginning
974 * timestamp.
975 *
976 * Otherwise, the monotonicity guarantee of message timestamps would
977 * not be preserved.
978 *
979 * In short, the only scenario in which it's okay and fixable to
980 * received a late message is when:
981 * 1. the late message is a discarded packets or discarded events
982 * message,
983 * 2. this stream produced an inactivity message downstream, and
984 * 3. the timestamp of the late message is within the inactivity
985 * timespan we sent downstream through the inactivity message.
986 */
987
988 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
989 "Handling late message on live stream iterator: "
990 "stream-name=\"{}\", viewer-stream-id={}",
991 stream_iter->name, stream_iter->viewer_stream_id);
992
993 if (!stream_iter->last_inactivity_ts.is_set) {
994 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
995 "Invalid live stream state: "
996 "have a late message when no inactivity message "
997 "was ever sent for that stream.");
998 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
999 goto end;
1000 }
1001
1002 if (!is_discarded_packet_or_event_message(late_msg)) {
1003 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1004 "Invalid live stream state: "
1005 "have a late message that is not a packet discarded or "
1006 "event discarded message: late-msg-type={}",
1007 late_msg.type());
1008 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1009 goto end;
1010 }
1011
1012 stream_class = bt_stream_borrow_class_const(stream_iter->stream->libObjPtr());
1013 clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class);
1014
1015 ts_ns_status = bt_clock_class_cycles_to_ns_from_origin(
1016 clock_class, stream_iter->last_inactivity_ts.value, &last_inactivity_ts_ns);
1017 if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) {
1018 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1019 "Error converting last "
1020 "inactivity message timestamp to nanoseconds");
1021 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1022 goto end;
1023 }
1024
1025 if (last_inactivity_ts_ns <= late_msg_ts_ns) {
1026 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1027 "Invalid live stream state: "
1028 "have a late message that is none included in a stream "
1029 "inactivity timespan: last-inactivity-ts-ns={}, "
1030 "late-msg-ts-ns={}",
1031 last_inactivity_ts_ns, late_msg_ts_ns);
1032 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1033 goto end;
1034 }
1035
1036 /*
1037 * We now know that it's okay for this message to be late, we can now
1038 * adjust its timestamp to ensure monotonicity.
1039 */
1040 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1041 "Adjusting the timestamp of late message: late-msg-type={}, "
1042 "msg-new-ts-ns={}",
1043 late_msg.type(), stream_iter->last_inactivity_ts.value);
1044 switch (late_msg.type()) {
1045 case bt2::MessageType::DiscardedEvents:
1046 adjust_status = adjust_discarded_events_message(
1047 lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(),
1048 late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
1049 break;
1050 case bt2::MessageType::DiscardedPackets:
1051 adjust_status = adjust_discarded_packets_message(
1052 lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(),
1053 late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
1054 break;
1055 default:
1056 bt_common_abort();
1057 }
1058
1059 if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1060 stream_iter_status = adjust_status;
1061 goto end;
1062 }
1063
1064 BT_ASSERT_DBG(adjusted_message);
1065 stream_iter->current_msg = adjusted_message;
1066 stream_iter->current_msg_ts_ns = last_inactivity_ts_ns;
1067
1068 end:
1069 return stream_iter_status;
1070 }
1071
1072 static enum lttng_live_iterator_status
1073 next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
1074 struct lttng_live_trace *live_trace,
1075 struct lttng_live_stream_iterator **youngest_trace_stream_iter)
1076 {
1077 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
1078 enum lttng_live_iterator_status stream_iter_status;
1079 int64_t youngest_candidate_msg_ts = INT64_MAX;
1080 uint64_t stream_iter_idx;
1081
1082 BT_ASSERT_DBG(live_trace);
1083
1084 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1085 "Finding the next stream iterator for trace: "
1086 "trace-id={}",
1087 live_trace->id);
1088 /*
1089 * Update the current message of every stream iterators of this trace.
1090 * The current msg of every stream must have a timestamp equal or
1091 * larger than the last message returned by this iterator. We must
1092 * ensure monotonicity.
1093 */
1094 stream_iter_idx = 0;
1095 while (stream_iter_idx < live_trace->stream_iterators.size()) {
1096 bool stream_iter_is_ended = false;
1097 lttng_live_stream_iterator *stream_iter =
1098 live_trace->stream_iterators[stream_iter_idx].get();
1099
1100 /*
1101 * If there is no current message for this stream, go fetch
1102 * one.
1103 */
1104 while (!stream_iter->current_msg) {
1105 bt2::ConstMessage::Shared msg;
1106 int64_t curr_msg_ts_ns = INT64_MAX;
1107
1108 stream_iter_status =
1109 lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, msg);
1110
1111 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1112 stream_iter_is_ended = true;
1113 break;
1114 }
1115
1116 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1117 goto end;
1118 }
1119
1120 BT_ASSERT_DBG(msg);
1121
1122 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1123 "Live stream iterator returned message: msg-type={}, "
1124 "stream-name=\"{}\", viewer-stream-id={}",
1125 msg->type(), stream_iter->name, stream_iter->viewer_stream_id);
1126
1127 /*
1128 * Get the timestamp in nanoseconds from origin of this
1129 * message.
1130 */
1131 live_get_msg_ts_ns(lttng_live_msg_iter, msg->libObjPtr(),
1132 lttng_live_msg_iter->last_msg_ts_ns, &curr_msg_ts_ns);
1133
1134 /*
1135 * Check if the message of the current live stream
1136 * iterator occurred at the exact same time or after the
1137 * last message returned by this component's message
1138 * iterator. If not, we need to handle it with care.
1139 */
1140 if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
1141 stream_iter->current_msg = std::move(msg);
1142 stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
1143 } else {
1144 /*
1145 * We received a message from the past. This
1146 * may be fixable but it can also be an error.
1147 */
1148 stream_iter_status =
1149 handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, *msg);
1150 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1151 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1152 "Late message could not be handled correctly: "
1153 "lttng-live-msg-iter-addr={}, "
1154 "stream-name=\"{}\", "
1155 "curr-msg-ts={}, last-msg-ts={}",
1156 fmt::ptr(lttng_live_msg_iter), stream_iter->name,
1157 curr_msg_ts_ns,
1158 lttng_live_msg_iter->last_msg_ts_ns);
1159 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1160 goto end;
1161 }
1162 }
1163 }
1164
1165 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1166
1167 if (!stream_iter_is_ended) {
1168 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1169 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1170 /*
1171 * Update the current best candidate message
1172 * for the stream iterator of this live trace
1173 * to be forwarded downstream.
1174 */
1175 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1176 youngest_candidate_stream_iter = stream_iter;
1177 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1178 /*
1179 * Order the messages in an arbitrary but
1180 * deterministic way.
1181 */
1182 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1183 int ret = common_muxing_compare_messages(
1184 stream_iter->current_msg->libObjPtr(),
1185 youngest_candidate_stream_iter->current_msg->libObjPtr());
1186 if (ret < 0) {
1187 /*
1188 * The `youngest_candidate_stream_iter->current_msg`
1189 * should go first. Update the next
1190 * iterator and the current timestamp.
1191 */
1192 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1193 youngest_candidate_stream_iter = stream_iter;
1194 } else if (ret == 0) {
1195 /*
1196 * Unable to pick which one should go
1197 * first.
1198 */
1199 BT_CPPLOGW_SPEC(
1200 lttng_live_msg_iter->logger,
1201 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1202 "stream-iter-addr={}"
1203 "stream-iter-addr={}",
1204 fmt::ptr(stream_iter), fmt::ptr(youngest_candidate_stream_iter));
1205 }
1206 }
1207
1208 stream_iter_idx++;
1209 } else {
1210 /*
1211 * The live stream iterator has ended. That
1212 * iterator is removed from the array, but
1213 * there is no need to increment
1214 * stream_iter_idx as
1215 * g_ptr_array_remove_index_fast replaces the
1216 * removed element with the array's last
1217 * element.
1218 */
1219 bt2c::vectorFastRemove(live_trace->stream_iterators, stream_iter_idx);
1220 }
1221 }
1222
1223 if (youngest_candidate_stream_iter) {
1224 *youngest_trace_stream_iter = youngest_candidate_stream_iter;
1225 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1226 } else {
1227 /*
1228 * The only case where we don't have a candidate for this trace
1229 * is if we reached the end of all the iterators.
1230 */
1231 BT_ASSERT(live_trace->stream_iterators.empty());
1232 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1233 }
1234
1235 end:
1236 return stream_iter_status;
1237 }
1238
1239 static enum lttng_live_iterator_status
1240 next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
1241 struct lttng_live_session *session,
1242 struct lttng_live_stream_iterator **youngest_session_stream_iter)
1243 {
1244 enum lttng_live_iterator_status stream_iter_status;
1245 uint64_t trace_idx = 0;
1246 int64_t youngest_candidate_msg_ts = INT64_MAX;
1247 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
1248
1249 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1250 "Finding the next stream iterator for session: "
1251 "session-id={}",
1252 session->id);
1253 /*
1254 * Make sure we are attached to the session and look for new streams
1255 * and metadata.
1256 */
1257 stream_iter_status = lttng_live_get_session(lttng_live_msg_iter, session);
1258 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
1259 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE &&
1260 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) {
1261 goto end;
1262 }
1263
1264 while (trace_idx < session->traces.size()) {
1265 bool trace_is_ended = false;
1266 struct lttng_live_stream_iterator *stream_iter;
1267 lttng_live_trace *trace = session->traces[trace_idx].get();
1268
1269 stream_iter_status =
1270 next_stream_iterator_for_trace(lttng_live_msg_iter, trace, &stream_iter);
1271 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1272 /*
1273 * All the live stream iterators for this trace are
1274 * ENDed. Remove the trace from this session.
1275 */
1276 trace_is_ended = true;
1277 } else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1278 goto end;
1279 }
1280
1281 if (!trace_is_ended) {
1282 BT_ASSERT_DBG(stream_iter);
1283
1284 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1285 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1286 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1287 youngest_candidate_stream_iter = stream_iter;
1288 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1289 /*
1290 * Order the messages in an arbitrary but
1291 * deterministic way.
1292 */
1293 int ret = common_muxing_compare_messages(
1294 stream_iter->current_msg->libObjPtr(),
1295 youngest_candidate_stream_iter->current_msg->libObjPtr());
1296 if (ret < 0) {
1297 /*
1298 * The `youngest_candidate_stream_iter->current_msg`
1299 * should go first. Update the next iterator
1300 * and the current timestamp.
1301 */
1302 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1303 youngest_candidate_stream_iter = stream_iter;
1304 } else if (ret == 0) {
1305 /* Unable to pick which one should go first. */
1306 BT_CPPLOGW_SPEC(
1307 lttng_live_msg_iter->logger,
1308 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1309 "stream-iter-addr={}"
1310 "youngest-candidate-stream-iter-addr={}",
1311 fmt::ptr(stream_iter), fmt::ptr(youngest_candidate_stream_iter));
1312 }
1313 }
1314 trace_idx++;
1315 } else {
1316 /*
1317 * trace_idx is not incremented since
1318 * vectorFastRemove replaces the
1319 * element at trace_idx with the array's last element.
1320 */
1321 bt2c::vectorFastRemove(session->traces, trace_idx);
1322 }
1323 }
1324 if (youngest_candidate_stream_iter) {
1325 *youngest_session_stream_iter = youngest_candidate_stream_iter;
1326 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1327 } else {
1328 /*
1329 * The only cases where we don't have a candidate for this
1330 * trace is:
1331 * 1. if we reached the end of all the iterators of all the
1332 * traces of this session,
1333 * 2. if we never had live stream iterator in the first place.
1334 *
1335 * In either cases, we return END.
1336 */
1337 BT_ASSERT(session->traces.empty());
1338 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1339 }
1340 end:
1341 return stream_iter_status;
1342 }
1343
1344 static inline void put_messages(bt_message_array_const msgs, uint64_t count)
1345 {
1346 uint64_t i;
1347
1348 for (i = 0; i < count; i++) {
1349 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
1350 }
1351 }
1352
1353 bt_message_iterator_class_next_method_status
1354 lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array_const msgs,
1355 uint64_t capacity, uint64_t *count)
1356 {
1357 try {
1358 bt_message_iterator_class_next_method_status status;
1359 enum lttng_live_viewer_status viewer_status;
1360 struct lttng_live_msg_iter *lttng_live_msg_iter =
1361 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_it);
1362 struct lttng_live_component *lttng_live = lttng_live_msg_iter->lttng_live_comp;
1363 enum lttng_live_iterator_status stream_iter_status;
1364 uint64_t session_idx;
1365
1366 *count = 0;
1367
1368 BT_ASSERT_DBG(lttng_live_msg_iter);
1369
1370 if (G_UNLIKELY(lttng_live_msg_iter->was_interrupted)) {
1371 /*
1372 * The iterator was interrupted in a previous call to the
1373 * `_next()` method. We currently do not support generating
1374 * messages after such event. The babeltrace2 CLI should never
1375 * be running the graph after being interrupted. So this check
1376 * is to prevent other graph users from using this live
1377 * iterator in an messed up internal state.
1378 */
1379 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1380 BT_CPPLOGE_APPEND_CAUSE_SPEC(
1381 lttng_live_msg_iter->logger,
1382 "Message iterator was interrupted during a previous call to the `next()` and currently does not support continuing after such event.");
1383 goto end;
1384 }
1385
1386 /*
1387 * Clear all the invalid message reference that might be left over in
1388 * the output array.
1389 */
1390 memset(msgs, 0, capacity * sizeof(*msgs));
1391
1392 /*
1393 * If no session are exposed on the relay found at the url provided by
1394 * the user, session count will be 0. In this case, we return status
1395 * end to return gracefully.
1396 */
1397 if (lttng_live_msg_iter->sessions->len == 0) {
1398 if (lttng_live->params.sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
1399 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1400 goto end;
1401 } else {
1402 /*
1403 * The are no more active session for this session
1404 * name. Retry to create a viewer session for the
1405 * requested session name.
1406 */
1407 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
1408 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1409 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1410 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1411 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1412 "Error creating LTTng live viewer session");
1413 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1414 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1415 } else {
1416 bt_common_abort();
1417 }
1418 goto end;
1419 }
1420 }
1421 }
1422
1423 if (lttng_live_msg_iter->active_stream_iter == 0) {
1424 lttng_live_force_new_streams_and_metadata(lttng_live_msg_iter);
1425 }
1426
1427 /*
1428 * Here the muxing of message is done.
1429 *
1430 * We need to iterate over all the streams of all the traces of all the
1431 * viewer sessions in order to get the message with the smallest
1432 * timestamp. In this case, a session is a viewer session and there is
1433 * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
1434 * kernel). Each viewer session can have multiple traces, for example,
1435 * 64bit UST viewer sessions could have multiple per-pid traces.
1436 *
1437 * We iterate over the streams of each traces to update and see what is
1438 * their next message's timestamp. From those timestamps, we select the
1439 * message with the smallest timestamp as the best candidate message
1440 * for that trace and do the same thing across all the sessions.
1441 *
1442 * We then compare the timestamp of best candidate message of all the
1443 * sessions to pick the message with the smallest timestamp and we
1444 * return it.
1445 */
1446 while (*count < capacity) {
1447 struct lttng_live_stream_iterator *youngest_stream_iter = NULL,
1448 *candidate_stream_iter = NULL;
1449 int64_t youngest_msg_ts_ns = INT64_MAX;
1450
1451 BT_ASSERT_DBG(lttng_live_msg_iter->sessions);
1452 session_idx = 0;
1453 while (session_idx < lttng_live_msg_iter->sessions->len) {
1454 struct lttng_live_session *session = (lttng_live_session *) g_ptr_array_index(
1455 lttng_live_msg_iter->sessions, session_idx);
1456
1457 /* Find the best candidate message to send downstream. */
1458 stream_iter_status = next_stream_iterator_for_session(lttng_live_msg_iter, session,
1459 &candidate_stream_iter);
1460
1461 /* If we receive an END status, it means that either:
1462 * - Those traces never had active streams (UST with no
1463 * data produced yet),
1464 * - All live stream iterators have ENDed.
1465 */
1466 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1467 if (session->closed && session->traces.empty()) {
1468 /*
1469 * Remove the session from the list.
1470 * session_idx is not modified since
1471 * g_ptr_array_remove_index_fast
1472 * replaces the the removed element with
1473 * the array's last element.
1474 */
1475 g_ptr_array_remove_index_fast(lttng_live_msg_iter->sessions, session_idx);
1476 } else {
1477 session_idx++;
1478 }
1479 continue;
1480 }
1481
1482 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1483 goto return_status;
1484 }
1485
1486 if (G_UNLIKELY(youngest_stream_iter == NULL) ||
1487 candidate_stream_iter->current_msg_ts_ns < youngest_msg_ts_ns) {
1488 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1489 youngest_stream_iter = candidate_stream_iter;
1490 } else if (candidate_stream_iter->current_msg_ts_ns == youngest_msg_ts_ns) {
1491 /*
1492 * The currently selected message to be sent
1493 * downstream next has the exact same timestamp
1494 * that of the current candidate message. We
1495 * must break the tie in a predictable manner.
1496 */
1497 BT_CPPLOGD_STR_SPEC(
1498 lttng_live_msg_iter->logger,
1499 "Two of the next message candidates have the same timestamps, pick one deterministically.");
1500 /*
1501 * Order the messages in an arbitrary but
1502 * deterministic way.
1503 */
1504 int ret = common_muxing_compare_messages(
1505 candidate_stream_iter->current_msg->libObjPtr(),
1506 youngest_stream_iter->current_msg->libObjPtr());
1507 if (ret < 0) {
1508 /*
1509 * The `candidate_stream_iter->current_msg`
1510 * should go first. Update the next
1511 * iterator and the current timestamp.
1512 */
1513 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1514 youngest_stream_iter = candidate_stream_iter;
1515 } else if (ret == 0) {
1516 /* Unable to pick which one should go first. */
1517 BT_CPPLOGW_SPEC(
1518 lttng_live_msg_iter->logger,
1519 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1520 "next-stream-iter-addr={}"
1521 "candidate-stream-iter-addr={}",
1522 fmt::ptr(youngest_stream_iter), fmt::ptr(candidate_stream_iter));
1523 }
1524 }
1525
1526 session_idx++;
1527 }
1528
1529 if (!youngest_stream_iter) {
1530 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1531 goto return_status;
1532 }
1533
1534 BT_ASSERT_DBG(youngest_stream_iter->current_msg);
1535 /* Ensure monotonicity. */
1536 BT_ASSERT_DBG(lttng_live_msg_iter->last_msg_ts_ns <=
1537 youngest_stream_iter->current_msg_ts_ns);
1538
1539 /*
1540 * Insert the next message to the message batch. This will set
1541 * stream iterator current message to NULL so that next time
1542 * we fetch the next message of that stream iterator
1543 */
1544 msgs[*count] = youngest_stream_iter->current_msg.release().libObjPtr();
1545 (*count)++;
1546
1547 /* Update the last timestamp in nanoseconds sent downstream. */
1548 lttng_live_msg_iter->last_msg_ts_ns = youngest_msg_ts_ns;
1549 youngest_stream_iter->current_msg_ts_ns = INT64_MAX;
1550
1551 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1552 }
1553
1554 return_status:
1555 switch (stream_iter_status) {
1556 case LTTNG_LIVE_ITERATOR_STATUS_OK:
1557 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
1558 /*
1559 * If we gathered messages, return _OK even if the graph was
1560 * interrupted. This allows for the components downstream to at
1561 * least get the those messages. If the graph was indeed
1562 * interrupted there should not be another _next() call as the
1563 * application will tear down the graph. This component class
1564 * doesn't support restarting after an interruption.
1565 */
1566 if (*count > 0) {
1567 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
1568 } else {
1569 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1570 }
1571 break;
1572 case LTTNG_LIVE_ITERATOR_STATUS_END:
1573 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1574 break;
1575 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
1576 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1577 "Memory error preparing the next batch of messages: "
1578 "live-iter-status={}",
1579 stream_iter_status);
1580 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
1581 break;
1582 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
1583 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
1584 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
1585 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1586 "Error preparing the next batch of messages: "
1587 "live-iter-status={}",
1588 stream_iter_status);
1589
1590 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1591 /* Put all existing messages on error. */
1592 put_messages(msgs, *count);
1593 break;
1594 default:
1595 bt_common_abort();
1596 }
1597
1598 end:
1599 return status;
1600 } catch (const std::bad_alloc&) {
1601 return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
1602 } catch (const bt2::Error&) {
1603 return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1604 }
1605 }
1606
1607 static struct lttng_live_msg_iter *
1608 lttng_live_msg_iter_create(struct lttng_live_component *lttng_live_comp,
1609 bt_self_message_iterator *self_msg_it)
1610 {
1611 lttng_live_msg_iter *msg_iter = new lttng_live_msg_iter {lttng_live_comp->logger};
1612 msg_iter->self_comp = lttng_live_comp->self_comp;
1613 msg_iter->lttng_live_comp = lttng_live_comp;
1614 msg_iter->self_msg_iter = self_msg_it;
1615
1616 msg_iter->active_stream_iter = 0;
1617 msg_iter->last_msg_ts_ns = INT64_MIN;
1618 msg_iter->was_interrupted = false;
1619
1620 msg_iter->sessions =
1621 g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_session);
1622 BT_ASSERT(msg_iter->sessions);
1623
1624 return msg_iter;
1625 }
1626
1627 bt_message_iterator_class_initialize_method_status
1628 lttng_live_msg_iter_init(bt_self_message_iterator *self_msg_it,
1629 bt_self_message_iterator_configuration *, bt_self_component_port_output *)
1630 {
1631 try {
1632 bt_message_iterator_class_initialize_method_status status;
1633 struct lttng_live_component *lttng_live;
1634 struct lttng_live_msg_iter *lttng_live_msg_iter;
1635 enum lttng_live_viewer_status viewer_status;
1636 bt_self_component *self_comp = bt_self_message_iterator_borrow_component(self_msg_it);
1637
1638 lttng_live = (lttng_live_component *) bt_self_component_get_data(self_comp);
1639
1640 /* There can be only one downstream iterator at the same time. */
1641 BT_ASSERT(!lttng_live->has_msg_iter);
1642 lttng_live->has_msg_iter = true;
1643
1644 lttng_live_msg_iter = lttng_live_msg_iter_create(lttng_live, self_msg_it);
1645 if (!lttng_live_msg_iter) {
1646 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live->logger,
1647 "Failed to create lttng_live_msg_iter");
1648 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1649 goto error;
1650 }
1651
1652 viewer_status = live_viewer_connection_create(
1653 lttng_live->params.url.c_str(), false, lttng_live_msg_iter, lttng_live_msg_iter->logger,
1654 &lttng_live_msg_iter->viewer_connection);
1655 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1656 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1657 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1658 "Failed to create viewer connection");
1659 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1660 /*
1661 * Interruption in the _iter_init() method is not
1662 * supported. Return an error.
1663 */
1664 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1665 "Interrupted while creating viewer connection");
1666 }
1667
1668 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1669 goto error;
1670 }
1671
1672 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
1673 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1674 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1675 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1676 "Failed to create viewer session");
1677 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1678 /*
1679 * Interruption in the _iter_init() method is not
1680 * supported. Return an error.
1681 */
1682 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1683 "Interrupted when creating viewer session");
1684 }
1685
1686 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1687 goto error;
1688 }
1689
1690 if (lttng_live_msg_iter->sessions->len == 0) {
1691 switch (lttng_live->params.sess_not_found_act) {
1692 case SESSION_NOT_FOUND_ACTION_CONTINUE:
1693 BT_CPPLOGI_SPEC(
1694 lttng_live_msg_iter->logger,
1695 "Unable to connect to the requested live viewer session. "
1696 "Keep trying to connect because of {}=\"{}\" component parameter: url=\"{}\"",
1697 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR,
1698 lttng_live->params.url);
1699 break;
1700 case SESSION_NOT_FOUND_ACTION_FAIL:
1701 BT_CPPLOGE_APPEND_CAUSE_SPEC(
1702 lttng_live_msg_iter->logger,
1703 "Unable to connect to the requested live viewer session. "
1704 "Fail the message iterator initialization because of {}=\"{}\" "
1705 "component parameter: url =\"{}\"",
1706 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_FAIL_STR,
1707 lttng_live->params.url);
1708 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1709 goto error;
1710 case SESSION_NOT_FOUND_ACTION_END:
1711 BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger,
1712 "Unable to connect to the requested live viewer session. "
1713 "End gracefully at the first _next() call because of {}=\"{}\""
1714 " component parameter: url=\"{}\"",
1715 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_END_STR,
1716 lttng_live->params.url);
1717 break;
1718 default:
1719 bt_common_abort();
1720 }
1721 }
1722
1723 bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter);
1724 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK;
1725 goto end;
1726
1727 error:
1728 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
1729 end:
1730 return status;
1731 } catch (const std::bad_alloc&) {
1732 return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1733 } catch (const bt2::Error&) {
1734 return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1735 }
1736 }
1737
1738 static struct bt_param_validation_map_value_entry_descr list_sessions_params[] = {
1739 {URL_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
1740 bt_param_validation_value_descr::makeString()},
1741 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
1742
1743 static bt_component_class_query_method_status
1744 lttng_live_query_list_sessions(const bt_value *params, const bt_value **result,
1745 const bt2c::Logger& logger)
1746 {
1747 bt_component_class_query_method_status status;
1748 const bt_value *url_value = NULL;
1749 const char *url;
1750 struct live_viewer_connection *viewer_connection = NULL;
1751 enum lttng_live_viewer_status viewer_status;
1752 enum bt_param_validation_status validation_status;
1753 gchar *validate_error = NULL;
1754
1755 validation_status = bt_param_validation_validate(params, list_sessions_params, &validate_error);
1756 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
1757 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1758 goto error;
1759 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
1760 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1761 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "{}", validate_error);
1762 goto error;
1763 }
1764
1765 url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
1766 url = bt_value_string_get(url_value);
1767
1768 viewer_status = live_viewer_connection_create(url, true, NULL, logger, &viewer_connection);
1769 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1770 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1771 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Failed to create viewer connection");
1772 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1773 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1774 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
1775 } else {
1776 bt_common_abort();
1777 }
1778 goto error;
1779 }
1780
1781 status = live_viewer_connection_list_sessions(viewer_connection, result);
1782 if (status != BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK) {
1783 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Failed to list viewer sessions");
1784 goto error;
1785 }
1786
1787 goto end;
1788
1789 error:
1790 BT_VALUE_PUT_REF_AND_RESET(*result);
1791
1792 if (status >= 0) {
1793 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1794 }
1795
1796 end:
1797 if (viewer_connection) {
1798 live_viewer_connection_destroy(viewer_connection);
1799 }
1800
1801 g_free(validate_error);
1802
1803 return status;
1804 }
1805
1806 static bt_component_class_query_method_status
1807 lttng_live_query_support_info(const bt_value *params, const bt_value **result,
1808 const bt2c::Logger& logger)
1809 {
1810 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
1811 const bt_value *input_type_value;
1812 const bt_value *input_value;
1813 double weight = 0;
1814 struct bt_common_lttng_live_url_parts parts = {};
1815
1816 /* Used by the logging macros */
1817 __attribute__((unused)) bt_self_component *self_comp = NULL;
1818
1819 *result = NULL;
1820 input_type_value = bt_value_map_borrow_entry_value_const(params, "type");
1821 if (!input_type_value) {
1822 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Missing expected `type` parameter.");
1823 goto error;
1824 }
1825
1826 if (!bt_value_is_string(input_type_value)) {
1827 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "`type` parameter is not a string value.");
1828 goto error;
1829 }
1830
1831 if (strcmp(bt_value_string_get(input_type_value), "string") != 0) {
1832 /* We don't handle file system paths */
1833 goto create_result;
1834 }
1835
1836 input_value = bt_value_map_borrow_entry_value_const(params, "input");
1837 if (!input_value) {
1838 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Missing expected `input` parameter.");
1839 goto error;
1840 }
1841
1842 if (!bt_value_is_string(input_value)) {
1843 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "`input` parameter is not a string value.");
1844 goto error;
1845 }
1846
1847 parts = bt_common_parse_lttng_live_url(bt_value_string_get(input_value), NULL, 0);
1848 if (parts.session_name) {
1849 /*
1850 * Looks pretty much like an LTTng live URL: we got the
1851 * session name part, which forms a complete URL.
1852 */
1853 weight = .75;
1854 }
1855
1856 create_result:
1857 *result = bt_value_real_create_init(weight);
1858 if (!*result) {
1859 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1860 goto error;
1861 }
1862
1863 goto end;
1864
1865 error:
1866 if (status >= 0) {
1867 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1868 }
1869
1870 BT_ASSERT(!*result);
1871
1872 end:
1873 bt_common_destroy_lttng_live_url_parts(&parts);
1874 return status;
1875 }
1876
1877 bt_component_class_query_method_status lttng_live_query(bt_self_component_class_source *comp_class,
1878 bt_private_query_executor *priv_query_exec,
1879 const char *object, const bt_value *params,
1880 __attribute__((unused)) void *method_data,
1881 const bt_value **result)
1882 {
1883 try {
1884 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
1885 bt2c::Logger logger {bt2::SelfComponentClass {comp_class},
1886 bt2::PrivateQueryExecutor {priv_query_exec},
1887 "PLUGIN/SRC.CTF.LTTNG-LIVE/QUERY"};
1888
1889 if (strcmp(object, "sessions") == 0) {
1890 status = lttng_live_query_list_sessions(params, result, logger);
1891 } else if (strcmp(object, "babeltrace.support-info") == 0) {
1892 status = lttng_live_query_support_info(params, result, logger);
1893 } else {
1894 BT_CPPLOGI_SPEC(logger, "Unknown query object `{}`", object);
1895 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT;
1896 goto end;
1897 }
1898
1899 end:
1900 return status;
1901 } catch (const std::bad_alloc&) {
1902 return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1903 } catch (const bt2::Error&) {
1904 return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1905 }
1906 }
1907
1908 void lttng_live_component_finalize(bt_self_component_source *component)
1909 {
1910 lttng_live_component::UP {static_cast<lttng_live_component *>(
1911 bt_self_component_get_data(bt_self_component_source_as_self_component(component)))};
1912 }
1913
1914 static enum session_not_found_action
1915 parse_session_not_found_action_param(const bt_value *no_session_param)
1916 {
1917 enum session_not_found_action action;
1918 const char *no_session_act_str = bt_value_string_get(no_session_param);
1919
1920 if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_CONTINUE_STR) == 0) {
1921 action = SESSION_NOT_FOUND_ACTION_CONTINUE;
1922 } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_FAIL_STR) == 0) {
1923 action = SESSION_NOT_FOUND_ACTION_FAIL;
1924 } else {
1925 BT_ASSERT(strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0);
1926 action = SESSION_NOT_FOUND_ACTION_END;
1927 }
1928
1929 return action;
1930 }
1931
1932 static bt_param_validation_value_descr inputs_elem_descr =
1933 bt_param_validation_value_descr::makeString();
1934
1935 static const char *sess_not_found_action_choices[] = {
1936 SESS_NOT_FOUND_ACTION_CONTINUE_STR,
1937 SESS_NOT_FOUND_ACTION_FAIL_STR,
1938 SESS_NOT_FOUND_ACTION_END_STR,
1939 };
1940
1941 static struct bt_param_validation_map_value_entry_descr params_descr[] = {
1942 {INPUTS_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
1943 bt_param_validation_value_descr::makeArray(1, 1, inputs_elem_descr)},
1944 {SESS_NOT_FOUND_ACTION_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
1945 bt_param_validation_value_descr::makeString(sess_not_found_action_choices)},
1946 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
1947
1948 static bt_component_class_initialize_method_status
1949 lttng_live_component_create(const bt_value *params, bt_self_component_source *self_comp,
1950 lttng_live_component::UP& component)
1951 {
1952 const bt_value *inputs_value;
1953 const bt_value *url_value;
1954 const bt_value *value;
1955 enum bt_param_validation_status validation_status;
1956 gchar *validation_error = NULL;
1957 bt2c::Logger logger {bt2::SelfSourceComponent {self_comp}, "PLUGIN/SRC.CTF.LTTNG-LIVE/COMP"};
1958
1959 validation_status = bt_param_validation_validate(params, params_descr, &validation_error);
1960 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
1961 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1962 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
1963 bt2c::GCharUP errorFreer {validation_error};
1964 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "{}", validation_error);
1965 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1966 }
1967
1968 auto lttng_live = bt2s::make_unique<lttng_live_component>(std::move(logger));
1969 lttng_live->self_comp = bt_self_component_source_as_self_component(self_comp);
1970 lttng_live->max_query_size = MAX_QUERY_SIZE;
1971 lttng_live->has_msg_iter = false;
1972
1973 inputs_value = bt_value_map_borrow_entry_value_const(params, INPUTS_PARAM);
1974 url_value = bt_value_array_borrow_element_by_index_const(inputs_value, 0);
1975 lttng_live->params.url = bt_value_string_get(url_value);
1976
1977 value = bt_value_map_borrow_entry_value_const(params, SESS_NOT_FOUND_ACTION_PARAM);
1978 if (value) {
1979 lttng_live->params.sess_not_found_act = parse_session_not_found_action_param(value);
1980 } else {
1981 BT_CPPLOGI_SPEC(lttng_live->logger,
1982 "Optional `{}` parameter is missing: defaulting to `{}`.",
1983 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR);
1984 lttng_live->params.sess_not_found_act = SESSION_NOT_FOUND_ACTION_CONTINUE;
1985 }
1986
1987 component = std::move(lttng_live);
1988 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
1989 }
1990
1991 bt_component_class_initialize_method_status
1992 lttng_live_component_init(bt_self_component_source *self_comp_src,
1993 bt_self_component_source_configuration *, const bt_value *params, void *)
1994 {
1995 try {
1996 lttng_live_component::UP lttng_live;
1997 bt_component_class_initialize_method_status ret;
1998 bt_self_component *self_comp = bt_self_component_source_as_self_component(self_comp_src);
1999 bt_self_component_add_port_status add_port_status;
2000
2001 ret = lttng_live_component_create(params, self_comp_src, lttng_live);
2002 if (ret != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
2003 return ret;
2004 }
2005
2006 add_port_status =
2007 bt_self_component_source_add_output_port(self_comp_src, "out", NULL, NULL);
2008 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
2009 ret = (bt_component_class_initialize_method_status) add_port_status;
2010 return ret;
2011 }
2012
2013 bt_self_component_set_data(self_comp, lttng_live.release());
2014
2015 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
2016 } catch (const std::bad_alloc&) {
2017 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2018 } catch (const bt2::Error&) {
2019 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
2020 }
2021 }
This page took 0.11424 seconds and 4 git commands to generate.