src.ctf.lttng-live: add bt_common_lttng_live_url_parts_deleter
[deliverable/babeltrace.git] / src / plugins / ctf / lttng-live / lttng-live.cpp
1 /*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
5 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
6 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
7 *
8 * Babeltrace CTF LTTng-live Client Component
9 */
10
11 #define BT_CLOG_CFG logCfg
12 #define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE"
13
14 #include <inttypes.h>
15 #include <stdbool.h>
16 #include <unistd.h>
17
18 #include <glib.h>
19
20 #include "common/assert.h"
21 #include <babeltrace2/babeltrace.h>
22 #include "compat/compiler.h"
23 #include <babeltrace2/types.h>
24 #include "cpp-common/exc.hpp"
25 #include "cpp-common/glib-up.hpp"
26 #include "cpp-common/make-unique.hpp"
27 #include "cpp-common/vector.hpp"
28
29 #include "plugins/common/muxing/muxing.h"
30 #include "plugins/common/param-validation/param-validation.h"
31 #include "cpp-common/cfg-logging-error-reporting.hpp"
32
33 #include "data-stream.hpp"
34 #include "metadata.hpp"
35 #include "lttng-live.hpp"
36
37 #define MAX_QUERY_SIZE (256 * 1024)
38 #define URL_PARAM "url"
39 #define INPUTS_PARAM "inputs"
40 #define SESS_NOT_FOUND_ACTION_PARAM "session-not-found-action"
41 #define SESS_NOT_FOUND_ACTION_CONTINUE_STR "continue"
42 #define SESS_NOT_FOUND_ACTION_FAIL_STR "fail"
43 #define SESS_NOT_FOUND_ACTION_END_STR "end"
44
45 #define print_dbg(fmt, ...) BT_CLOGD(fmt, ##__VA_ARGS__)
46
47 static const char *lttng_live_iterator_status_string(enum lttng_live_iterator_status status)
48 {
49 switch (status) {
50 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
51 return "LTTNG_LIVE_ITERATOR_STATUS_CONTINUE";
52 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
53 return "LTTNG_LIVE_ITERATOR_STATUS_AGAIN";
54 case LTTNG_LIVE_ITERATOR_STATUS_END:
55 return "LTTNG_LIVE_ITERATOR_STATUS_END";
56 case LTTNG_LIVE_ITERATOR_STATUS_OK:
57 return "LTTNG_LIVE_ITERATOR_STATUS_OK";
58 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
59 return "LTTNG_LIVE_ITERATOR_STATUS_INVAL";
60 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
61 return "LTTNG_LIVE_ITERATOR_STATUS_ERROR";
62 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
63 return "LTTNG_LIVE_ITERATOR_STATUS_NOMEM";
64 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
65 return "LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED";
66 default:
67 bt_common_abort();
68 }
69 }
70
71 static const char *lttng_live_stream_state_string(enum lttng_live_stream_state state)
72 {
73 switch (state) {
74 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
75 return "ACTIVE_NO_DATA";
76 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
77 return "QUIESCENT_NO_DATA";
78 case LTTNG_LIVE_STREAM_QUIESCENT:
79 return "QUIESCENT";
80 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
81 return "ACTIVE_DATA";
82 case LTTNG_LIVE_STREAM_EOF:
83 return "EOF";
84 default:
85 return "ERROR";
86 }
87 }
88
89 void lttng_live_stream_iterator_set_state(struct lttng_live_stream_iterator *stream_iter,
90 enum lttng_live_stream_state new_state)
91 {
92 const bt2_common::LogCfg& logCfg = stream_iter->logCfg;
93
94 BT_CLOGD("Setting live stream iterator state: viewer-stream-id=%" PRIu64
95 ", old-state=%s, new-state=%s",
96 stream_iter->viewer_stream_id, lttng_live_stream_state_string(stream_iter->state),
97 lttng_live_stream_state_string(new_state));
98
99 stream_iter->state = new_state;
100 }
101
/*
 * Logs, at the debug level, the state and the inactivity timestamps of
 * the live stream iterator `live_stream_iter` (a pointer expression).
 *
 * A `logCfg` must be in scope at the expansion site (see
 * `BT_CLOG_CFG`).
 *
 * The expansion is a single `do { ... } while (0)` statement WITHOUT a
 * trailing semicolon, so that `LTTNG_LIVE_LOGD_STREAM_ITER(x);` behaves
 * like a regular statement, including as the body of an unbraced
 * `if`/`else`. The argument is parenthesized so that any pointer
 * expression may be passed.
 */
#define LTTNG_LIVE_LOGD_STREAM_ITER(live_stream_iter)                                              \
    do {                                                                                           \
        BT_CLOGD("Live stream iterator state=%s, "                                                 \
                 "last-inact-ts-is-set=%d, last-inact-ts-value=%" PRId64 ", "                      \
                 "curr-inact-ts=%" PRId64,                                                         \
                 lttng_live_stream_state_string((live_stream_iter)->state),                        \
                 (live_stream_iter)->last_inactivity_ts.is_set,                                    \
                 (live_stream_iter)->last_inactivity_ts.value,                                     \
                 (live_stream_iter)->current_inactivity_ts);                                       \
    } while (0)
112
113 BT_HIDDEN
114 bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter)
115 {
116 bool ret;
117
118 if (!msg_iter) {
119 ret = false;
120 goto end;
121 }
122
123 ret = bt_self_message_iterator_is_interrupted(msg_iter->self_msg_iter);
124
125 end:
126 return ret;
127 }
128
129 static struct lttng_live_trace *
130 lttng_live_session_borrow_trace_by_id(struct lttng_live_session *session, uint64_t trace_id)
131 {
132 for (lttng_live_trace::UP& trace : session->traces) {
133 if (trace->id == trace_id) {
134 return trace.get();
135 }
136 }
137
138 return nullptr;
139 }
140
141 static struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
142 uint64_t trace_id)
143 {
144 const bt2_common::LogCfg& logCfg = session->logCfg;
145
146 BT_CLOGD("Creating live trace: "
147 "session-id=%" PRIu64 ", trace-id=%" PRIu64,
148 session->id, trace_id);
149
150 lttng_live_trace::UP trace = bt2_common::makeUnique<lttng_live_trace>(logCfg);
151 trace->session = session;
152 trace->id = trace_id;
153 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
154
155 lttng_live_trace *ret = trace.get();
156 session->traces.emplace_back(std::move(trace));
157
158 return ret;
159 }
160
161 BT_HIDDEN
162 struct lttng_live_trace *
163 lttng_live_session_borrow_or_create_trace_by_id(struct lttng_live_session *session,
164 uint64_t trace_id)
165 {
166 struct lttng_live_trace *trace;
167
168 trace = lttng_live_session_borrow_trace_by_id(session, trace_id);
169 if (trace) {
170 goto end;
171 }
172
173 /* The session is the owner of the newly created trace. */
174 trace = lttng_live_create_trace(session, trace_id);
175
176 end:
177 return trace;
178 }
179
180 BT_HIDDEN
181 int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, uint64_t session_id,
182 const char *hostname, const char *session_name)
183 {
184 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
185
186 BT_CLOGD("Adding live session: "
187 "session-id=%" PRIu64 ", hostname=\"%s\" session-name=\"%s\"",
188 session_id, hostname, session_name);
189
190 lttng_live_session::UP session = bt2_common::makeUnique<lttng_live_session>(logCfg);
191 session->self_comp = lttng_live_msg_iter->self_comp;
192 session->id = session_id;
193 session->lttng_live_msg_iter = lttng_live_msg_iter;
194 session->new_streams_needed = true;
195 session->hostname = hostname;
196 session->session_name = session_name;
197
198 lttng_live_msg_iter->sessions.emplace_back(std::move(session));
199
200 return 0;
201 }
202
203 lttng_live_session::~lttng_live_session()
204 {
205 BT_CLOGD("Destroying live session: "
206 "session-id=%" PRIu64 ", session-name=\"%s\"",
207 this->id, this->session_name.c_str());
208 if (this->id != -1ULL) {
209 if (lttng_live_session_detach(this)) {
210 if (!lttng_live_graph_is_canceled(this->lttng_live_msg_iter)) {
211 /* Old relayd cannot detach sessions. */
212 BT_CLOGD("Unable to detach lttng live session %" PRIu64, this->id);
213 }
214 }
215 this->id = -1ULL;
216 }
217 }
218
219 lttng_live_msg_iter::~lttng_live_msg_iter()
220 {
221 BT_ASSERT(this->lttng_live_comp);
222 BT_ASSERT(this->lttng_live_comp->has_msg_iter);
223
224 /* All stream iterators must be destroyed at this point. */
225 BT_ASSERT(this->active_stream_iter == 0);
226 this->lttng_live_comp->has_msg_iter = false;
227 }
228
229 BT_HIDDEN
230 void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
231 {
232 lttng_live_msg_iter::UP {
233 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_iter)};
234 }
235
236 static enum lttng_live_iterator_status
237 lttng_live_iterator_next_check_stream_state(struct lttng_live_stream_iterator *lttng_live_stream)
238 {
239 const bt2_common::LogCfg& logCfg = lttng_live_stream->logCfg;
240
241 switch (lttng_live_stream->state) {
242 case LTTNG_LIVE_STREAM_QUIESCENT:
243 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
244 break;
245 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
246 /* Invalid state. */
247 BT_CLOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
248 bt_common_abort();
249 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
250 /* Invalid state. */
251 BT_CLOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
252 bt_common_abort();
253 case LTTNG_LIVE_STREAM_EOF:
254 break;
255 }
256 return LTTNG_LIVE_ITERATOR_STATUS_OK;
257 }
258
259 /*
260 * For active no data stream, fetch next index. As a result of that it can
261 * become either:
262 * - quiescent: won't have events for a bit,
263 * - have data: need to get that data and produce the event,
264 * - have no data on this stream at this point: need to retry (AGAIN) or return
265 * EOF.
266 */
267 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
268 struct lttng_live_msg_iter *lttng_live_msg_iter,
269 struct lttng_live_stream_iterator *lttng_live_stream)
270 {
271 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
272 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
273 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
274 struct packet_index index;
275
276 if (lttng_live_stream->trace->metadata_stream_state ==
277 LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
278 BT_CLOGD(
279 "Need to get an update for the metadata stream before proceeding further with this stream: "
280 "stream-name=\"%s\"",
281 lttng_live_stream->name.c_str());
282 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
283 goto end;
284 }
285
286 if (lttng_live_stream->trace->session->new_streams_needed) {
287 BT_CLOGD("Need to get an update of all streams before proceeding further with this stream: "
288 "stream-name=\"%s\"",
289 lttng_live_stream->name.c_str());
290 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
291 goto end;
292 }
293
294 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA &&
295 lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
296 goto end;
297 }
298 ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream, &index);
299 if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
300 goto end;
301 }
302
303 BT_ASSERT_DBG(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
304
305 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
306 uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts.value,
307 curr_inact_ts = lttng_live_stream->current_inactivity_ts;
308
309 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA && last_inact_ts == curr_inact_ts) {
310 /*
311 * Because the stream is in the QUIESCENT_NO_DATA
312 * state, we can assert that the last_inactivity_ts was
313 * set and can be safely used in the `if` above.
314 */
315 BT_ASSERT(lttng_live_stream->last_inactivity_ts.is_set);
316
317 ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
318 LTTNG_LIVE_LOGD_STREAM_ITER(lttng_live_stream);
319 } else {
320 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
321 }
322 goto end;
323 }
324
325 lttng_live_stream->base_offset = index.offset;
326 lttng_live_stream->offset = index.offset;
327 lttng_live_stream->len = index.packet_size / CHAR_BIT;
328
329 BT_CLOGD("Setting live stream reading info: stream-name=\"%s\", "
330 "viewer-stream-id=%" PRIu64 ", stream-base-offset=%" PRIu64 ", stream-offset=%" PRIu64
331 ", stream-len=%" PRIu64,
332 lttng_live_stream->name.c_str(), lttng_live_stream->viewer_stream_id,
333 lttng_live_stream->base_offset, lttng_live_stream->offset, lttng_live_stream->len);
334
335 end:
336 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
337 ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream);
338 }
339 return ret;
340 }
341
342 /*
343 * Creation of the message requires the ctf trace class to be created
344 * beforehand, but the live protocol gives us all streams (including metadata)
345 * at once. So we split it in three steps: getting streams, getting metadata
346 * (which creates the ctf trace class), and then creating the per-stream
347 * messages.
348 */
349 static enum lttng_live_iterator_status
350 lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
351 struct lttng_live_session *session)
352 {
353 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
354 enum lttng_live_iterator_status status;
355
356 if (!session->attached) {
357 BT_CLOGD("Attach to session: session-id=%" PRIu64, session->id);
358 enum lttng_live_viewer_status attach_status =
359 lttng_live_session_attach(session, lttng_live_msg_iter->self_msg_iter);
360 if (attach_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
361 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
362 /*
363 * Clear any causes appended in
364 * `lttng_live_attach_session()` as we want to
365 * return gracefully since the graph was
366 * cancelled.
367 */
368 bt_current_thread_clear_error();
369 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
370 } else {
371 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
372 BT_CLOGE_APPEND_CAUSE("Error attaching to LTTng live session");
373 }
374 goto end;
375 }
376 }
377
378 BT_CLOGD("Updating all data streams: "
379 "session-id=%" PRIu64 ", session-name=\"%s\"",
380 session->id, session->session_name.c_str());
381
382 status = lttng_live_session_get_new_streams(session, lttng_live_msg_iter->self_msg_iter);
383 switch (status) {
384 case LTTNG_LIVE_ITERATOR_STATUS_OK:
385 break;
386 case LTTNG_LIVE_ITERATOR_STATUS_END:
387 /*
388 * We received a `_END` from the `_get_new_streams()` function,
389 * which means no more data will ever be received from the data
390 * streams of this session. But it's possible that the metadata
391 * is incomplete.
392 * The live protocol guarantees that we receive all the
393 * metadata needed before we receive data streams needing it.
394 * But it's possible to receive metadata NOT needed by
395 * data streams after the session was closed. For example, this
396 * could happen if a new event is registered and the session is
397 * stopped before any tracepoint for that event is actually
398 * fired.
399 */
400 BT_CLOGD(
401 "Updating streams returned _END status. Override status to _OK in order fetch any remaining metadata:"
402 "session-id=%" PRIu64 ", session-name=\"%s\"",
403 session->id, session->session_name.c_str());
404 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
405 break;
406 default:
407 goto end;
408 }
409
410 BT_CLOGD("Updating metadata stream for session: "
411 "session-id=%" PRIu64 ", session-name=\"%s\"",
412 session->id, session->session_name.c_str());
413
414 for (lttng_live_trace::UP& trace : session->traces) {
415 status = lttng_live_metadata_update(trace.get());
416 switch (status) {
417 case LTTNG_LIVE_ITERATOR_STATUS_END:
418 case LTTNG_LIVE_ITERATOR_STATUS_OK:
419 break;
420 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
421 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
422 goto end;
423 default:
424 BT_CLOGE_APPEND_CAUSE("Error updating trace metadata: "
425 "stream-iter-status=%s, trace-id=%" PRIu64,
426 lttng_live_iterator_status_string(status), trace->id);
427 goto end;
428 }
429 }
430
431 /*
432 * Now that we have the metadata we can initialize the downstream
433 * iterator.
434 */
435 status = lttng_live_lazy_msg_init(session, lttng_live_msg_iter->self_msg_iter);
436
437 end:
438 return status;
439 }
440
441 static void
442 lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
443 {
444 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
445
446 for (lttng_live_session::UP& session : lttng_live_msg_iter->sessions) {
447 BT_CLOGD("Force marking session as needing new streams: "
448 "session-id=%" PRIu64,
449 session->id);
450 session->new_streams_needed = true;
451 for (lttng_live_trace::UP& trace : session->traces) {
452 BT_CLOGD("Force marking trace metadata state as needing an update: "
453 "session-id=%" PRIu64 ", trace-id=%" PRIu64,
454 session->id, trace->id);
455
456 BT_ASSERT(trace->metadata_stream_state != LTTNG_LIVE_METADATA_STREAM_STATE_CLOSED);
457
458 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
459 }
460 }
461 }
462
463 static enum lttng_live_iterator_status
464 lttng_live_iterator_handle_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
465 {
466 enum lttng_live_iterator_status status;
467 enum lttng_live_viewer_status viewer_status;
468 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
469 uint64_t nr_sessions_opened = 0;
470 enum session_not_found_action sess_not_found_act =
471 lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
472
473 BT_CLOGD("Update data and metadata of all sessions: "
474 "live-msg-iter-addr=%p",
475 lttng_live_msg_iter);
476 /*
477 * In a remotely distant future, we could add a "new
478 * session" flag to the protocol, which would tell us that we
479 * need to query for new sessions even though we have sessions
480 * currently ongoing.
481 */
482 if (lttng_live_msg_iter->sessions.empty()) {
483 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
484 BT_CLOGD(
485 "No session found. Exiting in accordance with the `session-not-found-action` parameter");
486 status = LTTNG_LIVE_ITERATOR_STATUS_END;
487 goto end;
488 } else {
489 BT_CLOGD(
490 "No session found. Try creating a new one in accordance with the `session-not-found-action` parameter");
491 /*
492 * Retry to create a viewer session for the requested
493 * session name.
494 */
495 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
496 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
497 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
498 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
499 BT_CLOGE_APPEND_CAUSE("Error creating LTTng live viewer session");
500 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
501 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
502 } else {
503 bt_common_abort();
504 }
505 goto end;
506 }
507 }
508 }
509
510 for (lttng_live_session::UP& session : lttng_live_msg_iter->sessions) {
511 status = lttng_live_get_session(lttng_live_msg_iter, session.get());
512 switch (status) {
513 case LTTNG_LIVE_ITERATOR_STATUS_OK:
514 case LTTNG_LIVE_ITERATOR_STATUS_END:
515 /*
516 * A session returned `_END`. Other sessions may still
517 * be active so we override the status and continue
518 * looping if needed.
519 */
520 break;
521 default:
522 goto end;
523 }
524 if (!session->closed) {
525 nr_sessions_opened++;
526 }
527 }
528
529 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE && nr_sessions_opened == 0) {
530 status = LTTNG_LIVE_ITERATOR_STATUS_END;
531 } else {
532 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
533 }
534
535 end:
536 return status;
537 }
538
539 static enum lttng_live_iterator_status
540 emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
541 struct lttng_live_stream_iterator *stream_iter,
542 nonstd::optional<bt2::ConstMessage::Shared>& message, uint64_t timestamp)
543 {
544 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
545 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
546 bt_message *msg = NULL;
547
548 BT_ASSERT(stream_iter->trace->clock_class);
549
550 BT_CLOGD("Emitting inactivity message for stream: ctf-stream-id=%" PRIu64
551 ", viewer-stream-id=%" PRIu64 ", timestamp=%" PRIu64,
552 stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id, timestamp);
553
554 msg = bt_message_message_iterator_inactivity_create(lttng_live_msg_iter->self_msg_iter,
555 stream_iter->trace->clock_class, timestamp);
556 if (!msg) {
557 BT_CLOGE_APPEND_CAUSE("Error emitting message iterator inactivity message");
558 goto error;
559 }
560
561 message = bt2::ConstMessage::Shared::createWithoutRef(msg);
562
563 end:
564 return ret;
565
566 error:
567 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
568 goto end;
569 }
570
571 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
572 struct lttng_live_msg_iter *lttng_live_msg_iter,
573 struct lttng_live_stream_iterator *lttng_live_stream,
574 nonstd::optional<bt2::ConstMessage::Shared>& message)
575 {
576 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
577
578 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
579 return LTTNG_LIVE_ITERATOR_STATUS_OK;
580 }
581
582 /*
583 * Check if we already sent an inactivty message downstream for this
584 * `current_inactivity_ts` value.
585 */
586 if (lttng_live_stream->last_inactivity_ts.is_set &&
587 lttng_live_stream->current_inactivity_ts == lttng_live_stream->last_inactivity_ts.value) {
588 lttng_live_stream_iterator_set_state(lttng_live_stream,
589 LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA);
590
591 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
592 goto end;
593 }
594
595 ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream, message,
596 lttng_live_stream->current_inactivity_ts);
597
598 lttng_live_stream->last_inactivity_ts.value = lttng_live_stream->current_inactivity_ts;
599 lttng_live_stream->last_inactivity_ts.is_set = true;
600 end:
601 return ret;
602 }
603
604 static int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter,
605 struct lttng_live_msg_iter *lttng_live_msg_iter,
606 const bt_message *msg, int64_t last_msg_ts_ns, int64_t *ts_ns)
607 {
608 const bt_clock_snapshot *clock_snapshot = NULL;
609 int ret = 0;
610 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
611
612 BT_ASSERT_DBG(msg);
613 BT_ASSERT_DBG(ts_ns);
614
615 BT_CLOGD("Getting message's timestamp: iter-data-addr=%p, msg-addr=%p, "
616 "last-msg-ts=%" PRId64,
617 lttng_live_msg_iter, msg, last_msg_ts_ns);
618
619 switch (bt_message_get_type(msg)) {
620 case BT_MESSAGE_TYPE_EVENT:
621 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(msg);
622 break;
623 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
624 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg);
625 break;
626 case BT_MESSAGE_TYPE_PACKET_END:
627 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(msg);
628 break;
629 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
630 clock_snapshot =
631 bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg);
632 break;
633 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
634 clock_snapshot =
635 bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg);
636 break;
637 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
638 clock_snapshot = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const(msg);
639 break;
640 default:
641 /* All the other messages have a higher priority */
642 BT_CLOGD_STR("Message has no timestamp: using the last message timestamp.");
643 *ts_ns = last_msg_ts_ns;
644 goto end;
645 }
646
647 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
648 if (ret) {
649 BT_CLOGE_APPEND_CAUSE("Cannot get nanoseconds from Epoch of clock snapshot: "
650 "clock-snapshot-addr=%p",
651 clock_snapshot);
652 goto error;
653 }
654
655 goto end;
656
657 error:
658 ret = -1;
659
660 end:
661 if (ret == 0) {
662 BT_CLOGD("Found message's timestamp: "
663 "iter-data-addr=%p, msg-addr=%p, "
664 "last-msg-ts=%" PRId64 ", ts=%" PRId64,
665 lttng_live_msg_iter, msg, last_msg_ts_ns, *ts_ns);
666 }
667
668 return ret;
669 }
670
671 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
672 struct lttng_live_msg_iter *lttng_live_msg_iter,
673 struct lttng_live_stream_iterator *lttng_live_stream,
674 nonstd::optional<bt2::ConstMessage::Shared>& message)
675 {
676 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
677 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
678 enum ctf_msg_iter_status status;
679
680 for (lttng_live_session::UP& session : lttng_live_msg_iter->sessions) {
681 if (session->new_streams_needed) {
682 BT_CLOGD("Need an update for streams: "
683 "session-id=%" PRIu64,
684 session->id);
685 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
686 goto end;
687 }
688 for (lttng_live_trace::UP& trace : session->traces) {
689 if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
690 BT_CLOGD("Need an update for metadata stream: "
691 "session-id=%" PRIu64 ", trace-id=%" PRIu64,
692 session->id, trace->id);
693 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
694 goto end;
695 }
696 }
697 }
698
699 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
700 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
701 BT_CLOGE_APPEND_CAUSE("Invalid state of live stream iterator"
702 "stream-iter-status=%s",
703 lttng_live_stream_state_string(lttng_live_stream->state));
704 goto end;
705 }
706
707 const bt_message *msg;
708 status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), &msg);
709 switch (status) {
710 case CTF_MSG_ITER_STATUS_EOF:
711 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
712 break;
713 case CTF_MSG_ITER_STATUS_OK:
714 message = bt2::ConstMessage::Shared::createWithoutRef(msg);
715 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
716 break;
717 case CTF_MSG_ITER_STATUS_AGAIN:
718 /*
719 * Continue immediately (end of packet). The next
720 * get_index may return AGAIN to delay the following
721 * attempt.
722 */
723 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
724 break;
725 case CTF_MSG_ITER_STATUS_ERROR:
726 default:
727 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
728 BT_CLOGE_APPEND_CAUSE("CTF message iterator failed to get next message: "
729 "msg-iter=%p, msg-iter-status=%s",
730 lttng_live_stream->msg_iter.get(),
731 ctf_msg_iter_status_string(status));
732 break;
733 }
734
735 end:
736 return ret;
737 }
738
739 static enum lttng_live_iterator_status
740 lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
741 struct lttng_live_stream_iterator *stream_iter,
742 nonstd::optional<bt2::ConstMessage::Shared>& curr_msg)
743 {
744 enum lttng_live_iterator_status live_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
745 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
746
747 BT_CLOGD("Closing live stream iterator: stream-name=\"%s\", "
748 "viewer-stream-id=%" PRIu64,
749 stream_iter->name.c_str(), stream_iter->viewer_stream_id);
750
751 /*
752 * The viewer has hung up on us so we are closing the stream. The
753 * `ctf_msg_iter` should simply realize that it needs to close the
754 * stream properly by emitting the necessary stream end message.
755 */
756 const bt_message *msg;
757 enum ctf_msg_iter_status status =
758 ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), &msg);
759
760 if (status == CTF_MSG_ITER_STATUS_ERROR) {
761 BT_CLOGE_APPEND_CAUSE("Error getting the next message from CTF message iterator");
762 live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
763 goto end;
764 } else if (status == CTF_MSG_ITER_STATUS_EOF) {
765 BT_CLOGI("Reached the end of the live stream iterator.");
766 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
767 goto end;
768 }
769
770 BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
771
772 curr_msg = bt2::ConstMessage::Shared::createWithoutRef(msg);
773
774 end:
775 return live_status;
776 }
777
778 /*
779 * helper function:
780 * handle_no_data_streams()
781 * retry:
782 * - for each ACTIVE_NO_DATA stream:
783 * - query relayd for stream data, or quiescence info.
784 * - if need metadata, get metadata, goto retry.
785 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
786 * - if quiescent, move to QUIESCENT streams
787 * - if fetched data, move to ACTIVE_DATA streams
788 * (at this point each stream either has data, or is quiescent)
789 *
790 *
791 * iterator_next:
792 * handle_new_streams_and_metadata()
793 * - query relayd for known streams, add them as ACTIVE_NO_DATA
794 * - query relayd for metadata
795 *
796 * call handle_active_no_data_streams()
797 *
798 * handle_quiescent_streams()
799 * - if at least one stream is ACTIVE_DATA:
800 * - peek stream event with lowest timestamp -> next_ts
801 * - for each quiescent stream
802 * - if next_ts >= quiescent end
803 * - set state to ACTIVE_NO_DATA
804 * - else
805 * - for each quiescent stream
806 * - set state to ACTIVE_NO_DATA
807 *
808 * call handle_active_no_data_streams()
809 *
810 * handle_active_data_streams()
811 * - if at least one stream is ACTIVE_DATA:
812 * - get stream event with lowest timestamp from heap
813 * - make that stream event the current message.
814 * - move this stream heap position to its next event
815 * - if we need to fetch data from relayd, move
816 * stream to ACTIVE_NO_DATA.
817 * - return OK
818 * - return AGAIN
819 *
820 * end criterion: ctrl-c on client. If relayd exits or the session
821 * closes on the relay daemon side, we keep on waiting for streams.
822 * Eventually handle --end timestamp (also an end criterion).
823 *
824 * When disconnected from relayd: try to re-connect endlessly.
825 */
826 static enum lttng_live_iterator_status
827 lttng_live_iterator_next_msg_on_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
828 struct lttng_live_stream_iterator *stream_iter,
829 nonstd::optional<bt2::ConstMessage::Shared>& curr_msg)
830 {
831 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
832 enum lttng_live_iterator_status live_status;
833
834 BT_CLOGD("Advancing live stream iterator until next message if possible: "
835 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
836 stream_iter->name.c_str(), stream_iter->viewer_stream_id);
837
838 if (stream_iter->has_stream_hung_up) {
839 /*
840 * The stream has hung up and the stream was properly closed
841 * during the last call to the current function. Return _END
842 * status now so that this stream iterator is removed for the
843 * stream iterator list.
844 */
845 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
846 goto end;
847 }
848
849 retry:
850 LTTNG_LIVE_LOGD_STREAM_ITER(stream_iter);
851
852 /*
853 * Make sure we have the most recent metadata and possibly some new
854 * streams.
855 */
856 live_status = lttng_live_iterator_handle_new_streams_and_metadata(lttng_live_msg_iter);
857 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
858 goto end;
859 }
860
861 live_status =
862 lttng_live_iterator_next_handle_one_no_data_stream(lttng_live_msg_iter, stream_iter);
863 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
864 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
865 /*
866 * We overwrite `live_status` since `curr_msg` is
867 * likely set to a valid message in this function.
868 */
869 live_status =
870 lttng_live_iterator_close_stream(lttng_live_msg_iter, stream_iter, curr_msg);
871 }
872 goto end;
873 }
874
875 live_status = lttng_live_iterator_next_handle_one_quiescent_stream(lttng_live_msg_iter,
876 stream_iter, curr_msg);
877 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
878 BT_ASSERT(!curr_msg);
879 goto end;
880 }
881 if (curr_msg) {
882 goto end;
883 }
884 live_status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live_msg_iter,
885 stream_iter, curr_msg);
886 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
887 BT_ASSERT(!curr_msg);
888 }
889
890 end:
891 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) {
892 BT_CLOGD("Ask the relay daemon for an updated view of the data and metadata streams");
893 goto retry;
894 }
895
896 BT_CLOGD("Returning from advancing live stream iterator: status=%s, "
897 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
898 lttng_live_iterator_status_string(live_status), stream_iter->name.c_str(),
899 stream_iter->viewer_stream_id);
900
901 return live_status;
902 }
903
904 static bool is_discarded_packet_or_event_message(bt2::ConstMessage msg)
905 {
906 const bt2::MessageType type = msg.type();
907
908 return type == bt2::MessageType::DISCARDED_EVENTS ||
909 type == bt2::MessageType::DISCARDED_PACKETS;
910 }
911
912 static enum lttng_live_iterator_status adjust_discarded_packets_message(
913 bt_self_message_iterator *iter, const bt_stream *stream, const bt_message *msg_in,
914 nonstd::optional<bt2::ConstMessage::Shared>& msg_out, uint64_t new_begin_ts)
915 {
916 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
917 enum bt_property_availability availability;
918 const bt_clock_snapshot *clock_snapshot;
919 uint64_t end_ts;
920 uint64_t count;
921
922 clock_snapshot = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg_in);
923 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
924
925 availability = bt_message_discarded_packets_get_count(msg_in, &count);
926 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
927
928 bt_message *msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
929 iter, stream, new_begin_ts, end_ts);
930 if (!msg) {
931 status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
932 goto end;
933 }
934
935 bt_message_discarded_packets_set_count(msg, count);
936 msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
937
938 end:
939 return status;
940 }
941
942 static enum lttng_live_iterator_status adjust_discarded_events_message(
943 bt_self_message_iterator *iter, const bt_stream *stream, const bt_message *msg_in,
944 nonstd::optional<bt2::ConstMessage::Shared>& msg_out, uint64_t new_begin_ts)
945 {
946 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
947 enum bt_property_availability availability;
948 const bt_clock_snapshot *clock_snapshot;
949 uint64_t end_ts;
950 uint64_t count;
951
952 clock_snapshot = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg_in);
953 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
954
955 availability = bt_message_discarded_events_get_count(msg_in, &count);
956 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
957
958 bt_message *msg = bt_message_discarded_events_create_with_default_clock_snapshots(
959 iter, stream, new_begin_ts, end_ts);
960 if (!msg) {
961 status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
962 goto end;
963 }
964
965 bt_message_discarded_events_set_count(msg, count);
966 msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
967
968 end:
969 return status;
970 }
971
972 static enum lttng_live_iterator_status
973 handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
974 struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns,
975 bt2::ConstMessage::Shared late_msg)
976 {
977 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
978 const bt_clock_class *clock_class;
979 const bt_stream_class *stream_class;
980 enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status;
981 int64_t last_inactivity_ts_ns;
982 enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
983 enum lttng_live_iterator_status adjust_status;
984 nonstd::optional<bt2::ConstMessage::Shared> adjusted_message;
985
986 /*
987 * The timestamp of the current message is before the last message sent
988 * by this component. We CANNOT send it as is.
989 *
990 * The only expected scenario in which that could happen is the
991 * following, everything else is a bug in this component, relay deamon,
992 * or CTF parser.
993 *
994 * Expected scenario: The CTF message iterator emitted discarded
995 * packets and discarded events with synthesized beginning and end
996 * timestamps from the bounds of the last known packet and the newly
997 * decoded packet header. The CTF message iterator is not aware of
998 * stream inactivity beacons. Hence, we have to adjust the beginning
999 * timestamp of those types of messages if a stream signalled its
1000 * inactivity up until _after_ the last known packet's beginning
1001 * timestamp.
1002 *
1003 * Otherwise, the monotonicity guarantee of message timestamps would
1004 * not be preserved.
1005 *
1006 * In short, the only scenario in which it's okay and fixable to
1007 * received a late message is when:
1008 * 1. the late message is a discarded packets or discarded events
1009 * message,
1010 * 2. this stream produced an inactivity message downstream, and
1011 * 3. the timestamp of the late message is within the inactivity
1012 * timespan we sent downstream through the inactivity message.
1013 */
1014
1015 BT_CLOGD("Handling late message on live stream iterator: "
1016 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
1017 stream_iter->name.c_str(), stream_iter->viewer_stream_id);
1018
1019 if (!stream_iter->last_inactivity_ts.is_set) {
1020 BT_CLOGE_APPEND_CAUSE("Invalid live stream state: "
1021 "have a late message when no inactivity message "
1022 "was ever sent for that stream.");
1023 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1024 goto end;
1025 }
1026
1027 if (!is_discarded_packet_or_event_message(*late_msg)) {
1028 BT_CLOGE_APPEND_CAUSE(
1029 "Invalid live stream state: "
1030 "have a late message that is not a packet discarded or "
1031 "event discarded message: late-msg-type=%s",
1032 bt_common_message_type_string(bt_message_get_type(late_msg->libObjPtr())));
1033 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1034 goto end;
1035 }
1036
1037 stream_class = bt_stream_borrow_class_const((*stream_iter->stream)->libObjPtr());
1038 clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class);
1039
1040 ts_ns_status = bt_clock_class_cycles_to_ns_from_origin(
1041 clock_class, stream_iter->last_inactivity_ts.value, &last_inactivity_ts_ns);
1042 if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) {
1043 BT_CLOGE_APPEND_CAUSE("Error converting last "
1044 "inactivity message timestamp to nanoseconds");
1045 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1046 goto end;
1047 }
1048
1049 if (last_inactivity_ts_ns <= late_msg_ts_ns) {
1050 BT_CLOGE_APPEND_CAUSE("Invalid live stream state: "
1051 "have a late message that is none included in a stream "
1052 "inactivity timespan: last-inactivity-ts-ns=%" PRIu64
1053 "late-msg-ts-ns=%" PRIu64,
1054 last_inactivity_ts_ns, late_msg_ts_ns);
1055 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1056 goto end;
1057 }
1058
1059 /*
1060 * We now know that it's okay for this message to be late, we can now
1061 * adjust its timestamp to ensure monotonicity.
1062 */
1063 BT_CLOGD("Adjusting the timestamp of late message: late-msg-type=%s, "
1064 "msg-new-ts-ns=%" PRIu64,
1065 bt_common_message_type_string(bt_message_get_type(late_msg->libObjPtr())),
1066 stream_iter->last_inactivity_ts.value);
1067 switch (late_msg->type()) {
1068 case bt2::MessageType::DISCARDED_EVENTS:
1069 adjust_status = adjust_discarded_events_message(
1070 lttng_live_msg_iter->self_msg_iter, (*stream_iter->stream)->libObjPtr(),
1071 late_msg->libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
1072 break;
1073 case bt2::MessageType::DISCARDED_PACKETS:
1074 adjust_status = adjust_discarded_packets_message(
1075 lttng_live_msg_iter->self_msg_iter, (*stream_iter->stream)->libObjPtr(),
1076 late_msg->libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
1077 break;
1078 default:
1079 bt_common_abort();
1080 }
1081
1082 if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1083 stream_iter_status = adjust_status;
1084 goto end;
1085 }
1086
1087 BT_ASSERT_DBG(adjusted_message);
1088 stream_iter->current_msg = adjusted_message;
1089 stream_iter->current_msg_ts_ns = last_inactivity_ts_ns;
1090
1091 end:
1092 return stream_iter_status;
1093 }
1094
1095 static enum lttng_live_iterator_status
1096 next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
1097 struct lttng_live_trace *live_trace,
1098 struct lttng_live_stream_iterator **youngest_trace_stream_iter)
1099 {
1100 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
1101 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
1102 enum lttng_live_iterator_status stream_iter_status;
1103 ;
1104 int64_t youngest_candidate_msg_ts = INT64_MAX;
1105 uint64_t stream_iter_idx;
1106
1107 BT_ASSERT_DBG(live_trace);
1108
1109 BT_CLOGD("Finding the next stream iterator for trace: "
1110 "trace-id=%" PRIu64,
1111 live_trace->id);
1112 /*
1113 * Update the current message of every stream iterators of this trace.
1114 * The current msg of every stream must have a timestamp equal or
1115 * larger than the last message returned by this iterator. We must
1116 * ensure monotonicity.
1117 */
1118 stream_iter_idx = 0;
1119 while (stream_iter_idx < live_trace->stream_iterators.size()) {
1120 bool stream_iter_is_ended = false;
1121 lttng_live_stream_iterator *stream_iter =
1122 live_trace->stream_iterators[stream_iter_idx].get();
1123
1124 /*
1125 * If there is no current message for this stream, go fetch
1126 * one.
1127 */
1128 while (!stream_iter->current_msg) {
1129 nonstd::optional<bt2::ConstMessage::Shared> msg;
1130 int64_t curr_msg_ts_ns = INT64_MAX;
1131
1132 stream_iter_status =
1133 lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, msg);
1134
1135 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1136 stream_iter_is_ended = true;
1137 break;
1138 }
1139
1140 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1141 goto end;
1142 }
1143
1144 BT_ASSERT_DBG(msg);
1145
1146 BT_CLOGD("Live stream iterator returned message: msg-type=%s, "
1147 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
1148 bt_common_message_type_string(bt_message_get_type((*msg)->libObjPtr())),
1149 stream_iter->name.c_str(), stream_iter->viewer_stream_id);
1150
1151 /*
1152 * Get the timestamp in nanoseconds from origin of this
1153 * messsage.
1154 */
1155 live_get_msg_ts_ns(stream_iter, lttng_live_msg_iter, (*msg)->libObjPtr(),
1156 lttng_live_msg_iter->last_msg_ts_ns, &curr_msg_ts_ns);
1157
1158 /*
1159 * Check if the message of the current live stream
1160 * iterator occurred at the exact same time or after the
1161 * last message returned by this component's message
1162 * iterator. If not, we need to handle it with care.
1163 */
1164 if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
1165 stream_iter->current_msg = std::move(*msg);
1166 stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
1167 } else {
1168 /*
1169 * We received a message from the past. This
1170 * may be fixable but it can also be an error.
1171 */
1172 stream_iter_status = handle_late_message(lttng_live_msg_iter, stream_iter,
1173 curr_msg_ts_ns, std::move(*msg));
1174 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1175 BT_CLOGE_APPEND_CAUSE("Late message could not be handled correctly: "
1176 "lttng-live-msg-iter-addr=%p, "
1177 "stream-name=\"%s\", "
1178 "curr-msg-ts=%" PRId64 ", last-msg-ts=%" PRId64,
1179 lttng_live_msg_iter, stream_iter->name.c_str(),
1180 curr_msg_ts_ns, lttng_live_msg_iter->last_msg_ts_ns);
1181 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1182 goto end;
1183 }
1184 }
1185 }
1186
1187 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1188
1189 if (!stream_iter_is_ended) {
1190 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1191 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1192 /*
1193 * Update the current best candidate message
1194 * for the stream iterator of this live trace
1195 * to be forwarded downstream.
1196 */
1197 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1198 youngest_candidate_stream_iter = stream_iter;
1199 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1200 /*
1201 * Order the messages in an arbitrary but
1202 * deterministic way.
1203 */
1204 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1205 int ret = common_muxing_compare_messages(
1206 (*stream_iter->current_msg)->libObjPtr(),
1207 (*youngest_candidate_stream_iter->current_msg)->libObjPtr());
1208 if (ret < 0) {
1209 /*
1210 * The `youngest_candidate_stream_iter->current_msg`
1211 * should go first. Update the next
1212 * iterator and the current timestamp.
1213 */
1214 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1215 youngest_candidate_stream_iter = stream_iter;
1216 } else if (ret == 0) {
1217 /*
1218 * Unable to pick which one should go
1219 * first.
1220 */
1221 BT_CLOGW(
1222 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1223 "stream-iter-addr=%p"
1224 "stream-iter-addr=%p",
1225 stream_iter, youngest_candidate_stream_iter);
1226 }
1227 }
1228
1229 stream_iter_idx++;
1230 } else {
1231 /*
1232 * The live stream iterator has ended. That
1233 * iterator is removed from the array, but
1234 * there is no need to increment
1235 * stream_iter_idx as
1236 * g_ptr_array_remove_index_fast replaces the
1237 * removed element with the array's last
1238 * element.
1239 */
1240 bt2_common::vectorFastRemove(live_trace->stream_iterators, stream_iter_idx);
1241 }
1242 }
1243
1244 if (youngest_candidate_stream_iter) {
1245 *youngest_trace_stream_iter = youngest_candidate_stream_iter;
1246 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1247 } else {
1248 /*
1249 * The only case where we don't have a candidate for this trace
1250 * is if we reached the end of all the iterators.
1251 */
1252 BT_ASSERT(live_trace->stream_iterators.empty());
1253 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1254 }
1255
1256 end:
1257 return stream_iter_status;
1258 }
1259
1260 static enum lttng_live_iterator_status
1261 next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
1262 struct lttng_live_session *session,
1263 struct lttng_live_stream_iterator **youngest_session_stream_iter)
1264 {
1265 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
1266 enum lttng_live_iterator_status stream_iter_status;
1267 uint64_t trace_idx = 0;
1268 int64_t youngest_candidate_msg_ts = INT64_MAX;
1269 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
1270
1271 BT_CLOGD("Finding the next stream iterator for session: "
1272 "session-id=%" PRIu64,
1273 session->id);
1274 /*
1275 * Make sure we are attached to the session and look for new streams
1276 * and metadata.
1277 */
1278 stream_iter_status = lttng_live_get_session(lttng_live_msg_iter, session);
1279 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
1280 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE &&
1281 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) {
1282 goto end;
1283 }
1284
1285 while (trace_idx < session->traces.size()) {
1286 bool trace_is_ended = false;
1287 struct lttng_live_stream_iterator *stream_iter;
1288 lttng_live_trace *trace = session->traces[trace_idx].get();
1289
1290 stream_iter_status =
1291 next_stream_iterator_for_trace(lttng_live_msg_iter, trace, &stream_iter);
1292 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1293 /*
1294 * All the live stream iterators for this trace are
1295 * ENDed. Remove the trace from this session.
1296 */
1297 trace_is_ended = true;
1298 } else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1299 goto end;
1300 }
1301
1302 if (!trace_is_ended) {
1303 BT_ASSERT_DBG(stream_iter);
1304
1305 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1306 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1307 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1308 youngest_candidate_stream_iter = stream_iter;
1309 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1310 /*
1311 * Order the messages in an arbitrary but
1312 * deterministic way.
1313 */
1314 int ret = common_muxing_compare_messages(
1315 (*stream_iter->current_msg)->libObjPtr(),
1316 (*youngest_candidate_stream_iter->current_msg)->libObjPtr());
1317 if (ret < 0) {
1318 /*
1319 * The `youngest_candidate_stream_iter->current_msg`
1320 * should go first. Update the next iterator
1321 * and the current timestamp.
1322 */
1323 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1324 youngest_candidate_stream_iter = stream_iter;
1325 } else if (ret == 0) {
1326 /* Unable to pick which one should go first. */
1327 BT_CLOGW(
1328 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1329 "stream-iter-addr=%p"
1330 "stream-iter-addr=%p",
1331 stream_iter, youngest_candidate_stream_iter);
1332 }
1333 }
1334 trace_idx++;
1335 } else {
1336 /*
1337 * trace_idx is not incremented since
1338 * vectorFastRemove replaces the
1339 * element at trace_idx with the array's last element.
1340 */
1341 bt2_common::vectorFastRemove(session->traces, trace_idx);
1342 }
1343 }
1344 if (youngest_candidate_stream_iter) {
1345 *youngest_session_stream_iter = youngest_candidate_stream_iter;
1346 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1347 } else {
1348 /*
1349 * The only cases where we don't have a candidate for this
1350 * trace is:
1351 * 1. if we reached the end of all the iterators of all the
1352 * traces of this session,
1353 * 2. if we never had live stream iterator in the first place.
1354 *
1355 * In either cases, we return END.
1356 */
1357 BT_ASSERT(session->traces.empty());
1358 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1359 }
1360 end:
1361 return stream_iter_status;
1362 }
1363
1364 static inline void put_messages(bt_message_array_const msgs, uint64_t count)
1365 {
1366 uint64_t i;
1367
1368 for (i = 0; i < count; i++) {
1369 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
1370 }
1371 }
1372
1373 BT_HIDDEN
1374 bt_message_iterator_class_next_method_status
1375 lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array_const msgs,
1376 uint64_t capacity, uint64_t *count)
1377 {
1378 struct lttng_live_msg_iter *lttng_live_msg_iter =
1379 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_it);
1380 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
1381
1382 try {
1383 bt_message_iterator_class_next_method_status status;
1384 enum lttng_live_viewer_status viewer_status;
1385
1386 struct lttng_live_component *lttng_live = lttng_live_msg_iter->lttng_live_comp;
1387 enum lttng_live_iterator_status stream_iter_status;
1388
1389 *count = 0;
1390
1391 BT_ASSERT_DBG(lttng_live_msg_iter);
1392
1393 if (G_UNLIKELY(lttng_live_msg_iter->was_interrupted)) {
1394 /*
1395 * The iterator was interrupted in a previous call to the
1396 * `_next()` method. We currently do not support generating
1397 * messages after such event. The babeltrace2 CLI should never
1398 * be running the graph after being interrupted. So this check
1399 * is to prevent other graph users from using this live
1400 * iterator in an messed up internal state.
1401 */
1402 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1403 BT_CLOGE_APPEND_CAUSE(
1404 "Message iterator was interrupted during a previous call to the `next()` and currently does not support continuing after such event.");
1405 goto end;
1406 }
1407
1408 /*
1409 * Clear all the invalid message reference that might be left over in
1410 * the output array.
1411 */
1412 memset(msgs, 0, capacity * sizeof(*msgs));
1413
1414 /*
1415 * If no session are exposed on the relay found at the url provided by
1416 * the user, session count will be 0. In this case, we return status
1417 * end to return gracefully.
1418 */
1419 if (lttng_live_msg_iter->sessions.empty()) {
1420 if (lttng_live->params.sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
1421 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1422 goto end;
1423 } else {
1424 /*
1425 * The are no more active session for this session
1426 * name. Retry to create a viewer session for the
1427 * requested session name.
1428 */
1429 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
1430 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1431 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1432 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1433 BT_CLOGE_APPEND_CAUSE("Error creating LTTng live viewer session");
1434 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1435 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1436 } else {
1437 bt_common_abort();
1438 }
1439 goto end;
1440 }
1441 }
1442 }
1443
1444 if (lttng_live_msg_iter->active_stream_iter == 0) {
1445 lttng_live_force_new_streams_and_metadata(lttng_live_msg_iter);
1446 }
1447
1448 /*
1449 * Here the muxing of message is done.
1450 *
1451 * We need to iterate over all the streams of all the traces of all the
1452 * viewer sessions in order to get the message with the smallest
1453 * timestamp. In this case, a session is a viewer session and there is
1454 * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
1455 * kernel). Each viewer session can have multiple traces, for example,
1456 * 64bit UST viewer sessions could have multiple per-pid traces.
1457 *
1458 * We iterate over the streams of each traces to update and see what is
1459 * their next message's timestamp. From those timestamps, we select the
1460 * message with the smallest timestamp as the best candidate message
1461 * for that trace and do the same thing across all the sessions.
1462 *
1463 * We then compare the timestamp of best candidate message of all the
1464 * sessions to pick the message with the smallest timestamp and we
1465 * return it.
1466 */
1467 while (*count < capacity) {
1468 struct lttng_live_stream_iterator *youngest_stream_iter = NULL,
1469 *candidate_stream_iter = NULL;
1470 int64_t youngest_msg_ts_ns = INT64_MAX;
1471
1472 uint64_t session_idx = 0;
1473 while (session_idx < lttng_live_msg_iter->sessions.size()) {
1474 lttng_live_session *session = lttng_live_msg_iter->sessions[session_idx].get();
1475
1476 /* Find the best candidate message to send downstream. */
1477 stream_iter_status = next_stream_iterator_for_session(lttng_live_msg_iter, session,
1478 &candidate_stream_iter);
1479
1480 /*
1481 * If we receive an END status, it means that either:
1482 * - Those traces never had active streams (UST with no
1483 * data produced yet),
1484 * - All live stream iterators have ENDed.
1485 */
1486 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1487 if (session->closed && session->traces.empty()) {
1488 /*
1489 * Remove the session from the list.
1490 * session_idx is not modified since
1491 * g_ptr_array_remove_index_fast
1492 * replaces the the removed element with
1493 * the array's last element.
1494 */
1495 bt2_common::vectorFastRemove(lttng_live_msg_iter->sessions, session_idx);
1496 } else {
1497 session_idx++;
1498 }
1499 continue;
1500 }
1501
1502 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1503 goto return_status;
1504 }
1505
1506 if (G_UNLIKELY(youngest_stream_iter == NULL) ||
1507 candidate_stream_iter->current_msg_ts_ns < youngest_msg_ts_ns) {
1508 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1509 youngest_stream_iter = candidate_stream_iter;
1510 } else if (candidate_stream_iter->current_msg_ts_ns == youngest_msg_ts_ns) {
1511 /*
1512 * The currently selected message to be sent
1513 * downstream next has the exact same timestamp
1514 * that of the current candidate message. We
1515 * must break the tie in a predictable manner.
1516 */
1517 BT_CLOGD_STR(
1518 "Two of the next message candidates have the same timestamps, pick one deterministically.");
1519 /*
1520 * Order the messages in an arbitrary but
1521 * deterministic way.
1522 */
1523 int ret = common_muxing_compare_messages(
1524 (*candidate_stream_iter->current_msg)->libObjPtr(),
1525 (*youngest_stream_iter->current_msg)->libObjPtr());
1526 if (ret < 0) {
1527 /*
1528 * The `candidate_stream_iter->current_msg`
1529 * should go first. Update the next
1530 * iterator and the current timestamp.
1531 */
1532 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1533 youngest_stream_iter = candidate_stream_iter;
1534 } else if (ret == 0) {
1535 /* Unable to pick which one should go first. */
1536 BT_CLOGW(
1537 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1538 "next-stream-iter-addr=%p"
1539 "candidate-stream-iter-addr=%p",
1540 youngest_stream_iter, candidate_stream_iter);
1541 }
1542 }
1543
1544 session_idx++;
1545 }
1546
1547 if (!youngest_stream_iter) {
1548 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1549 goto return_status;
1550 }
1551
1552 BT_ASSERT_DBG(youngest_stream_iter->current_msg);
1553 /* Ensure monotonicity. */
1554 BT_ASSERT_DBG(lttng_live_msg_iter->last_msg_ts_ns <=
1555 youngest_stream_iter->current_msg_ts_ns);
1556
1557 /*
1558 * Insert the next message to the message batch. This will set
1559 * stream iterator current messsage to NULL so that next time
1560 * we fetch the next message of that stream iterator
1561 */
1562 msgs[*count] = youngest_stream_iter->current_msg->release().libObjPtr();
1563 youngest_stream_iter->current_msg.reset();
1564 (*count)++;
1565
1566 /* Update the last timestamp in nanoseconds sent downstream. */
1567 lttng_live_msg_iter->last_msg_ts_ns = youngest_msg_ts_ns;
1568 youngest_stream_iter->current_msg_ts_ns = INT64_MAX;
1569
1570 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1571 }
1572
1573 return_status:
1574 switch (stream_iter_status) {
1575 case LTTNG_LIVE_ITERATOR_STATUS_OK:
1576 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
1577 /*
1578 * If we gathered messages, return _OK even if the graph was
1579 * interrupted. This allows for the components downstream to at
1580 * least get the thoses messages. If the graph was indeed
1581 * interrupted there should not be another _next() call as the
1582 * application will tear down the graph. This component class
1583 * doesn't support restarting after an interruption.
1584 */
1585 if (*count > 0) {
1586 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
1587 } else {
1588 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1589 }
1590 break;
1591 case LTTNG_LIVE_ITERATOR_STATUS_END:
1592 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1593 break;
1594 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
1595 BT_CLOGE_APPEND_CAUSE("Memory error preparing the next batch of messages: "
1596 "live-iter-status=%s",
1597 lttng_live_iterator_status_string(stream_iter_status));
1598 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
1599 break;
1600 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
1601 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
1602 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
1603 BT_CLOGE_APPEND_CAUSE("Error preparing the next batch of messages: "
1604 "live-iter-status=%s",
1605 lttng_live_iterator_status_string(stream_iter_status));
1606
1607 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1608 /* Put all existing messages on error. */
1609 put_messages(msgs, *count);
1610 break;
1611 default:
1612 bt_common_abort();
1613 }
1614
1615 end:
1616 return status;
1617 } catch (const std::bad_alloc&) {
1618 return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
1619 } catch (const bt2_common::Error&) {
1620 BT_CLOGE_APPEND_CAUSE("Failed to fetch next messages");
1621 return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1622 }
1623 }
1624
1625 static lttng_live_msg_iter::UP
1626 lttng_live_msg_iter_create(struct lttng_live_component *lttng_live_comp,
1627 bt_self_message_iterator *self_msg_it)
1628 {
1629 lttng_live_msg_iter::UP lttng_live_msg_iter =
1630 bt2_common::makeUnique<struct lttng_live_msg_iter>(lttng_live_comp->logCfg);
1631 lttng_live_msg_iter->self_comp = lttng_live_comp->self_comp;
1632 lttng_live_msg_iter->lttng_live_comp = lttng_live_comp;
1633 lttng_live_msg_iter->self_msg_iter = self_msg_it;
1634
1635 lttng_live_msg_iter->active_stream_iter = 0;
1636 lttng_live_msg_iter->last_msg_ts_ns = INT64_MIN;
1637 lttng_live_msg_iter->was_interrupted = false;
1638
1639 return lttng_live_msg_iter;
1640 }
1641
1642 BT_HIDDEN
1643 bt_message_iterator_class_initialize_method_status
1644 lttng_live_msg_iter_init(bt_self_message_iterator *self_msg_it,
1645 bt_self_message_iterator_configuration *config,
1646 bt_self_component_port_output *self_port)
1647 {
1648 bt_self_component *self_comp = bt_self_message_iterator_borrow_component(self_msg_it);
1649 lttng_live_component *lttng_live =
1650 (lttng_live_component *) bt_self_component_get_data(self_comp);
1651 const bt2_common::LogCfg& logCfg = lttng_live->logCfg;
1652
1653 try {
1654 enum lttng_live_viewer_status viewer_status;
1655
1656 /* There can be only one downstream iterator at the same time. */
1657 BT_ASSERT(!lttng_live->has_msg_iter);
1658 lttng_live->has_msg_iter = true;
1659
1660 lttng_live_msg_iter::UP lttng_live_msg_iter =
1661 lttng_live_msg_iter_create(lttng_live, self_msg_it);
1662 if (!lttng_live_msg_iter) {
1663 BT_CLOGE_APPEND_CAUSE("Failed to create lttng_live_msg_iter");
1664 return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1665 }
1666
1667 viewer_status = live_viewer_connection_create(lttng_live->params.url.c_str(), false,
1668 lttng_live_msg_iter.get(), logCfg,
1669 lttng_live_msg_iter->viewer_connection);
1670 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1671 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1672 BT_CLOGE_APPEND_CAUSE("Failed to create viewer connection");
1673 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1674 /*
1675 * Interruption in the _iter_init() method is not
1676 * supported. Return an error.
1677 */
1678 BT_CLOGE_APPEND_CAUSE("Interrupted while creating viewer connection");
1679 }
1680
1681 return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1682 }
1683
1684 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter.get());
1685 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1686 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1687 BT_CLOGE_APPEND_CAUSE("Failed to create viewer session");
1688 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1689 /*
1690 * Interruption in the _iter_init() method is not
1691 * supported. Return an error.
1692 */
1693 BT_CLOGE_APPEND_CAUSE("Interrupted when creating viewer session");
1694 }
1695
1696 return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1697 }
1698
1699 if (lttng_live_msg_iter->sessions.empty()) {
1700 switch (lttng_live->params.sess_not_found_act) {
1701 case SESSION_NOT_FOUND_ACTION_CONTINUE:
1702 BT_CLOGI(
1703 "Unable to connect to the requested live viewer session. Keep trying to connect because of "
1704 "%s=\"%s\" component parameter: url=\"%s\"",
1705 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR,
1706 lttng_live->params.url.c_str());
1707 break;
1708 case SESSION_NOT_FOUND_ACTION_FAIL:
1709 BT_CLOGE_APPEND_CAUSE(
1710 "Unable to connect to the requested live viewer session. Fail the message iterator initialization because of %s=\"%s\" "
1711 "component parameter: url =\"%s\"",
1712 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_FAIL_STR,
1713 lttng_live->params.url.c_str());
1714 return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1715 case SESSION_NOT_FOUND_ACTION_END:
1716 BT_CLOGI(
1717 "Unable to connect to the requested live viewer session. End gracefully at the first _next() "
1718 "call because of %s=\"%s\" component parameter: "
1719 "url=\"%s\"",
1720 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_END_STR,
1721 lttng_live->params.url.c_str());
1722 break;
1723 default:
1724 bt_common_abort();
1725 }
1726 }
1727
1728 bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter.release());
1729 return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK;
1730 } catch (const std::bad_alloc&) {
1731 return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1732 } catch (const bt2_common::Error&) {
1733 BT_CLOGE_APPEND_CAUSE("Failed to initialize iterator");
1734 return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1735 }
1736 }
1737
/*
 * Parameter validation descriptors passed to
 * bt_param_validation_validate() by lttng_live_query_list_sessions():
 * a single mandatory string entry named `URL_PARAM`, followed by the
 * end-of-list marker.
 */
static struct bt_param_validation_map_value_entry_descr list_sessions_params[] = {
    {URL_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
     bt_param_validation_value_descr::makeString()},
    BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
1742
1743 static bt_component_class_query_method_status
1744 lttng_live_query_list_sessions(const bt_value *params, const bt_value **result,
1745 const bt2_common::LogCfg& logCfg)
1746 {
1747 bt_component_class_query_method_status status;
1748 const bt_value *url_value = NULL;
1749 const char *url;
1750 live_viewer_connection::UP viewer_connection;
1751 enum lttng_live_viewer_status viewer_status;
1752 enum bt_param_validation_status validation_status;
1753 gchar *validate_error = NULL;
1754
1755 validation_status = bt_param_validation_validate(params, list_sessions_params, &validate_error);
1756 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
1757 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1758 goto error;
1759 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
1760 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1761 BT_CLOGE_APPEND_CAUSE("%s", validate_error);
1762 goto error;
1763 }
1764
1765 url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
1766 url = bt_value_string_get(url_value);
1767
1768 viewer_status = live_viewer_connection_create(url, true, NULL, logCfg, viewer_connection);
1769 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1770 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1771 BT_CLOGE_APPEND_CAUSE("Failed to create viewer connection");
1772 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1773 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1774 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
1775 } else {
1776 bt_common_abort();
1777 }
1778 goto error;
1779 }
1780
1781 status = live_viewer_connection_list_sessions(viewer_connection.get(), result);
1782 if (status != BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK) {
1783 BT_CLOGE_APPEND_CAUSE("Failed to list viewer sessions");
1784 goto error;
1785 }
1786
1787 goto end;
1788
1789 error:
1790 BT_VALUE_PUT_REF_AND_RESET(*result);
1791
1792 if (status >= 0) {
1793 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1794 }
1795
1796 end:
1797 g_free(validate_error);
1798
1799 return status;
1800 }
1801
1802 static bt_component_class_query_method_status
1803 lttng_live_query_support_info(const bt_value *params, const bt_value **result,
1804 const bt2_common::LogCfg& logCfg)
1805 {
1806 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
1807 const bt_value *input_type_value;
1808 const bt_value *input_value;
1809 double weight = 0;
1810 struct bt_common_lttng_live_url_parts parts = {0};
1811 bt_common_lttng_live_url_parts_deleter partsDeleter {parts};
1812
1813 /* Used by the logging macros */
1814 __attribute__((unused)) bt_self_component *self_comp = NULL;
1815
1816 *result = NULL;
1817 input_type_value = bt_value_map_borrow_entry_value_const(params, "type");
1818 if (!input_type_value) {
1819 BT_CLOGE_APPEND_CAUSE("Missing expected `type` parameter.");
1820 goto error;
1821 }
1822
1823 if (!bt_value_is_string(input_type_value)) {
1824 BT_CLOGE_APPEND_CAUSE("`type` parameter is not a string value.");
1825 goto error;
1826 }
1827
1828 if (strcmp(bt_value_string_get(input_type_value), "string") != 0) {
1829 /* We don't handle file system paths */
1830 goto create_result;
1831 }
1832
1833 input_value = bt_value_map_borrow_entry_value_const(params, "input");
1834 if (!input_value) {
1835 BT_CLOGE_APPEND_CAUSE("Missing expected `input` parameter.");
1836 goto error;
1837 }
1838
1839 if (!bt_value_is_string(input_value)) {
1840 BT_CLOGE_APPEND_CAUSE("`input` parameter is not a string value.");
1841 goto error;
1842 }
1843
1844 parts = bt_common_parse_lttng_live_url(bt_value_string_get(input_value), NULL, 0);
1845 if (parts.session_name) {
1846 /*
1847 * Looks pretty much like an LTTng live URL: we got the
1848 * session name part, which forms a complete URL.
1849 */
1850 weight = .75;
1851 }
1852
1853 create_result:
1854 *result = bt_value_real_create_init(weight);
1855 if (!*result) {
1856 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1857 goto error;
1858 }
1859
1860 goto end;
1861
1862 error:
1863 if (status >= 0) {
1864 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1865 }
1866
1867 BT_ASSERT(!*result);
1868
1869 end:
1870 return status;
1871 }
1872
1873 BT_HIDDEN
1874 bt_component_class_query_method_status lttng_live_query(bt_self_component_class_source *comp_class,
1875 bt_private_query_executor *priv_query_exec,
1876 const char *object, const bt_value *params,
1877 __attribute__((unused)) void *method_data,
1878 const bt_value **result)
1879 {
1880 bt_self_component_class *self_comp_class =
1881 bt_self_component_class_source_as_self_component_class(comp_class);
1882 bt_logging_level log_level = bt_query_executor_get_logging_level(
1883 bt_private_query_executor_as_query_executor_const(priv_query_exec));
1884 bt2_common::LogCfg logCfg(log_level, *self_comp_class);
1885
1886 try {
1887 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
1888
1889 if (strcmp(object, "sessions") == 0) {
1890 status = lttng_live_query_list_sessions(params, result, logCfg);
1891 } else if (strcmp(object, "babeltrace.support-info") == 0) {
1892 status = lttng_live_query_support_info(params, result, logCfg);
1893 } else {
1894 BT_CLOGI("Unknown query object `%s`", object);
1895 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT;
1896 goto end;
1897 }
1898
1899 end:
1900 return status;
1901 } catch (const std::bad_alloc&) {
1902 return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1903 } catch (const bt2_common::Error&) {
1904 BT_CLOGE_APPEND_CAUSE("Failed to execute query");
1905 return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1906 }
1907 }
1908
1909 BT_HIDDEN
1910 void lttng_live_component_finalize(bt_self_component_source *component)
1911 {
1912 lttng_live_component::UP {(lttng_live_component *) bt_self_component_get_data(
1913 bt_self_component_source_as_self_component(component))};
1914 }
1915
1916 static enum session_not_found_action
1917 parse_session_not_found_action_param(const bt_value *no_session_param)
1918 {
1919 enum session_not_found_action action;
1920 const char *no_session_act_str = bt_value_string_get(no_session_param);
1921
1922 if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_CONTINUE_STR) == 0) {
1923 action = SESSION_NOT_FOUND_ACTION_CONTINUE;
1924 } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_FAIL_STR) == 0) {
1925 action = SESSION_NOT_FOUND_ACTION_FAIL;
1926 } else {
1927 BT_ASSERT(strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0);
1928 action = SESSION_NOT_FOUND_ACTION_END;
1929 }
1930
1931 return action;
1932 }
1933
1934 static bt_param_validation_value_descr inputs_elem_descr =
1935 bt_param_validation_value_descr::makeString();
1936
1937 static const char *sess_not_found_action_choices[] = {
1938 SESS_NOT_FOUND_ACTION_CONTINUE_STR,
1939 SESS_NOT_FOUND_ACTION_FAIL_STR,
1940 SESS_NOT_FOUND_ACTION_END_STR,
1941 };
1942
1943 static struct bt_param_validation_map_value_entry_descr params_descr[] = {
1944 {INPUTS_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
1945 bt_param_validation_value_descr::makeArray(1, 1, inputs_elem_descr)},
1946 {SESS_NOT_FOUND_ACTION_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
1947 bt_param_validation_value_descr::makeString(sess_not_found_action_choices)},
1948 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
1949
1950 static bt_component_class_initialize_method_status
1951 lttng_live_component_create(const bt_value *params, bt_self_component *self_comp,
1952 const bt2_common::LogCfg& logCfg, lttng_live_component::UP& component)
1953 {
1954 const bt_value *inputs_value;
1955 const bt_value *url_value;
1956 const bt_value *value;
1957 const char *url;
1958 enum bt_param_validation_status validation_status;
1959 gchar *validation_error = NULL;
1960
1961 validation_status = bt_param_validation_validate(params, params_descr, &validation_error);
1962 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
1963 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1964 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
1965 bt2_common::GCharUP errorFreer {validation_error};
1966 BT_CLOGE_APPEND_CAUSE("%s", validation_error);
1967 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1968 }
1969
1970 lttng_live_component::UP lttng_live = bt2_common::makeUnique<lttng_live_component>(logCfg);
1971 lttng_live->self_comp = self_comp;
1972 lttng_live->max_query_size = MAX_QUERY_SIZE;
1973 lttng_live->has_msg_iter = false;
1974
1975 inputs_value = bt_value_map_borrow_entry_value_const(params, INPUTS_PARAM);
1976 url_value = bt_value_array_borrow_element_by_index_const(inputs_value, 0);
1977 url = bt_value_string_get(url_value);
1978
1979 lttng_live->params.url = url;
1980
1981 value = bt_value_map_borrow_entry_value_const(params, SESS_NOT_FOUND_ACTION_PARAM);
1982 if (value) {
1983 lttng_live->params.sess_not_found_act = parse_session_not_found_action_param(value);
1984 } else {
1985 BT_CLOGI("Optional `%s` parameter is missing: "
1986 "defaulting to `%s`.",
1987 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR);
1988 lttng_live->params.sess_not_found_act = SESSION_NOT_FOUND_ACTION_CONTINUE;
1989 }
1990
1991 component = std::move(lttng_live);
1992 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
1993 }
1994
1995 BT_HIDDEN
1996 bt_component_class_initialize_method_status
1997 lttng_live_component_init(bt_self_component_source *self_comp_src,
1998 bt_self_component_source_configuration *config, const bt_value *params,
1999 __attribute__((unused)) void *init_method_data)
2000 {
2001 bt_self_component *self_comp = bt_self_component_source_as_self_component(self_comp_src);
2002 bt_logging_level log_level =
2003 bt_component_get_logging_level(bt_self_component_as_component(self_comp));
2004 bt2_common::LogCfg logCfg(log_level, *self_comp);
2005
2006 try {
2007 lttng_live_component::UP lttng_live;
2008 bt_component_class_initialize_method_status ret;
2009 bt_self_component_add_port_status add_port_status;
2010
2011 ret = lttng_live_component_create(params, self_comp, logCfg, lttng_live);
2012 if (ret != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
2013 return ret;
2014 }
2015
2016 add_port_status =
2017 bt_self_component_source_add_output_port(self_comp_src, "out", NULL, NULL);
2018 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
2019 ret = (bt_component_class_initialize_method_status) add_port_status;
2020 return ret;
2021 }
2022
2023 bt_self_component_set_data(self_comp, lttng_live.release());
2024
2025 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
2026 } catch (const std::bad_alloc&) {
2027 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2028 } catch (const bt2_common::Error&) {
2029 BT_CLOGE_APPEND_CAUSE("Failed to initialize component");
2030 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
2031 }
2032 }
This page took 0.07717 seconds and 5 git commands to generate.