src.ctf.lttng-live: introduce lttng_live_session::UP and use it
[babeltrace.git] / src / plugins / ctf / lttng-live / lttng-live.cpp
CommitLineData
f3bc2010 1/*
0235b0db 2 * SPDX-License-Identifier: MIT
f3bc2010 3 *
14f28187 4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
f3bc2010 5 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7cdc2bab 6 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
f3bc2010 7 *
0235b0db 8 * Babeltrace CTF LTTng-live Client Component
f3bc2010
JG
9 */
10
c802cacb 11#include <glib.h>
14f28187
FD
12#include <unistd.h>
13
578e048b 14#include "common/assert.h"
0f5c5d5c 15#include "cpp-common/bt2c/fmt.hpp"
6838d8aa 16#include "cpp-common/bt2c/glib-up.hpp"
db00b877 17#include "cpp-common/bt2c/vector.hpp"
ef9e5f5d 18#include "cpp-common/bt2s/make-unique.hpp"
0f5c5d5c 19#include "cpp-common/vendor/fmt/format.h"
f3bc2010 20
376fc2bd 21#include "plugins/common/muxing/muxing.h"
80aff5ef 22#include "plugins/common/param-validation/param-validation.h"
376fc2bd 23
087cd0f5 24#include "data-stream.hpp"
087cd0f5 25#include "lttng-live.hpp"
c802cacb 26#include "metadata.hpp"
7cdc2bab 27
4164020e
SM
28#define MAX_QUERY_SIZE (256 * 1024)
29#define URL_PARAM "url"
30#define INPUTS_PARAM "inputs"
31#define SESS_NOT_FOUND_ACTION_PARAM "session-not-found-action"
32#define SESS_NOT_FOUND_ACTION_CONTINUE_STR "continue"
33#define SESS_NOT_FOUND_ACTION_FAIL_STR "fail"
34#define SESS_NOT_FOUND_ACTION_END_STR "end"
7cdc2bab 35
34533ae0 36void lttng_live_stream_iterator_set_state(struct lttng_live_stream_iterator *stream_iter,
4164020e 37 enum lttng_live_stream_state new_state)
34533ae0 38{
0f5c5d5c
SM
39 BT_CPPLOGD_SPEC(stream_iter->logger,
40 "Setting live stream iterator state: viewer-stream-id={}, "
41 "old-state={}, new-state={}",
42 stream_iter->viewer_stream_id, stream_iter->state, new_state);
34533ae0 43
4164020e 44 stream_iter->state = new_state;
34533ae0
FD
45}
46
4164020e
SM
47#define LTTNG_LIVE_LOGD_STREAM_ITER(live_stream_iter) \
48 do { \
0f5c5d5c
SM
49 BT_CPPLOGD_SPEC((live_stream_iter)->logger, \
50 "Live stream iterator state={}, " \
51 "last-inact-ts-is-set={}, last-inact-ts-value={}, " \
52 "curr-inact-ts={}", \
53 (live_stream_iter)->state, (live_stream_iter)->last_inactivity_ts.is_set, \
54 (live_stream_iter)->last_inactivity_ts.value, \
55 (live_stream_iter)->current_inactivity_ts); \
4164020e 56 } while (0);
6f79a7cf 57
9b4f9b42 58bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter)
6f79a7cf 59{
4164020e 60 bool ret;
6f79a7cf 61
4164020e
SM
62 if (!msg_iter) {
63 ret = false;
64 goto end;
65 }
7cdc2bab 66
4164020e 67 ret = bt_self_message_iterator_is_interrupted(msg_iter->self_msg_iter);
4bf0e537 68
14f28187 69end:
4164020e 70 return ret;
7cdc2bab
MD
71}
72
4164020e
SM
73static struct lttng_live_trace *
74lttng_live_session_borrow_trace_by_id(struct lttng_live_session *session, uint64_t trace_id)
d3e4dcd8 75{
6dcda123 76 for (lttng_live_trace::UP& trace : session->traces) {
4164020e 77 if (trace->id == trace_id) {
6dcda123 78 return trace.get();
4164020e
SM
79 }
80 }
14f28187 81
6dcda123 82 return nullptr;
7cdc2bab
MD
83}
84
4164020e
SM
85static struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
86 uint64_t trace_id)
7cdc2bab 87{
0f5c5d5c
SM
88 BT_CPPLOGD_SPEC(session->logger, "Creating live trace: session-id={}, trace-id={}", session->id,
89 trace_id);
4164020e 90
deb86334
SM
91 auto trace = bt2s::make_unique<lttng_live_trace>(session->logger);
92
4164020e
SM
93 trace->session = session;
94 trace->id = trace_id;
4164020e 95 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
4164020e 96
deb86334 97 const auto ret = trace.get();
6dcda123 98 session->traces.emplace_back(std::move(trace));
deb86334 99 return ret;
7cdc2bab
MD
100}
101
4164020e
SM
102struct lttng_live_trace *
103lttng_live_session_borrow_or_create_trace_by_id(struct lttng_live_session *session,
104 uint64_t trace_id)
7cdc2bab 105{
4164020e 106 struct lttng_live_trace *trace;
7cdc2bab 107
4164020e
SM
108 trace = lttng_live_session_borrow_trace_by_id(session, trace_id);
109 if (trace) {
110 goto end;
111 }
7cdc2bab 112
4164020e
SM
113 /* The session is the owner of the newly created trace. */
114 trace = lttng_live_create_trace(session, trace_id);
7cdc2bab 115
14f28187 116end:
4164020e 117 return trace;
7cdc2bab
MD
118}
119
4164020e
SM
120int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, uint64_t session_id,
121 const char *hostname, const char *session_name)
7cdc2bab 122{
0f5c5d5c
SM
123 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
124 "Adding live session: "
125 "session-id={}, hostname=\"{}\", session-name=\"{}\"",
126 session_id, hostname, session_name);
4164020e 127
459dcd39
SM
128 auto session = bt2s::make_unique<lttng_live_session>(lttng_live_msg_iter->logger);
129
4164020e
SM
130 session->self_comp = lttng_live_msg_iter->self_comp;
131 session->id = session_id;
4164020e
SM
132 session->lttng_live_msg_iter = lttng_live_msg_iter;
133 session->new_streams_needed = true;
220476b8
SM
134 session->hostname = hostname;
135 session->session_name = session_name;
4164020e 136
459dcd39 137 g_ptr_array_add(lttng_live_msg_iter->sessions, session.release());
afb0f12b
SM
138
139 return 0;
7cdc2bab
MD
140}
141
c28da405 142lttng_live_session::~lttng_live_session()
7cdc2bab 143{
c28da405
SM
144 BT_CPPLOGD_SPEC(this->logger, "Destroying live session: session-id={}, session-name=\"{}\"",
145 this->id, this->session_name);
4164020e 146
c28da405
SM
147 if (this->id != -1ULL) {
148 if (lttng_live_session_detach(this)) {
149 if (!lttng_live_graph_is_canceled(this->lttng_live_msg_iter)) {
4164020e 150 /* Old relayd cannot detach sessions. */
c28da405 151 BT_CPPLOGD_SPEC(this->logger, "Unable to detach lttng live session {}", this->id);
4164020e
SM
152 }
153 }
c28da405
SM
154
155 this->id = -1ULL;
4164020e 156 }
c28da405 157}
4164020e 158
c28da405
SM
159static void lttng_live_destroy_session(struct lttng_live_session *session)
160{
afb0f12b 161 delete session;
7cdc2bab
MD
162}
163
4164020e 164static void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 165{
4164020e
SM
166 if (!lttng_live_msg_iter) {
167 goto end;
168 }
7cdc2bab 169
4164020e
SM
170 if (lttng_live_msg_iter->sessions) {
171 g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE);
172 }
14f28187 173
4164020e
SM
174 if (lttng_live_msg_iter->viewer_connection) {
175 live_viewer_connection_destroy(lttng_live_msg_iter->viewer_connection);
176 }
177 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp);
178 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter);
14f28187 179
4164020e
SM
180 /* All stream iterators must be destroyed at this point. */
181 BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0);
182 lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false;
14f28187 183
afb0f12b 184 delete lttng_live_msg_iter;
14f28187
FD
185
186end:
4164020e 187 return;
14f28187
FD
188}
189
14f28187
FD
190void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
191{
4164020e 192 struct lttng_live_msg_iter *lttng_live_msg_iter;
14f28187 193
4164020e 194 BT_ASSERT(self_msg_iter);
14f28187 195
4164020e
SM
196 lttng_live_msg_iter =
197 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_iter);
198 BT_ASSERT(lttng_live_msg_iter);
199 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
7cdc2bab
MD
200}
201
4164020e
SM
202static enum lttng_live_iterator_status
203lttng_live_iterator_next_check_stream_state(struct lttng_live_stream_iterator *lttng_live_stream)
7cdc2bab 204{
4164020e
SM
205 switch (lttng_live_stream->state) {
206 case LTTNG_LIVE_STREAM_QUIESCENT:
207 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
208 break;
209 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
210 /* Invalid state. */
0f5c5d5c 211 BT_CPPLOGF_SPEC(lttng_live_stream->logger, "Unexpected stream state \"ACTIVE_NO_DATA\"");
4164020e
SM
212 bt_common_abort();
213 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
214 /* Invalid state. */
0f5c5d5c 215 BT_CPPLOGF_SPEC(lttng_live_stream->logger, "Unexpected stream state \"QUIESCENT_NO_DATA\"");
4164020e
SM
216 bt_common_abort();
217 case LTTNG_LIVE_STREAM_EOF:
218 break;
219 }
220 return LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
221}
222
223/*
f93afbf9
FD
224 * For active no data stream, fetch next index. As a result of that it can
225 * become either:
226 * - quiescent: won't have events for a bit,
227 * - have data: need to get that data and produce the event,
228 * - have no data on this stream at this point: need to retry (AGAIN) or return
229 * EOF.
7cdc2bab 230 */
4164020e
SM
231static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
232 struct lttng_live_msg_iter *lttng_live_msg_iter,
233 struct lttng_live_stream_iterator *lttng_live_stream)
7cdc2bab 234{
4164020e
SM
235 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
236 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
237 struct packet_index index;
238
239 if (lttng_live_stream->trace->metadata_stream_state ==
240 LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
e28ca558
SM
241 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
242 "Need to get an update for the metadata stream before proceeding "
243 "further with this stream: stream-name=\"{}\"",
244 lttng_live_stream->name);
4164020e
SM
245 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
246 goto end;
247 }
248
249 if (lttng_live_stream->trace->session->new_streams_needed) {
e28ca558
SM
250 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
251 "Need to get an update of all streams before proceeding further "
252 "with this stream: stream-name=\"{}\"",
253 lttng_live_stream->name);
4164020e
SM
254 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
255 goto end;
256 }
257
258 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA &&
259 lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
260 goto end;
261 }
262 ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream, &index);
263 if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
264 goto end;
265 }
266
267 BT_ASSERT_DBG(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
268
269 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
270 uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts.value,
271 curr_inact_ts = lttng_live_stream->current_inactivity_ts;
272
273 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA && last_inact_ts == curr_inact_ts) {
274 /*
5cb27175
JG
275 * Because the stream is in the QUIESCENT_NO_DATA
276 * state, we can assert that the last_inactivity_ts was
277 * set and can be safely used in the `if` above.
278 */
4164020e
SM
279 BT_ASSERT(lttng_live_stream->last_inactivity_ts.is_set);
280
281 ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
282 LTTNG_LIVE_LOGD_STREAM_ITER(lttng_live_stream);
283 } else {
284 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
285 }
286 goto end;
287 }
288
289 lttng_live_stream->base_offset = index.offset;
290 lttng_live_stream->offset = index.offset;
291 lttng_live_stream->len = index.packet_size / CHAR_BIT;
292
0f5c5d5c
SM
293 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
294 "Setting live stream reading info: stream-name=\"{}\", "
295 "viewer-stream-id={}, stream-base-offset={}, stream-offset={}, stream-len={}",
e28ca558 296 lttng_live_stream->name, lttng_live_stream->viewer_stream_id,
0f5c5d5c
SM
297 lttng_live_stream->base_offset, lttng_live_stream->offset,
298 lttng_live_stream->len);
f93afbf9 299
7cdc2bab 300end:
4164020e
SM
301 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
302 ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream);
303 }
304 return ret;
7cdc2bab
MD
305}
306
307/*
14f28187
FD
308 * Creation of the message requires the ctf trace class to be created
309 * beforehand, but the live protocol gives us all streams (including metadata)
310 * at once. So we split it in three steps: getting streams, getting metadata
311 * (which creates the ctf trace class), and then creating the per-stream
312 * messages.
7cdc2bab 313 */
4164020e
SM
314static enum lttng_live_iterator_status
315lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
316 struct lttng_live_session *session)
7cdc2bab 317{
4164020e 318 enum lttng_live_iterator_status status;
4164020e
SM
319
320 if (!session->attached) {
0f5c5d5c
SM
321 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Attach to session: session-id={}",
322 session->id);
4164020e
SM
323 enum lttng_live_viewer_status attach_status =
324 lttng_live_session_attach(session, lttng_live_msg_iter->self_msg_iter);
325 if (attach_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
326 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
327 /*
328 * Clear any causes appended in
329 * `lttng_live_attach_session()` as we want to
330 * return gracefully since the graph was
331 * cancelled.
332 */
333 bt_current_thread_clear_error();
334 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
335 } else {
336 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
0f5c5d5c
SM
337 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
338 "Error attaching to LTTng live session");
4164020e
SM
339 }
340 goto end;
341 }
342 }
343
0f5c5d5c 344 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
220476b8
SM
345 "Updating all data streams: session-id={}, session-name=\"{}\"", session->id,
346 session->session_name);
4164020e
SM
347
348 status = lttng_live_session_get_new_streams(session, lttng_live_msg_iter->self_msg_iter);
a4f118a3
FD
349 switch (status) {
350 case LTTNG_LIVE_ITERATOR_STATUS_OK:
351 break;
352 case LTTNG_LIVE_ITERATOR_STATUS_END:
353 /*
f8a11f24
SM
354 * We received a `_END` from the `_get_new_streams()` function,
355 * which means no more data will ever be received from the data
356 * streams of this session. But it's possible that the metadata
357 * is incomplete.
358 * The live protocol guarantees that we receive all the
359 * metadata needed before we receive data streams needing it.
360 * But it's possible to receive metadata NOT needed by
361 * data streams after the session was closed. For example, this
362 * could happen if a new event is registered and the session is
363 * stopped before any tracepoint for that event is actually
364 * fired.
365 */
0f5c5d5c
SM
366 BT_CPPLOGD_SPEC(
367 lttng_live_msg_iter->logger,
a4f118a3 368 "Updating streams returned _END status. Override status to _OK in order fetch any remaining metadata:"
0f5c5d5c 369 "session-id={}, session-name=\"{}\"",
220476b8 370 session->id, session->session_name);
a4f118a3
FD
371 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
372 break;
373 default:
4164020e
SM
374 goto end;
375 }
376
0f5c5d5c 377 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
220476b8
SM
378 "Updating metadata stream for session: session-id={}, session-name=\"{}\"",
379 session->id, session->session_name);
a4f118a3 380
6dcda123
SM
381 for (lttng_live_trace::UP& trace : session->traces) {
382 status = lttng_live_metadata_update(trace.get());
4164020e
SM
383 switch (status) {
384 case LTTNG_LIVE_ITERATOR_STATUS_END:
385 case LTTNG_LIVE_ITERATOR_STATUS_OK:
4164020e
SM
386 break;
387 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
388 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
389 goto end;
390 default:
0f5c5d5c
SM
391 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
392 "Error updating trace metadata: "
393 "stream-iter-status={}, trace-id={}",
394 status, trace->id);
4164020e
SM
395 goto end;
396 }
397 }
398
399 /*
400 * Now that we have the metadata we can initialize the downstream
401 * iterator.
402 */
403 status = lttng_live_lazy_msg_init(session, lttng_live_msg_iter->self_msg_iter);
14f28187
FD
404
405end:
4164020e 406 return status;
7cdc2bab
MD
407}
408
4164020e
SM
409static void
410lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 411{
6dcda123 412 uint64_t session_idx;
4164020e
SM
413
414 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
415 struct lttng_live_session *session =
416 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
0f5c5d5c
SM
417 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
418 "Force marking session as needing new streams: "
419 "session-id={}",
420 session->id);
4164020e 421 session->new_streams_needed = true;
6dcda123 422 for (lttng_live_trace::UP& trace : session->traces) {
0f5c5d5c
SM
423 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
424 "Force marking trace metadata state as needing an update: "
425 "session-id={}, trace-id={}",
426 session->id, trace->id);
4164020e
SM
427
428 BT_ASSERT(trace->metadata_stream_state != LTTNG_LIVE_METADATA_STREAM_STATE_CLOSED);
429
430 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
431 }
432 }
7cdc2bab
MD
433}
434
4164020e
SM
435static enum lttng_live_iterator_status
436lttng_live_iterator_handle_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 437{
4164020e
SM
438 enum lttng_live_iterator_status status;
439 enum lttng_live_viewer_status viewer_status;
4164020e
SM
440 uint64_t session_idx = 0, nr_sessions_opened = 0;
441 struct lttng_live_session *session;
442 enum session_not_found_action sess_not_found_act =
443 lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
444
0f5c5d5c
SM
445 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
446 "Update data and metadata of all sessions: "
447 "live-msg-iter-addr={}",
448 fmt::ptr(lttng_live_msg_iter));
4164020e
SM
449 /*
450 * In a remotely distant future, we could add a "new
451 * session" flag to the protocol, which would tell us that we
452 * need to query for new sessions even though we have sessions
453 * currently ongoing.
454 */
455 if (lttng_live_msg_iter->sessions->len == 0) {
456 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
0f5c5d5c
SM
457 BT_CPPLOGD_SPEC(
458 lttng_live_msg_iter->logger,
4164020e
SM
459 "No session found. Exiting in accordance with the `session-not-found-action` parameter");
460 status = LTTNG_LIVE_ITERATOR_STATUS_END;
461 goto end;
462 } else {
0f5c5d5c
SM
463 BT_CPPLOGD_SPEC(
464 lttng_live_msg_iter->logger,
4164020e
SM
465 "No session found. Try creating a new one in accordance with the `session-not-found-action` parameter");
466 /*
467 * Retry to create a viewer session for the requested
468 * session name.
469 */
470 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
471 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
472 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
473 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
0f5c5d5c
SM
474 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
475 "Error creating LTTng live viewer session");
4164020e
SM
476 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
477 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
478 } else {
479 bt_common_abort();
480 }
481 goto end;
482 }
483 }
484 }
485
486 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
487 session =
488 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
489 status = lttng_live_get_session(lttng_live_msg_iter, session);
490 switch (status) {
491 case LTTNG_LIVE_ITERATOR_STATUS_OK:
4164020e 492 case LTTNG_LIVE_ITERATOR_STATUS_END:
a4f118a3
FD
493 /*
494 * A session returned `_END`. Other sessions may still
495 * be active so we override the status and continue
496 * looping if needed.
497 */
4164020e
SM
498 break;
499 default:
500 goto end;
501 }
502 if (!session->closed) {
503 nr_sessions_opened++;
504 }
505 }
506
507 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE && nr_sessions_opened == 0) {
508 status = LTTNG_LIVE_ITERATOR_STATUS_END;
509 } else {
510 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
511 }
f79c2d7a
FD
512
513end:
4164020e 514 return status;
7cdc2bab
MD
515}
516
4164020e
SM
517static enum lttng_live_iterator_status
518emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
35e432d7
SM
519 struct lttng_live_stream_iterator *stream_iter,
520 bt2::ConstMessage::Shared& message, uint64_t timestamp)
7cdc2bab 521{
4164020e 522 BT_ASSERT(stream_iter->trace->clock_class);
0f5c5d5c
SM
523 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
524 "Emitting inactivity message for stream: ctf-stream-id={}, "
525 "viewer-stream-id={}, timestamp={}",
526 stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id,
527 timestamp);
4164020e 528
35e432d7
SM
529 const auto msg = bt_message_message_iterator_inactivity_create(
530 lttng_live_msg_iter->self_msg_iter, stream_iter->trace->clock_class, timestamp);
531
4164020e 532 if (!msg) {
0f5c5d5c
SM
533 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
534 "Error emitting message iterator inactivity message");
35e432d7 535 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
4164020e
SM
536 }
537
35e432d7
SM
538 message = bt2::ConstMessage::Shared::createWithoutRef(msg);
539 return LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
540}
541
4164020e
SM
542static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
543 struct lttng_live_msg_iter *lttng_live_msg_iter,
35e432d7 544 struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message)
7cdc2bab 545{
4164020e
SM
546 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
547
548 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
549 return LTTNG_LIVE_ITERATOR_STATUS_OK;
550 }
551
552 /*
553 * Check if we already sent an inactivty message downstream for this
554 * `current_inactivity_ts` value.
555 */
556 if (lttng_live_stream->last_inactivity_ts.is_set &&
557 lttng_live_stream->current_inactivity_ts == lttng_live_stream->last_inactivity_ts.value) {
558 lttng_live_stream_iterator_set_state(lttng_live_stream,
559 LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA);
560
561 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
562 goto end;
563 }
564
565 ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream, message,
566 lttng_live_stream->current_inactivity_ts);
567
568 lttng_live_stream->last_inactivity_ts.value = lttng_live_stream->current_inactivity_ts;
569 lttng_live_stream->last_inactivity_ts.is_set = true;
14f28187 570end:
4164020e 571 return ret;
14f28187
FD
572}
573
7d91f1ac 574static int live_get_msg_ts_ns(struct lttng_live_msg_iter *lttng_live_msg_iter,
4164020e 575 const bt_message *msg, int64_t last_msg_ts_ns, int64_t *ts_ns)
14f28187 576{
4164020e
SM
577 const bt_clock_snapshot *clock_snapshot = NULL;
578 int ret = 0;
4164020e
SM
579
580 BT_ASSERT_DBG(msg);
581 BT_ASSERT_DBG(ts_ns);
582
0f5c5d5c
SM
583 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
584 "Getting message's timestamp: iter-data-addr={}, msg-addr={}, "
585 "last-msg-ts={}",
586 fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns);
4164020e
SM
587
588 switch (bt_message_get_type(msg)) {
589 case BT_MESSAGE_TYPE_EVENT:
4164020e
SM
590 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(msg);
591 break;
592 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
4164020e
SM
593 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg);
594 break;
595 case BT_MESSAGE_TYPE_PACKET_END:
4164020e
SM
596 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(msg);
597 break;
598 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
4164020e
SM
599 clock_snapshot =
600 bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg);
601 break;
602 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
4164020e
SM
603 clock_snapshot =
604 bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg);
605 break;
606 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
607 clock_snapshot = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const(msg);
608 break;
609 default:
610 /* All the other messages have a higher priority */
0f5c5d5c
SM
611 BT_CPPLOGD_STR_SPEC(lttng_live_msg_iter->logger,
612 "Message has no timestamp: using the last message timestamp.");
4164020e
SM
613 *ts_ns = last_msg_ts_ns;
614 goto end;
615 }
616
4164020e
SM
617 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
618 if (ret) {
0f5c5d5c
SM
619 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
620 "Cannot get nanoseconds from Epoch of clock snapshot: "
621 "clock-snapshot-addr={}",
622 fmt::ptr(clock_snapshot));
4164020e
SM
623 goto error;
624 }
625
626 goto end;
14f28187 627
14f28187 628error:
4164020e 629 ret = -1;
7cdc2bab 630
7cdc2bab 631end:
4164020e 632 if (ret == 0) {
0f5c5d5c
SM
633 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
634 "Found message's timestamp: iter-data-addr={}, msg-addr={}, "
635 "last-msg-ts={}, ts={}",
636 fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns, *ts_ns);
4164020e
SM
637 }
638
639 return ret;
7cdc2bab
MD
640}
641
4164020e
SM
642static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
643 struct lttng_live_msg_iter *lttng_live_msg_iter,
35e432d7 644 struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message)
7cdc2bab 645{
4164020e 646 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
4164020e 647 enum ctf_msg_iter_status status;
6dcda123 648 uint64_t session_idx;
4164020e
SM
649
650 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
651 struct lttng_live_session *session =
652 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
653
654 if (session->new_streams_needed) {
0f5c5d5c
SM
655 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
656 "Need an update for streams: "
657 "session-id={}",
658 session->id);
4164020e
SM
659 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
660 goto end;
661 }
6dcda123 662 for (lttng_live_trace::UP& trace : session->traces) {
4164020e 663 if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
0f5c5d5c
SM
664 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
665 "Need an update for metadata stream: "
666 "session-id={}, trace-id={}",
667 session->id, trace->id);
4164020e
SM
668 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
669 goto end;
670 }
671 }
672 }
673
674 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
675 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
0f5c5d5c
SM
676 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
677 "Invalid state of live stream iterator"
678 "stream-iter-status={}",
679 lttng_live_stream->state);
4164020e
SM
680 goto end;
681 }
682
35e432d7
SM
683 const bt_message *msg;
684 status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), &msg);
4164020e
SM
685 switch (status) {
686 case CTF_MSG_ITER_STATUS_EOF:
687 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
688 break;
689 case CTF_MSG_ITER_STATUS_OK:
35e432d7 690 message = bt2::ConstMessage::Shared::createWithoutRef(msg);
4164020e
SM
691 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
692 break;
693 case CTF_MSG_ITER_STATUS_AGAIN:
694 /*
695 * Continue immediately (end of packet). The next
696 * get_index may return AGAIN to delay the following
697 * attempt.
698 */
699 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
700 break;
701 case CTF_MSG_ITER_STATUS_ERROR:
702 default:
703 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
0f5c5d5c
SM
704 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
705 "CTF message iterator failed to get next message: "
706 "msg-iter={}, msg-iter-status={}",
707 fmt::ptr(lttng_live_stream->msg_iter), status);
4164020e
SM
708 break;
709 }
14f28187
FD
710
711end:
4164020e 712 return ret;
7cdc2bab
MD
713}
714
4164020e
SM
715static enum lttng_live_iterator_status
716lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
717 struct lttng_live_stream_iterator *stream_iter,
35e432d7 718 bt2::ConstMessage::Shared& curr_msg)
4a39caef 719{
4164020e 720 enum lttng_live_iterator_status live_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
4164020e 721
0f5c5d5c
SM
722 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
723 "Closing live stream iterator: stream-name=\"{}\", "
724 "viewer-stream-id={}",
e28ca558 725 stream_iter->name, stream_iter->viewer_stream_id);
4164020e
SM
726
727 /*
728 * The viewer has hung up on us so we are closing the stream. The
729 * `ctf_msg_iter` should simply realize that it needs to close the
730 * stream properly by emitting the necessary stream end message.
731 */
35e432d7 732 const bt_message *msg;
4164020e 733 enum ctf_msg_iter_status status =
35e432d7 734 ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), &msg);
4164020e
SM
735
736 if (status == CTF_MSG_ITER_STATUS_ERROR) {
0f5c5d5c
SM
737 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
738 "Error getting the next message from CTF message iterator");
4164020e
SM
739 live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
740 goto end;
741 } else if (status == CTF_MSG_ITER_STATUS_EOF) {
0f5c5d5c
SM
742 BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger,
743 "Reached the end of the live stream iterator.");
4164020e
SM
744 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
745 goto end;
746 }
747
748 BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
4a39caef 749
35e432d7
SM
750 curr_msg = bt2::ConstMessage::Shared::createWithoutRef(msg);
751
4a39caef 752end:
4164020e 753 return live_status;
4a39caef
FD
754}
755
7cdc2bab
MD
756/*
757 * helper function:
758 * handle_no_data_streams()
759 * retry:
760 * - for each ACTIVE_NO_DATA stream:
761 * - query relayd for stream data, or quiescence info.
762 * - if need metadata, get metadata, goto retry.
763 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
764 * - if quiescent, move to QUIESCENT streams
765 * - if fetched data, move to ACTIVE_DATA streams
766 * (at this point each stream either has data, or is quiescent)
767 *
768 *
769 * iterator_next:
770 * handle_new_streams_and_metadata()
771 * - query relayd for known streams, add them as ACTIVE_NO_DATA
772 * - query relayd for metadata
773 *
774 * call handle_active_no_data_streams()
775 *
776 * handle_quiescent_streams()
777 * - if at least one stream is ACTIVE_DATA:
778 * - peek stream event with lowest timestamp -> next_ts
779 * - for each quiescent stream
780 * - if next_ts >= quiescent end
781 * - set state to ACTIVE_NO_DATA
782 * - else
783 * - for each quiescent stream
784 * - set state to ACTIVE_NO_DATA
785 *
786 * call handle_active_no_data_streams()
787 *
788 * handle_active_data_streams()
789 * - if at least one stream is ACTIVE_DATA:
790 * - get stream event with lowest timestamp from heap
d6e69534 791 * - make that stream event the current message.
7cdc2bab
MD
792 * - move this stream heap position to its next event
793 * - if we need to fetch data from relayd, move
794 * stream to ACTIVE_NO_DATA.
795 * - return OK
796 * - return AGAIN
797 *
798 * end criterion: ctrl-c on client. If relayd exits or the session
799 * closes on the relay daemon side, we keep on waiting for streams.
800 * Eventually handle --end timestamp (also an end criterion).
801 *
802 * When disconnected from relayd: try to re-connect endlessly.
803 */
4164020e
SM
804static enum lttng_live_iterator_status
805lttng_live_iterator_next_msg_on_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
806 struct lttng_live_stream_iterator *stream_iter,
35e432d7 807 bt2::ConstMessage::Shared& curr_msg)
7cdc2bab 808{
4164020e
SM
809 enum lttng_live_iterator_status live_status;
810
0f5c5d5c
SM
811 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
812 "Advancing live stream iterator until next message if possible: "
813 "stream-name=\"{}\", viewer-stream-id={}",
e28ca558 814 stream_iter->name, stream_iter->viewer_stream_id);
4164020e
SM
815
816 if (stream_iter->has_stream_hung_up) {
817 /*
818 * The stream has hung up and the stream was properly closed
819 * during the last call to the current function. Return _END
820 * status now so that this stream iterator is removed for the
821 * stream iterator list.
822 */
823 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
824 goto end;
825 }
4a39caef 826
7cdc2bab 827retry:
4164020e
SM
828 LTTNG_LIVE_LOGD_STREAM_ITER(stream_iter);
829
830 /*
831 * Make sure we have the most recent metadata and possibly some new
832 * streams.
833 */
834 live_status = lttng_live_iterator_handle_new_streams_and_metadata(lttng_live_msg_iter);
835 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
836 goto end;
837 }
838
839 live_status =
840 lttng_live_iterator_next_handle_one_no_data_stream(lttng_live_msg_iter, stream_iter);
841 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
842 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
843 /*
844 * We overwrite `live_status` since `curr_msg` is
845 * likely set to a valid message in this function.
846 */
847 live_status =
848 lttng_live_iterator_close_stream(lttng_live_msg_iter, stream_iter, curr_msg);
849 }
850 goto end;
851 }
852
853 live_status = lttng_live_iterator_next_handle_one_quiescent_stream(lttng_live_msg_iter,
854 stream_iter, curr_msg);
855 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
35e432d7 856 BT_ASSERT(!curr_msg);
4164020e
SM
857 goto end;
858 }
35e432d7 859 if (curr_msg) {
4164020e
SM
860 goto end;
861 }
862 live_status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live_msg_iter,
863 stream_iter, curr_msg);
864 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
35e432d7 865 BT_ASSERT(!curr_msg);
4164020e 866 }
7cdc2bab
MD
867
868end:
4164020e 869 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) {
0f5c5d5c
SM
870 BT_CPPLOGD_SPEC(
871 lttng_live_msg_iter->logger,
872 "Ask the relay daemon for an updated view of the data and metadata streams");
4164020e
SM
873 goto retry;
874 }
14f28187 875
0f5c5d5c
SM
876 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
877 "Returning from advancing live stream iterator: status={}, "
878 "stream-name=\"{}\", viewer-stream-id={}",
e28ca558 879 live_status, stream_iter->name, stream_iter->viewer_stream_id);
f93afbf9 880
4164020e 881 return live_status;
7cdc2bab
MD
882}
883
35e432d7 884static bool is_discarded_packet_or_event_message(const bt2::ConstMessage msg)
8ec4d5ff 885{
35e432d7
SM
886 return msg.type() == bt2::MessageType::DiscardedEvents ||
887 msg.type() == bt2::MessageType::DiscardedPackets;
8ec4d5ff
JG
888}
889
4164020e
SM
890static enum lttng_live_iterator_status
891adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream *stream,
35e432d7 892 const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out,
4164020e 893 uint64_t new_begin_ts)
8ec4d5ff 894{
4164020e
SM
895 enum bt_property_availability availability;
896 const bt_clock_snapshot *clock_snapshot;
897 uint64_t end_ts;
898 uint64_t count;
899
900 clock_snapshot = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg_in);
901 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
902
903 availability = bt_message_discarded_packets_get_count(msg_in, &count);
904 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
905
35e432d7 906 const auto msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
4164020e 907 iter, stream, new_begin_ts, end_ts);
35e432d7
SM
908
909 if (!msg) {
910 return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
4164020e
SM
911 }
912
35e432d7
SM
913 bt_message_discarded_packets_set_count(msg, count);
914 msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
915 return LTTNG_LIVE_ITERATOR_STATUS_OK;
8ec4d5ff
JG
916}
917
4164020e
SM
918static enum lttng_live_iterator_status
919adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream *stream,
35e432d7 920 const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out,
4164020e 921 uint64_t new_begin_ts)
8ec4d5ff 922{
4164020e
SM
923 enum bt_property_availability availability;
924 const bt_clock_snapshot *clock_snapshot;
925 uint64_t end_ts;
926 uint64_t count;
927
928 clock_snapshot = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg_in);
929 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
930
931 availability = bt_message_discarded_events_get_count(msg_in, &count);
932 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
933
35e432d7 934 const auto msg = bt_message_discarded_events_create_with_default_clock_snapshots(
4164020e 935 iter, stream, new_begin_ts, end_ts);
35e432d7
SM
936
937 if (!msg) {
938 return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
4164020e
SM
939 }
940
35e432d7
SM
941 bt_message_discarded_events_set_count(msg, count);
942 msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
943 return LTTNG_LIVE_ITERATOR_STATUS_OK;
8ec4d5ff
JG
944}
945
4164020e
SM
946static enum lttng_live_iterator_status
947handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
948 struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns,
35e432d7 949 const bt2::ConstMessage& late_msg)
285951be 950{
4164020e
SM
951 const bt_clock_class *clock_class;
952 const bt_stream_class *stream_class;
953 enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status;
954 int64_t last_inactivity_ts_ns;
955 enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
956 enum lttng_live_iterator_status adjust_status;
35e432d7 957 bt2::ConstMessage::Shared adjusted_message;
4164020e
SM
958
959 /*
960 * The timestamp of the current message is before the last message sent
961 * by this component. We CANNOT send it as is.
962 *
963 * The only expected scenario in which that could happen is the
e7401568 964 * following, everything else is a bug in this component, relay daemon,
4164020e
SM
965 * or CTF parser.
966 *
967 * Expected scenario: The CTF message iterator emitted discarded
968 * packets and discarded events with synthesized beginning and end
969 * timestamps from the bounds of the last known packet and the newly
970 * decoded packet header. The CTF message iterator is not aware of
971 * stream inactivity beacons. Hence, we have to adjust the beginning
972 * timestamp of those types of messages if a stream signalled its
973 * inactivity up until _after_ the last known packet's beginning
974 * timestamp.
975 *
976 * Otherwise, the monotonicity guarantee of message timestamps would
977 * not be preserved.
978 *
979 * In short, the only scenario in which it's okay and fixable to
980 * received a late message is when:
981 * 1. the late message is a discarded packets or discarded events
f8a11f24 982 * message,
4164020e
SM
983 * 2. this stream produced an inactivity message downstream, and
984 * 3. the timestamp of the late message is within the inactivity
f8a11f24 985 * timespan we sent downstream through the inactivity message.
4164020e
SM
986 */
987
0f5c5d5c
SM
988 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
989 "Handling late message on live stream iterator: "
990 "stream-name=\"{}\", viewer-stream-id={}",
e28ca558 991 stream_iter->name, stream_iter->viewer_stream_id);
4164020e
SM
992
993 if (!stream_iter->last_inactivity_ts.is_set) {
0f5c5d5c
SM
994 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
995 "Invalid live stream state: "
996 "have a late message when no inactivity message "
997 "was ever sent for that stream.");
4164020e
SM
998 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
999 goto end;
1000 }
1001
1002 if (!is_discarded_packet_or_event_message(late_msg)) {
0f5c5d5c
SM
1003 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1004 "Invalid live stream state: "
1005 "have a late message that is not a packet discarded or "
1006 "event discarded message: late-msg-type={}",
35e432d7 1007 late_msg.type());
4164020e
SM
1008 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1009 goto end;
1010 }
1011
0b68f2bc 1012 stream_class = bt_stream_borrow_class_const(stream_iter->stream->libObjPtr());
4164020e
SM
1013 clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class);
1014
1015 ts_ns_status = bt_clock_class_cycles_to_ns_from_origin(
1016 clock_class, stream_iter->last_inactivity_ts.value, &last_inactivity_ts_ns);
1017 if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) {
0f5c5d5c
SM
1018 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1019 "Error converting last "
1020 "inactivity message timestamp to nanoseconds");
4164020e
SM
1021 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1022 goto end;
1023 }
1024
1025 if (last_inactivity_ts_ns <= late_msg_ts_ns) {
0f5c5d5c
SM
1026 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1027 "Invalid live stream state: "
1028 "have a late message that is none included in a stream "
1029 "inactivity timespan: last-inactivity-ts-ns={}, "
1030 "late-msg-ts-ns={}",
1031 last_inactivity_ts_ns, late_msg_ts_ns);
4164020e
SM
1032 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1033 goto end;
1034 }
1035
1036 /*
1037 * We now know that it's okay for this message to be late, we can now
1038 * adjust its timestamp to ensure monotonicity.
1039 */
0f5c5d5c
SM
1040 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1041 "Adjusting the timestamp of late message: late-msg-type={}, "
1042 "msg-new-ts-ns={}",
35e432d7
SM
1043 late_msg.type(), stream_iter->last_inactivity_ts.value);
1044 switch (late_msg.type()) {
1045 case bt2::MessageType::DiscardedEvents:
4164020e 1046 adjust_status = adjust_discarded_events_message(
35e432d7
SM
1047 lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(),
1048 late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
4164020e 1049 break;
35e432d7 1050 case bt2::MessageType::DiscardedPackets:
4164020e 1051 adjust_status = adjust_discarded_packets_message(
35e432d7
SM
1052 lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(),
1053 late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
4164020e
SM
1054 break;
1055 default:
1056 bt_common_abort();
1057 }
1058
1059 if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1060 stream_iter_status = adjust_status;
1061 goto end;
1062 }
1063
1064 BT_ASSERT_DBG(adjusted_message);
1065 stream_iter->current_msg = adjusted_message;
1066 stream_iter->current_msg_ts_ns = last_inactivity_ts_ns;
cefd84f8 1067
285951be 1068end:
4164020e 1069 return stream_iter_status;
285951be
FD
1070}
1071
4164020e
SM
1072static enum lttng_live_iterator_status
1073next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
1074 struct lttng_live_trace *live_trace,
1075 struct lttng_live_stream_iterator **youngest_trace_stream_iter)
7cdc2bab 1076{
4164020e 1077 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
4164020e 1078 enum lttng_live_iterator_status stream_iter_status;
4164020e
SM
1079 int64_t youngest_candidate_msg_ts = INT64_MAX;
1080 uint64_t stream_iter_idx;
1081
1082 BT_ASSERT_DBG(live_trace);
4164020e 1083
0f5c5d5c
SM
1084 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1085 "Finding the next stream iterator for trace: "
1086 "trace-id={}",
1087 live_trace->id);
4164020e
SM
1088 /*
1089 * Update the current message of every stream iterators of this trace.
1090 * The current msg of every stream must have a timestamp equal or
1091 * larger than the last message returned by this iterator. We must
1092 * ensure monotonicity.
1093 */
1094 stream_iter_idx = 0;
db00b877 1095 while (stream_iter_idx < live_trace->stream_iterators.size()) {
4164020e 1096 bool stream_iter_is_ended = false;
db00b877
SM
1097 lttng_live_stream_iterator *stream_iter =
1098 live_trace->stream_iterators[stream_iter_idx].get();
4164020e
SM
1099
1100 /*
1101 * If there is no current message for this stream, go fetch
1102 * one.
1103 */
1104 while (!stream_iter->current_msg) {
35e432d7 1105 bt2::ConstMessage::Shared msg;
4164020e
SM
1106 int64_t curr_msg_ts_ns = INT64_MAX;
1107
1108 stream_iter_status =
35e432d7 1109 lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, msg);
4164020e
SM
1110
1111 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1112 stream_iter_is_ended = true;
1113 break;
1114 }
1115
1116 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1117 goto end;
1118 }
1119
1120 BT_ASSERT_DBG(msg);
1121
0f5c5d5c
SM
1122 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1123 "Live stream iterator returned message: msg-type={}, "
1124 "stream-name=\"{}\", viewer-stream-id={}",
35e432d7 1125 msg->type(), stream_iter->name, stream_iter->viewer_stream_id);
4164020e
SM
1126
1127 /*
1128 * Get the timestamp in nanoseconds from origin of this
e7401568 1129 * message.
4164020e 1130 */
35e432d7
SM
1131 live_get_msg_ts_ns(lttng_live_msg_iter, msg->libObjPtr(),
1132 lttng_live_msg_iter->last_msg_ts_ns, &curr_msg_ts_ns);
4164020e
SM
1133
1134 /*
1135 * Check if the message of the current live stream
1136 * iterator occurred at the exact same time or after the
1137 * last message returned by this component's message
1138 * iterator. If not, we need to handle it with care.
1139 */
1140 if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
35e432d7 1141 stream_iter->current_msg = std::move(msg);
4164020e
SM
1142 stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
1143 } else {
1144 /*
1145 * We received a message from the past. This
1146 * may be fixable but it can also be an error.
1147 */
1148 stream_iter_status =
35e432d7 1149 handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, *msg);
4164020e 1150 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
0f5c5d5c
SM
1151 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1152 "Late message could not be handled correctly: "
1153 "lttng-live-msg-iter-addr={}, "
1154 "stream-name=\"{}\", "
1155 "curr-msg-ts={}, last-msg-ts={}",
e28ca558
SM
1156 fmt::ptr(lttng_live_msg_iter), stream_iter->name,
1157 curr_msg_ts_ns,
0f5c5d5c 1158 lttng_live_msg_iter->last_msg_ts_ns);
4164020e
SM
1159 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1160 goto end;
1161 }
1162 }
1163 }
1164
1165 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1166
1167 if (!stream_iter_is_ended) {
1168 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1169 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1170 /*
1171 * Update the current best candidate message
1172 * for the stream iterator of this live trace
1173 * to be forwarded downstream.
1174 */
1175 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1176 youngest_candidate_stream_iter = stream_iter;
1177 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1178 /*
1179 * Order the messages in an arbitrary but
1180 * deterministic way.
1181 */
1182 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1183 int ret = common_muxing_compare_messages(
35e432d7
SM
1184 stream_iter->current_msg->libObjPtr(),
1185 youngest_candidate_stream_iter->current_msg->libObjPtr());
4164020e
SM
1186 if (ret < 0) {
1187 /*
1188 * The `youngest_candidate_stream_iter->current_msg`
1189 * should go first. Update the next
1190 * iterator and the current timestamp.
1191 */
1192 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1193 youngest_candidate_stream_iter = stream_iter;
1194 } else if (ret == 0) {
1195 /*
1196 * Unable to pick which one should go
1197 * first.
1198 */
0f5c5d5c
SM
1199 BT_CPPLOGW_SPEC(
1200 lttng_live_msg_iter->logger,
4164020e 1201 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
0f5c5d5c
SM
1202 "stream-iter-addr={}"
1203 "stream-iter-addr={}",
1204 fmt::ptr(stream_iter), fmt::ptr(youngest_candidate_stream_iter));
4164020e
SM
1205 }
1206 }
1207
1208 stream_iter_idx++;
1209 } else {
1210 /*
1211 * The live stream iterator has ended. That
1212 * iterator is removed from the array, but
1213 * there is no need to increment
1214 * stream_iter_idx as
1215 * g_ptr_array_remove_index_fast replaces the
1216 * removed element with the array's last
1217 * element.
1218 */
db00b877 1219 bt2c::vectorFastRemove(live_trace->stream_iterators, stream_iter_idx);
4164020e
SM
1220 }
1221 }
1222
1223 if (youngest_candidate_stream_iter) {
1224 *youngest_trace_stream_iter = youngest_candidate_stream_iter;
1225 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1226 } else {
1227 /*
1228 * The only case where we don't have a candidate for this trace
1229 * is if we reached the end of all the iterators.
1230 */
db00b877 1231 BT_ASSERT(live_trace->stream_iterators.empty());
4164020e
SM
1232 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1233 }
14f28187
FD
1234
1235end:
4164020e 1236 return stream_iter_status;
14f28187
FD
1237}
1238
4164020e
SM
1239static enum lttng_live_iterator_status
1240next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
1241 struct lttng_live_session *session,
1242 struct lttng_live_stream_iterator **youngest_session_stream_iter)
14f28187 1243{
4164020e
SM
1244 enum lttng_live_iterator_status stream_iter_status;
1245 uint64_t trace_idx = 0;
1246 int64_t youngest_candidate_msg_ts = INT64_MAX;
1247 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
1248
0f5c5d5c
SM
1249 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1250 "Finding the next stream iterator for session: "
1251 "session-id={}",
1252 session->id);
4164020e
SM
1253 /*
1254 * Make sure we are attached to the session and look for new streams
1255 * and metadata.
1256 */
1257 stream_iter_status = lttng_live_get_session(lttng_live_msg_iter, session);
1258 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
1259 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE &&
1260 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) {
1261 goto end;
1262 }
1263
6dcda123 1264 while (trace_idx < session->traces.size()) {
4164020e
SM
1265 bool trace_is_ended = false;
1266 struct lttng_live_stream_iterator *stream_iter;
6dcda123 1267 lttng_live_trace *trace = session->traces[trace_idx].get();
4164020e
SM
1268
1269 stream_iter_status =
1270 next_stream_iterator_for_trace(lttng_live_msg_iter, trace, &stream_iter);
1271 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1272 /*
1273 * All the live stream iterators for this trace are
1274 * ENDed. Remove the trace from this session.
1275 */
1276 trace_is_ended = true;
1277 } else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1278 goto end;
1279 }
1280
1281 if (!trace_is_ended) {
1282 BT_ASSERT_DBG(stream_iter);
1283
1284 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1285 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1286 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1287 youngest_candidate_stream_iter = stream_iter;
1288 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1289 /*
1290 * Order the messages in an arbitrary but
1291 * deterministic way.
1292 */
1293 int ret = common_muxing_compare_messages(
35e432d7
SM
1294 stream_iter->current_msg->libObjPtr(),
1295 youngest_candidate_stream_iter->current_msg->libObjPtr());
4164020e
SM
1296 if (ret < 0) {
1297 /*
1298 * The `youngest_candidate_stream_iter->current_msg`
1299 * should go first. Update the next iterator
1300 * and the current timestamp.
1301 */
1302 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1303 youngest_candidate_stream_iter = stream_iter;
1304 } else if (ret == 0) {
1305 /* Unable to pick which one should go first. */
0f5c5d5c
SM
1306 BT_CPPLOGW_SPEC(
1307 lttng_live_msg_iter->logger,
4164020e 1308 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
0f5c5d5c
SM
1309 "stream-iter-addr={}"
1310 "youngest-candidate-stream-iter-addr={}",
1311 fmt::ptr(stream_iter), fmt::ptr(youngest_candidate_stream_iter));
4164020e
SM
1312 }
1313 }
1314 trace_idx++;
1315 } else {
1316 /*
1317 * trace_idx is not incremented since
6dcda123 1318 * vectorFastRemove replaces the
4164020e
SM
1319 * element at trace_idx with the array's last element.
1320 */
6dcda123 1321 bt2c::vectorFastRemove(session->traces, trace_idx);
4164020e
SM
1322 }
1323 }
1324 if (youngest_candidate_stream_iter) {
1325 *youngest_session_stream_iter = youngest_candidate_stream_iter;
1326 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1327 } else {
1328 /*
1329 * The only cases where we don't have a candidate for this
1330 * trace is:
1331 * 1. if we reached the end of all the iterators of all the
1332 * traces of this session,
1333 * 2. if we never had live stream iterator in the first place.
1334 *
1335 * In either cases, we return END.
1336 */
6dcda123 1337 BT_ASSERT(session->traces.empty());
4164020e
SM
1338 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1339 }
7cdc2bab 1340end:
4164020e 1341 return stream_iter_status;
14f28187
FD
1342}
1343
4164020e 1344static inline void put_messages(bt_message_array_const msgs, uint64_t count)
14f28187 1345{
4164020e 1346 uint64_t i;
14f28187 1347
4164020e
SM
1348 for (i = 0; i < count; i++) {
1349 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
1350 }
7cdc2bab
MD
1351}
1352
4164020e
SM
1353bt_message_iterator_class_next_method_status
1354lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array_const msgs,
1355 uint64_t capacity, uint64_t *count)
d3eb6e8f 1356{
1e690349
SM
1357 try {
1358 bt_message_iterator_class_next_method_status status;
1359 enum lttng_live_viewer_status viewer_status;
1360 struct lttng_live_msg_iter *lttng_live_msg_iter =
1361 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_it);
1362 struct lttng_live_component *lttng_live = lttng_live_msg_iter->lttng_live_comp;
1363 enum lttng_live_iterator_status stream_iter_status;
1364 uint64_t session_idx;
4164020e 1365
1e690349 1366 *count = 0;
4164020e 1367
1e690349 1368 BT_ASSERT_DBG(lttng_live_msg_iter);
4164020e 1369
1e690349
SM
1370 if (G_UNLIKELY(lttng_live_msg_iter->was_interrupted)) {
1371 /*
f8a11f24
SM
1372 * The iterator was interrupted in a previous call to the
1373 * `_next()` method. We currently do not support generating
1374 * messages after such event. The babeltrace2 CLI should never
1375 * be running the graph after being interrupted. So this check
1376 * is to prevent other graph users from using this live
1377 * iterator in an messed up internal state.
1378 */
1e690349
SM
1379 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1380 BT_CPPLOGE_APPEND_CAUSE_SPEC(
1381 lttng_live_msg_iter->logger,
1382 "Message iterator was interrupted during a previous call to the `next()` and currently does not support continuing after such event.");
1383 goto end;
1384 }
4164020e 1385
1e690349 1386 /*
f8a11f24
SM
1387 * Clear all the invalid message reference that might be left over in
1388 * the output array.
1389 */
1e690349 1390 memset(msgs, 0, capacity * sizeof(*msgs));
4164020e 1391
1e690349 1392 /*
f8a11f24
SM
1393 * If no session are exposed on the relay found at the url provided by
1394 * the user, session count will be 0. In this case, we return status
1395 * end to return gracefully.
1396 */
1e690349
SM
1397 if (lttng_live_msg_iter->sessions->len == 0) {
1398 if (lttng_live->params.sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
1399 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1400 goto end;
1401 } else {
1402 /*
f8a11f24
SM
1403 * The are no more active session for this session
1404 * name. Retry to create a viewer session for the
1405 * requested session name.
1406 */
1e690349
SM
1407 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
1408 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1409 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1410 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1411 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1412 "Error creating LTTng live viewer session");
1413 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1414 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1415 } else {
1416 bt_common_abort();
1417 }
1418 goto end;
4164020e 1419 }
4164020e
SM
1420 }
1421 }
4164020e 1422
1e690349
SM
1423 if (lttng_live_msg_iter->active_stream_iter == 0) {
1424 lttng_live_force_new_streams_and_metadata(lttng_live_msg_iter);
1425 }
4164020e 1426
1e690349 1427 /*
f8a11f24
SM
1428 * Here the muxing of message is done.
1429 *
1430 * We need to iterate over all the streams of all the traces of all the
1431 * viewer sessions in order to get the message with the smallest
1432 * timestamp. In this case, a session is a viewer session and there is
1433 * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
1434 * kernel). Each viewer session can have multiple traces, for example,
1435 * 64bit UST viewer sessions could have multiple per-pid traces.
1436 *
1437 * We iterate over the streams of each traces to update and see what is
1438 * their next message's timestamp. From those timestamps, we select the
1439 * message with the smallest timestamp as the best candidate message
1440 * for that trace and do the same thing across all the sessions.
1441 *
1442 * We then compare the timestamp of best candidate message of all the
1443 * sessions to pick the message with the smallest timestamp and we
1444 * return it.
1445 */
1e690349
SM
1446 while (*count < capacity) {
1447 struct lttng_live_stream_iterator *youngest_stream_iter = NULL,
1448 *candidate_stream_iter = NULL;
1449 int64_t youngest_msg_ts_ns = INT64_MAX;
1450
1451 BT_ASSERT_DBG(lttng_live_msg_iter->sessions);
1452 session_idx = 0;
1453 while (session_idx < lttng_live_msg_iter->sessions->len) {
1454 struct lttng_live_session *session = (lttng_live_session *) g_ptr_array_index(
1455 lttng_live_msg_iter->sessions, session_idx);
1456
1457 /* Find the best candidate message to send downstream. */
1458 stream_iter_status = next_stream_iterator_for_session(lttng_live_msg_iter, session,
1459 &candidate_stream_iter);
1460
1461 /* If we receive an END status, it means that either:
f8a11f24
SM
1462 * - Those traces never had active streams (UST with no
1463 * data produced yet),
1464 * - All live stream iterators have ENDed.
1465 */
1e690349 1466 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
6dcda123 1467 if (session->closed && session->traces.empty()) {
1e690349 1468 /*
f8a11f24
SM
1469 * Remove the session from the list.
1470 * session_idx is not modified since
1471 * g_ptr_array_remove_index_fast
1472 * replaces the the removed element with
1473 * the array's last element.
1474 */
1e690349
SM
1475 g_ptr_array_remove_index_fast(lttng_live_msg_iter->sessions, session_idx);
1476 } else {
1477 session_idx++;
1478 }
1479 continue;
4164020e 1480 }
4164020e 1481
1e690349
SM
1482 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1483 goto return_status;
1484 }
4164020e 1485
1e690349
SM
1486 if (G_UNLIKELY(youngest_stream_iter == NULL) ||
1487 candidate_stream_iter->current_msg_ts_ns < youngest_msg_ts_ns) {
1488 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1489 youngest_stream_iter = candidate_stream_iter;
1490 } else if (candidate_stream_iter->current_msg_ts_ns == youngest_msg_ts_ns) {
1491 /*
f8a11f24
SM
1492 * The currently selected message to be sent
1493 * downstream next has the exact same timestamp
1494 * that of the current candidate message. We
1495 * must break the tie in a predictable manner.
1496 */
1e690349
SM
1497 BT_CPPLOGD_STR_SPEC(
1498 lttng_live_msg_iter->logger,
1499 "Two of the next message candidates have the same timestamps, pick one deterministically.");
1500 /*
f8a11f24
SM
1501 * Order the messages in an arbitrary but
1502 * deterministic way.
1503 */
35e432d7
SM
1504 int ret = common_muxing_compare_messages(
1505 candidate_stream_iter->current_msg->libObjPtr(),
1506 youngest_stream_iter->current_msg->libObjPtr());
1e690349
SM
1507 if (ret < 0) {
1508 /*
f8a11f24
SM
1509 * The `candidate_stream_iter->current_msg`
1510 * should go first. Update the next
1511 * iterator and the current timestamp.
1512 */
1e690349
SM
1513 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1514 youngest_stream_iter = candidate_stream_iter;
1515 } else if (ret == 0) {
1516 /* Unable to pick which one should go first. */
1517 BT_CPPLOGW_SPEC(
1518 lttng_live_msg_iter->logger,
1519 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1520 "next-stream-iter-addr={}"
1521 "candidate-stream-iter-addr={}",
1522 fmt::ptr(youngest_stream_iter), fmt::ptr(candidate_stream_iter));
1523 }
4164020e 1524 }
4164020e 1525
1e690349
SM
1526 session_idx++;
1527 }
4164020e 1528
1e690349
SM
1529 if (!youngest_stream_iter) {
1530 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1531 goto return_status;
1532 }
4164020e 1533
1e690349
SM
1534 BT_ASSERT_DBG(youngest_stream_iter->current_msg);
1535 /* Ensure monotonicity. */
1536 BT_ASSERT_DBG(lttng_live_msg_iter->last_msg_ts_ns <=
1537 youngest_stream_iter->current_msg_ts_ns);
4164020e 1538
1e690349 1539 /*
f8a11f24
SM
1540 * Insert the next message to the message batch. This will set
1541 * stream iterator current message to NULL so that next time
1542 * we fetch the next message of that stream iterator
1543 */
35e432d7 1544 msgs[*count] = youngest_stream_iter->current_msg.release().libObjPtr();
1e690349 1545 (*count)++;
4164020e 1546
1e690349
SM
1547 /* Update the last timestamp in nanoseconds sent downstream. */
1548 lttng_live_msg_iter->last_msg_ts_ns = youngest_msg_ts_ns;
1549 youngest_stream_iter->current_msg_ts_ns = INT64_MAX;
4164020e 1550
1e690349
SM
1551 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1552 }
f79c2d7a
FD
1553
1554return_status:
1e690349
SM
1555 switch (stream_iter_status) {
1556 case LTTNG_LIVE_ITERATOR_STATUS_OK:
1557 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
1558 /*
f8a11f24
SM
1559 * If we gathered messages, return _OK even if the graph was
1560 * interrupted. This allows for the components downstream to at
1561 * least get the those messages. If the graph was indeed
1562 * interrupted there should not be another _next() call as the
1563 * application will tear down the graph. This component class
1564 * doesn't support restarting after an interruption.
1565 */
1e690349
SM
1566 if (*count > 0) {
1567 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
1568 } else {
1569 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1570 }
1571 break;
1572 case LTTNG_LIVE_ITERATOR_STATUS_END:
1573 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1574 break;
1575 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
1576 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1577 "Memory error preparing the next batch of messages: "
1578 "live-iter-status={}",
1579 stream_iter_status);
1580 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
1581 break;
1582 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
1583 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
1584 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
1585 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1586 "Error preparing the next batch of messages: "
1587 "live-iter-status={}",
1588 stream_iter_status);
4164020e 1589
1e690349
SM
1590 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1591 /* Put all existing messages on error. */
1592 put_messages(msgs, *count);
1593 break;
1594 default:
1595 bt_common_abort();
1596 }
14f28187 1597
f79c2d7a 1598end:
1e690349
SM
1599 return status;
1600 } catch (const std::bad_alloc&) {
1601 return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
1602 } catch (const bt2::Error&) {
1603 return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1604 }
7cdc2bab 1605}
41a2b7ae 1606
4164020e
SM
1607static struct lttng_live_msg_iter *
1608lttng_live_msg_iter_create(struct lttng_live_component *lttng_live_comp,
1609 bt_self_message_iterator *self_msg_it)
4f74db52 1610{
0f5c5d5c
SM
1611 lttng_live_msg_iter *msg_iter = new lttng_live_msg_iter {lttng_live_comp->logger};
1612 msg_iter->self_comp = lttng_live_comp->self_comp;
1613 msg_iter->lttng_live_comp = lttng_live_comp;
1614 msg_iter->self_msg_iter = self_msg_it;
4164020e 1615
0f5c5d5c
SM
1616 msg_iter->active_stream_iter = 0;
1617 msg_iter->last_msg_ts_ns = INT64_MIN;
1618 msg_iter->was_interrupted = false;
4f74db52 1619
0f5c5d5c 1620 msg_iter->sessions =
4164020e 1621 g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_session);
0f5c5d5c 1622 BT_ASSERT(msg_iter->sessions);
4164020e 1623
0f5c5d5c 1624 return msg_iter;
4f74db52
FD
1625}
1626
4164020e
SM
1627bt_message_iterator_class_initialize_method_status
1628lttng_live_msg_iter_init(bt_self_message_iterator *self_msg_it,
7d91f1ac 1629 bt_self_message_iterator_configuration *, bt_self_component_port_output *)
7cdc2bab 1630{
1e690349
SM
1631 try {
1632 bt_message_iterator_class_initialize_method_status status;
1633 struct lttng_live_component *lttng_live;
1634 struct lttng_live_msg_iter *lttng_live_msg_iter;
1635 enum lttng_live_viewer_status viewer_status;
1636 bt_self_component *self_comp = bt_self_message_iterator_borrow_component(self_msg_it);
1637
1638 lttng_live = (lttng_live_component *) bt_self_component_get_data(self_comp);
1639
1640 /* There can be only one downstream iterator at the same time. */
1641 BT_ASSERT(!lttng_live->has_msg_iter);
1642 lttng_live->has_msg_iter = true;
1643
1644 lttng_live_msg_iter = lttng_live_msg_iter_create(lttng_live, self_msg_it);
1645 if (!lttng_live_msg_iter) {
1646 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live->logger,
1647 "Failed to create lttng_live_msg_iter");
1648 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1649 goto error;
1650 }
4164020e 1651
1e690349 1652 viewer_status = live_viewer_connection_create(
2780ec75 1653 lttng_live->params.url.c_str(), false, lttng_live_msg_iter, lttng_live_msg_iter->logger,
1e690349
SM
1654 &lttng_live_msg_iter->viewer_connection);
1655 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1656 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1657 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1658 "Failed to create viewer connection");
1659 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1660 /*
f8a11f24
SM
1661 * Interruption in the _iter_init() method is not
1662 * supported. Return an error.
1663 */
1e690349
SM
1664 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1665 "Interrupted while creating viewer connection");
1666 }
10265ebe 1667
1e690349
SM
1668 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1669 goto error;
1670 }
4164020e 1671
1e690349
SM
1672 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
1673 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1674 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1675 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1676 "Failed to create viewer session");
1677 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1678 /*
f8a11f24
SM
1679 * Interruption in the _iter_init() method is not
1680 * supported. Return an error.
1681 */
1e690349
SM
1682 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1683 "Interrupted when creating viewer session");
1684 }
4164020e 1685
10265ebe 1686 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
4164020e 1687 goto error;
4164020e 1688 }
4164020e 1689
1e690349
SM
1690 if (lttng_live_msg_iter->sessions->len == 0) {
1691 switch (lttng_live->params.sess_not_found_act) {
1692 case SESSION_NOT_FOUND_ACTION_CONTINUE:
1693 BT_CPPLOGI_SPEC(
1694 lttng_live_msg_iter->logger,
1695 "Unable to connect to the requested live viewer session. "
1696 "Keep trying to connect because of {}=\"{}\" component parameter: url=\"{}\"",
1697 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR,
2780ec75 1698 lttng_live->params.url);
1e690349
SM
1699 break;
1700 case SESSION_NOT_FOUND_ACTION_FAIL:
1701 BT_CPPLOGE_APPEND_CAUSE_SPEC(
1702 lttng_live_msg_iter->logger,
1703 "Unable to connect to the requested live viewer session. "
1704 "Fail the message iterator initialization because of {}=\"{}\" "
1705 "component parameter: url =\"{}\"",
1706 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_FAIL_STR,
2780ec75 1707 lttng_live->params.url);
1e690349
SM
1708 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1709 goto error;
1710 case SESSION_NOT_FOUND_ACTION_END:
1711 BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger,
1712 "Unable to connect to the requested live viewer session. "
1713 "End gracefully at the first _next() call because of {}=\"{}\""
1714 " component parameter: url=\"{}\"",
1715 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_END_STR,
2780ec75 1716 lttng_live->params.url);
1e690349
SM
1717 break;
1718 default:
1719 bt_common_abort();
1720 }
1721 }
1722
1723 bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter);
1724 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK;
1725 goto end;
f79c2d7a 1726
14f28187 1727error:
1e690349 1728 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
7cdc2bab 1729end:
1e690349
SM
1730 return status;
1731 } catch (const std::bad_alloc&) {
1732 return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1733 } catch (const bt2::Error&) {
1734 return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1735 }
7cdc2bab
MD
1736}
1737
80aff5ef 1738static struct bt_param_validation_map_value_entry_descr list_sessions_params[] = {
88730e42
SM
1739 {URL_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
1740 bt_param_validation_value_descr::makeString()},
4164020e
SM
1741 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
1742
1743static bt_component_class_query_method_status
1744lttng_live_query_list_sessions(const bt_value *params, const bt_value **result,
0f5c5d5c 1745 const bt2c::Logger& logger)
7cdc2bab 1746{
4164020e
SM
1747 bt_component_class_query_method_status status;
1748 const bt_value *url_value = NULL;
1749 const char *url;
1750 struct live_viewer_connection *viewer_connection = NULL;
1751 enum lttng_live_viewer_status viewer_status;
1752 enum bt_param_validation_status validation_status;
1753 gchar *validate_error = NULL;
1754
1755 validation_status = bt_param_validation_validate(params, list_sessions_params, &validate_error);
1756 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
1757 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1758 goto error;
1759 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
1760 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
0f5c5d5c 1761 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "{}", validate_error);
4164020e
SM
1762 goto error;
1763 }
1764
1765 url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
1766 url = bt_value_string_get(url_value);
1767
0f5c5d5c 1768 viewer_status = live_viewer_connection_create(url, true, NULL, logger, &viewer_connection);
4164020e
SM
1769 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1770 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
0f5c5d5c 1771 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Failed to create viewer connection");
4164020e
SM
1772 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1773 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1774 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
1775 } else {
1776 bt_common_abort();
1777 }
1778 goto error;
1779 }
1780
1781 status = live_viewer_connection_list_sessions(viewer_connection, result);
1782 if (status != BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK) {
0f5c5d5c 1783 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Failed to list viewer sessions");
4164020e
SM
1784 goto error;
1785 }
1786
1787 goto end;
c7eee084 1788
7cdc2bab 1789error:
4164020e 1790 BT_VALUE_PUT_REF_AND_RESET(*result);
c7eee084 1791
4164020e
SM
1792 if (status >= 0) {
1793 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1794 }
c7eee084 1795
7cdc2bab 1796end:
4164020e
SM
1797 if (viewer_connection) {
1798 live_viewer_connection_destroy(viewer_connection);
1799 }
80aff5ef 1800
4164020e 1801 g_free(validate_error);
80aff5ef 1802
4164020e 1803 return status;
7cdc2bab
MD
1804}
1805
4164020e
SM
1806static bt_component_class_query_method_status
1807lttng_live_query_support_info(const bt_value *params, const bt_value **result,
0f5c5d5c 1808 const bt2c::Logger& logger)
312df793 1809{
4164020e
SM
1810 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
1811 const bt_value *input_type_value;
1812 const bt_value *input_value;
1813 double weight = 0;
dd420a9b 1814 struct bt_common_lttng_live_url_parts parts = {};
4164020e
SM
1815
1816 /* Used by the logging macros */
1817 __attribute__((unused)) bt_self_component *self_comp = NULL;
1818
1819 *result = NULL;
1820 input_type_value = bt_value_map_borrow_entry_value_const(params, "type");
1821 if (!input_type_value) {
0f5c5d5c 1822 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Missing expected `type` parameter.");
4164020e
SM
1823 goto error;
1824 }
1825
1826 if (!bt_value_is_string(input_type_value)) {
0f5c5d5c 1827 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "`type` parameter is not a string value.");
4164020e
SM
1828 goto error;
1829 }
1830
1831 if (strcmp(bt_value_string_get(input_type_value), "string") != 0) {
1832 /* We don't handle file system paths */
1833 goto create_result;
1834 }
1835
1836 input_value = bt_value_map_borrow_entry_value_const(params, "input");
1837 if (!input_value) {
0f5c5d5c 1838 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Missing expected `input` parameter.");
4164020e
SM
1839 goto error;
1840 }
1841
1842 if (!bt_value_is_string(input_value)) {
0f5c5d5c 1843 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "`input` parameter is not a string value.");
4164020e
SM
1844 goto error;
1845 }
1846
1847 parts = bt_common_parse_lttng_live_url(bt_value_string_get(input_value), NULL, 0);
1848 if (parts.session_name) {
1849 /*
1850 * Looks pretty much like an LTTng live URL: we got the
1851 * session name part, which forms a complete URL.
1852 */
1853 weight = .75;
1854 }
312df793
PP
1855
1856create_result:
4164020e
SM
1857 *result = bt_value_real_create_init(weight);
1858 if (!*result) {
1859 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1860 goto error;
1861 }
312df793 1862
4164020e 1863 goto end;
312df793
PP
1864
1865error:
4164020e
SM
1866 if (status >= 0) {
1867 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1868 }
312df793 1869
4164020e 1870 BT_ASSERT(!*result);
312df793
PP
1871
1872end:
4164020e
SM
1873 bt_common_destroy_lttng_live_url_parts(&parts);
1874 return status;
312df793
PP
1875}
1876
4164020e
SM
1877bt_component_class_query_method_status lttng_live_query(bt_self_component_class_source *comp_class,
1878 bt_private_query_executor *priv_query_exec,
1879 const char *object, const bt_value *params,
1880 __attribute__((unused)) void *method_data,
1881 const bt_value **result)
7cdc2bab 1882{
1e690349
SM
1883 try {
1884 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
1885 bt2c::Logger logger {bt2::SelfComponentClass {comp_class},
1886 bt2::PrivateQueryExecutor {priv_query_exec},
1887 "PLUGIN/SRC.CTF.LTTNG-LIVE/QUERY"};
1888
1889 if (strcmp(object, "sessions") == 0) {
1890 status = lttng_live_query_list_sessions(params, result, logger);
1891 } else if (strcmp(object, "babeltrace.support-info") == 0) {
1892 status = lttng_live_query_support_info(params, result, logger);
1893 } else {
1894 BT_CPPLOGI_SPEC(logger, "Unknown query object `{}`", object);
1895 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT;
1896 goto end;
1897 }
14f28187
FD
1898
1899end:
1e690349
SM
1900 return status;
1901 } catch (const std::bad_alloc&) {
1902 return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1903 } catch (const bt2::Error&) {
1904 return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1905 }
7cdc2bab
MD
1906}
1907
14f28187 1908void lttng_live_component_finalize(bt_self_component_source *component)
7cdc2bab 1909{
ef9e5f5d
SM
1910 lttng_live_component::UP {static_cast<lttng_live_component *>(
1911 bt_self_component_get_data(bt_self_component_source_as_self_component(component)))};
7cdc2bab
MD
1912}
1913
4164020e
SM
1914static enum session_not_found_action
1915parse_session_not_found_action_param(const bt_value *no_session_param)
14f28187 1916{
4164020e
SM
1917 enum session_not_found_action action;
1918 const char *no_session_act_str = bt_value_string_get(no_session_param);
1919
1920 if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_CONTINUE_STR) == 0) {
1921 action = SESSION_NOT_FOUND_ACTION_CONTINUE;
1922 } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_FAIL_STR) == 0) {
1923 action = SESSION_NOT_FOUND_ACTION_FAIL;
1924 } else {
1925 BT_ASSERT(strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0);
1926 action = SESSION_NOT_FOUND_ACTION_END;
1927 }
1928
1929 return action;
14f28187
FD
1930}
1931
88730e42
SM
1932static bt_param_validation_value_descr inputs_elem_descr =
1933 bt_param_validation_value_descr::makeString();
80aff5ef
SM
1934
1935static const char *sess_not_found_action_choices[] = {
4164020e
SM
1936 SESS_NOT_FOUND_ACTION_CONTINUE_STR,
1937 SESS_NOT_FOUND_ACTION_FAIL_STR,
1938 SESS_NOT_FOUND_ACTION_END_STR,
80aff5ef
SM
1939};
1940
1941static struct bt_param_validation_map_value_entry_descr params_descr[] = {
88730e42
SM
1942 {INPUTS_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
1943 bt_param_validation_value_descr::makeArray(1, 1, inputs_elem_descr)},
1944 {SESS_NOT_FOUND_ACTION_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
1945 bt_param_validation_value_descr::makeString(sess_not_found_action_choices)},
4164020e
SM
1946 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
1947
1948static bt_component_class_initialize_method_status
0f5c5d5c 1949lttng_live_component_create(const bt_value *params, bt_self_component_source *self_comp,
ef9e5f5d 1950 lttng_live_component::UP& component)
7cdc2bab 1951{
4164020e
SM
1952 const bt_value *inputs_value;
1953 const bt_value *url_value;
1954 const bt_value *value;
4164020e
SM
1955 enum bt_param_validation_status validation_status;
1956 gchar *validation_error = NULL;
0f5c5d5c 1957 bt2c::Logger logger {bt2::SelfSourceComponent {self_comp}, "PLUGIN/SRC.CTF.LTTNG-LIVE/COMP"};
4164020e
SM
1958
1959 validation_status = bt_param_validation_validate(params, params_descr, &validation_error);
1960 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
ef9e5f5d 1961 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
4164020e 1962 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
6838d8aa 1963 bt2c::GCharUP errorFreer {validation_error};
0f5c5d5c 1964 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "{}", validation_error);
ef9e5f5d 1965 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
4164020e
SM
1966 }
1967
ef9e5f5d 1968 auto lttng_live = bt2s::make_unique<lttng_live_component>(std::move(logger));
0f5c5d5c 1969 lttng_live->self_comp = bt_self_component_source_as_self_component(self_comp);
4164020e
SM
1970 lttng_live->max_query_size = MAX_QUERY_SIZE;
1971 lttng_live->has_msg_iter = false;
1972
1973 inputs_value = bt_value_map_borrow_entry_value_const(params, INPUTS_PARAM);
1974 url_value = bt_value_array_borrow_element_by_index_const(inputs_value, 0);
2780ec75 1975 lttng_live->params.url = bt_value_string_get(url_value);
4164020e
SM
1976
1977 value = bt_value_map_borrow_entry_value_const(params, SESS_NOT_FOUND_ACTION_PARAM);
1978 if (value) {
1979 lttng_live->params.sess_not_found_act = parse_session_not_found_action_param(value);
1980 } else {
0f5c5d5c
SM
1981 BT_CPPLOGI_SPEC(lttng_live->logger,
1982 "Optional `{}` parameter is missing: defaulting to `{}`.",
1983 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR);
4164020e
SM
1984 lttng_live->params.sess_not_found_act = SESSION_NOT_FOUND_ACTION_CONTINUE;
1985 }
1986
ef9e5f5d
SM
1987 component = std::move(lttng_live);
1988 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
d3e4dcd8
PP
1989}
1990
4164020e
SM
1991bt_component_class_initialize_method_status
1992lttng_live_component_init(bt_self_component_source *self_comp_src,
7d91f1ac 1993 bt_self_component_source_configuration *, const bt_value *params, void *)
f3bc2010 1994{
1e690349 1995 try {
ef9e5f5d 1996 lttng_live_component::UP lttng_live;
1e690349
SM
1997 bt_component_class_initialize_method_status ret;
1998 bt_self_component *self_comp = bt_self_component_source_as_self_component(self_comp_src);
1999 bt_self_component_add_port_status add_port_status;
2000
ef9e5f5d 2001 ret = lttng_live_component_create(params, self_comp_src, lttng_live);
1e690349 2002 if (ret != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
ef9e5f5d 2003 return ret;
1e690349 2004 }
4164020e 2005
1e690349
SM
2006 add_port_status =
2007 bt_self_component_source_add_output_port(self_comp_src, "out", NULL, NULL);
2008 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
2009 ret = (bt_component_class_initialize_method_status) add_port_status;
ef9e5f5d 2010 return ret;
1e690349 2011 }
4164020e 2012
ef9e5f5d 2013 bt_self_component_set_data(self_comp, lttng_live.release());
7cdc2bab 2014
ef9e5f5d 2015 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
1e690349
SM
2016 } catch (const std::bad_alloc&) {
2017 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2018 } catch (const bt2::Error&) {
2019 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
2020 }
d85ef162 2021}
This page took 0.22853 seconds and 4 git commands to generate.