src.ctf.lttng-live: remove some goto error-handling
[deliverable/babeltrace.git] / src / plugins / ctf / lttng-live / lttng-live.cpp
CommitLineData
f3bc2010 1/*
0235b0db 2 * SPDX-License-Identifier: MIT
f3bc2010 3 *
14f28187 4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
f3bc2010 5 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7cdc2bab 6 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
f3bc2010 7 *
0235b0db 8 * Babeltrace CTF LTTng-live Client Component
f3bc2010
JG
9 */
10
27a14e13
SM
11#define BT_CLOG_CFG logCfg
12#define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE"
020bc26f 13
14f28187 14#include <inttypes.h>
c4f23e30 15#include <stdbool.h>
14f28187
FD
16#include <unistd.h>
17
3c22a242
FD
18#include <glib.h>
19
578e048b 20#include "common/assert.h"
3fadfbc0 21#include <babeltrace2/babeltrace.h>
578e048b 22#include "compat/compiler.h"
3fadfbc0 23#include <babeltrace2/types.h>
d2606006 24#include "cpp-common/exc.hpp"
0507910c 25#include "cpp-common/glib-up.hpp"
eb6655ff 26#include "cpp-common/make-unique.hpp"
2dbef7b6 27#include "cpp-common/vector.hpp"
f3bc2010 28
376fc2bd 29#include "plugins/common/muxing/muxing.h"
80aff5ef 30#include "plugins/common/param-validation/param-validation.h"
27a14e13 31#include "cpp-common/cfg-logging-error-reporting.hpp"
a5b34e0c 32#include "cpp-common/cfg-logging-error-reporting-throw.hpp"
376fc2bd 33
087cd0f5
SM
34#include "data-stream.hpp"
35#include "metadata.hpp"
36#include "lttng-live.hpp"
7cdc2bab 37
4164020e
SM
38#define MAX_QUERY_SIZE (256 * 1024)
39#define URL_PARAM "url"
40#define INPUTS_PARAM "inputs"
41#define SESS_NOT_FOUND_ACTION_PARAM "session-not-found-action"
42#define SESS_NOT_FOUND_ACTION_CONTINUE_STR "continue"
43#define SESS_NOT_FOUND_ACTION_FAIL_STR "fail"
44#define SESS_NOT_FOUND_ACTION_END_STR "end"
7cdc2bab 45
27a14e13 46#define print_dbg(fmt, ...) BT_CLOGD(fmt, ##__VA_ARGS__)
7cdc2bab 47
4164020e 48static const char *lttng_live_iterator_status_string(enum lttng_live_iterator_status status)
14f28187 49{
4164020e
SM
50 switch (status) {
51 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
52 return "LTTNG_LIVE_ITERATOR_STATUS_CONTINUE";
53 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
54 return "LTTNG_LIVE_ITERATOR_STATUS_AGAIN";
55 case LTTNG_LIVE_ITERATOR_STATUS_END:
56 return "LTTNG_LIVE_ITERATOR_STATUS_END";
57 case LTTNG_LIVE_ITERATOR_STATUS_OK:
58 return "LTTNG_LIVE_ITERATOR_STATUS_OK";
59 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
60 return "LTTNG_LIVE_ITERATOR_STATUS_INVAL";
61 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
62 return "LTTNG_LIVE_ITERATOR_STATUS_ERROR";
63 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
64 return "LTTNG_LIVE_ITERATOR_STATUS_NOMEM";
65 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
66 return "LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED";
67 default:
68 bt_common_abort();
69 }
14f28187
FD
70}
71
4164020e 72static const char *lttng_live_stream_state_string(enum lttng_live_stream_state state)
7cdc2bab 73{
4164020e
SM
74 switch (state) {
75 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
76 return "ACTIVE_NO_DATA";
77 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
78 return "QUIESCENT_NO_DATA";
79 case LTTNG_LIVE_STREAM_QUIESCENT:
80 return "QUIESCENT";
81 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
82 return "ACTIVE_DATA";
83 case LTTNG_LIVE_STREAM_EOF:
84 return "EOF";
85 default:
86 return "ERROR";
87 }
7cdc2bab 88}
7cdc2bab 89
34533ae0 90void lttng_live_stream_iterator_set_state(struct lttng_live_stream_iterator *stream_iter,
4164020e 91 enum lttng_live_stream_state new_state)
34533ae0 92{
27a14e13 93 const bt2_common::LogCfg& logCfg = stream_iter->logCfg;
34533ae0 94
27a14e13
SM
95 BT_CLOGD("Setting live stream iterator state: viewer-stream-id=%" PRIu64
96 ", old-state=%s, new-state=%s",
97 stream_iter->viewer_stream_id, lttng_live_stream_state_string(stream_iter->state),
98 lttng_live_stream_state_string(new_state));
34533ae0 99
4164020e 100 stream_iter->state = new_state;
34533ae0
FD
101}
102
4164020e
SM
/*
 * Logs, at the debug level, the state and the inactivity timestamps of
 * the live stream iterator `live_stream_iter` (a pointer expression).
 *
 * The expansion is a single `do { ... } while (0)` statement WITHOUT a
 * trailing semicolon: the caller provides it. This keeps
 * `LTTNG_LIVE_LOGD_STREAM_ITER(x);` usable as the only statement of an
 * unbraced `if`/`else` branch. The argument is parenthesized so that
 * any pointer expression can be passed.
 *
 * Like the other logging macros of this file, this needs a `logCfg`
 * in scope where it's expanded.
 */
#define LTTNG_LIVE_LOGD_STREAM_ITER(live_stream_iter)                                              \
    do {                                                                                           \
        BT_CLOGD("Live stream iterator state=%s, "                                                 \
                 "last-inact-ts-is-set=%d, last-inact-ts-value=%" PRId64 ", "                      \
                 "curr-inact-ts=%" PRId64,                                                         \
                 lttng_live_stream_state_string((live_stream_iter)->state),                        \
                 (live_stream_iter)->last_inactivity_ts.is_set,                                    \
                 (live_stream_iter)->last_inactivity_ts.value,                                     \
                 (live_stream_iter)->current_inactivity_ts);                                       \
    } while (0)
6f79a7cf
MD
113
114BT_HIDDEN
9b4f9b42 115bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter)
6f79a7cf 116{
4164020e 117 if (!msg_iter) {
a77756b9 118 return false;
4164020e 119 }
7cdc2bab 120
a77756b9 121 return bt_self_message_iterator_is_interrupted(msg_iter->self_msg_iter);
7cdc2bab
MD
122}
123
4164020e
SM
124static struct lttng_live_trace *
125lttng_live_session_borrow_trace_by_id(struct lttng_live_session *session, uint64_t trace_id)
d3e4dcd8 126{
caec6d6f 127 for (lttng_live_trace::UP& trace : session->traces) {
4164020e 128 if (trace->id == trace_id) {
caec6d6f 129 return trace.get();
4164020e
SM
130 }
131 }
14f28187 132
caec6d6f 133 return nullptr;
7cdc2bab
MD
134}
135
4164020e
SM
136static struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
137 uint64_t trace_id)
7cdc2bab 138{
27a14e13 139 const bt2_common::LogCfg& logCfg = session->logCfg;
4164020e 140
27a14e13
SM
141 BT_CLOGD("Creating live trace: "
142 "session-id=%" PRIu64 ", trace-id=%" PRIu64,
143 session->id, trace_id);
6269f212 144
75537c13 145 lttng_live_trace::UP trace = bt2_common::makeUnique<lttng_live_trace>(logCfg);
4164020e
SM
146 trace->session = session;
147 trace->id = trace_id;
4164020e 148 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
4164020e 149
75537c13 150 lttng_live_trace *ret = trace.get();
caec6d6f 151 session->traces.emplace_back(std::move(trace));
75537c13
SM
152
153 return ret;
7cdc2bab
MD
154}
155
156BT_HIDDEN
4164020e
SM
157struct lttng_live_trace *
158lttng_live_session_borrow_or_create_trace_by_id(struct lttng_live_session *session,
159 uint64_t trace_id)
7cdc2bab 160{
a77756b9
SM
161 if (lttng_live_trace *trace = lttng_live_session_borrow_trace_by_id(session, trace_id)) {
162 return trace;
4164020e 163 }
7cdc2bab 164
4164020e 165 /* The session is the owner of the newly created trace. */
a77756b9 166 return lttng_live_create_trace(session, trace_id);
7cdc2bab
MD
167}
168
169BT_HIDDEN
4164020e
SM
170int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, uint64_t session_id,
171 const char *hostname, const char *session_name)
7cdc2bab 172{
27a14e13 173 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
4164020e 174
27a14e13
SM
175 BT_CLOGD("Adding live session: "
176 "session-id=%" PRIu64 ", hostname=\"%s\" session-name=\"%s\"",
177 session_id, hostname, session_name);
4164020e 178
c20a2d0f 179 lttng_live_session::UP session = bt2_common::makeUnique<lttng_live_session>(logCfg);
4164020e
SM
180 session->self_comp = lttng_live_msg_iter->self_comp;
181 session->id = session_id;
4164020e
SM
182 session->lttng_live_msg_iter = lttng_live_msg_iter;
183 session->new_streams_needed = true;
2991cc97
SM
184 session->hostname = hostname;
185 session->session_name = session_name;
4164020e 186
bef0ed48 187 lttng_live_msg_iter->sessions.emplace_back(std::move(session));
6269f212
SM
188
189 return 0;
7cdc2bab
MD
190}
191
0da33e67 192lttng_live_session::~lttng_live_session()
7cdc2bab 193{
27a14e13
SM
194 BT_CLOGD("Destroying live session: "
195 "session-id=%" PRIu64 ", session-name=\"%s\"",
0da33e67
SM
196 this->id, this->session_name.c_str());
197 if (this->id != -1ULL) {
198 if (lttng_live_session_detach(this)) {
199 if (!lttng_live_graph_is_canceled(this->lttng_live_msg_iter)) {
4164020e 200 /* Old relayd cannot detach sessions. */
0da33e67 201 BT_CLOGD("Unable to detach lttng live session %" PRIu64, this->id);
4164020e
SM
202 }
203 }
0da33e67 204 this->id = -1ULL;
4164020e 205 }
0da33e67 206}
4164020e 207
528635c1 208lttng_live_msg_iter::~lttng_live_msg_iter()
7cdc2bab 209{
528635c1
SM
210 BT_ASSERT(this->lttng_live_comp);
211 BT_ASSERT(this->lttng_live_comp->has_msg_iter);
14f28187 212
4164020e 213 /* All stream iterators must be destroyed at this point. */
528635c1
SM
214 BT_ASSERT(this->active_stream_iter == 0);
215 this->lttng_live_comp->has_msg_iter = false;
14f28187
FD
216}
217
218BT_HIDDEN
219void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
220{
e1fe0d2a
SM
221 lttng_live_msg_iter::UP {
222 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_iter)};
7cdc2bab
MD
223}
224
4164020e
SM
225static enum lttng_live_iterator_status
226lttng_live_iterator_next_check_stream_state(struct lttng_live_stream_iterator *lttng_live_stream)
7cdc2bab 227{
27a14e13 228 const bt2_common::LogCfg& logCfg = lttng_live_stream->logCfg;
4164020e
SM
229
230 switch (lttng_live_stream->state) {
231 case LTTNG_LIVE_STREAM_QUIESCENT:
232 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
233 break;
234 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
235 /* Invalid state. */
27a14e13 236 BT_CLOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
4164020e
SM
237 bt_common_abort();
238 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
239 /* Invalid state. */
27a14e13 240 BT_CLOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
4164020e
SM
241 bt_common_abort();
242 case LTTNG_LIVE_STREAM_EOF:
243 break;
244 }
245 return LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
246}
247
248/*
f93afbf9
FD
249 * For active no data stream, fetch next index. As a result of that it can
250 * become either:
251 * - quiescent: won't have events for a bit,
252 * - have data: need to get that data and produce the event,
253 * - have no data on this stream at this point: need to retry (AGAIN) or return
254 * EOF.
7cdc2bab 255 */
4164020e
SM
256static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
257 struct lttng_live_msg_iter *lttng_live_msg_iter,
258 struct lttng_live_stream_iterator *lttng_live_stream)
7cdc2bab 259{
27a14e13 260 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
4164020e
SM
261 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
262 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
263 struct packet_index index;
264
265 if (lttng_live_stream->trace->metadata_stream_state ==
266 LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
27a14e13 267 BT_CLOGD(
4164020e
SM
268 "Need to get an update for the metadata stream before proceeding further with this stream: "
269 "stream-name=\"%s\"",
dff1c223 270 lttng_live_stream->name.c_str());
4164020e
SM
271 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
272 goto end;
273 }
274
275 if (lttng_live_stream->trace->session->new_streams_needed) {
27a14e13
SM
276 BT_CLOGD("Need to get an update of all streams before proceeding further with this stream: "
277 "stream-name=\"%s\"",
dff1c223 278 lttng_live_stream->name.c_str());
4164020e
SM
279 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
280 goto end;
281 }
282
283 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA &&
284 lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
285 goto end;
286 }
287 ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream, &index);
288 if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
289 goto end;
290 }
291
292 BT_ASSERT_DBG(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
293
294 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
295 uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts.value,
296 curr_inact_ts = lttng_live_stream->current_inactivity_ts;
297
298 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA && last_inact_ts == curr_inact_ts) {
299 /*
5cb27175
JG
300 * Because the stream is in the QUIESCENT_NO_DATA
301 * state, we can assert that the last_inactivity_ts was
302 * set and can be safely used in the `if` above.
303 */
4164020e
SM
304 BT_ASSERT(lttng_live_stream->last_inactivity_ts.is_set);
305
306 ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
307 LTTNG_LIVE_LOGD_STREAM_ITER(lttng_live_stream);
308 } else {
309 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
310 }
311 goto end;
312 }
313
314 lttng_live_stream->base_offset = index.offset;
315 lttng_live_stream->offset = index.offset;
316 lttng_live_stream->len = index.packet_size / CHAR_BIT;
317
27a14e13
SM
318 BT_CLOGD("Setting live stream reading info: stream-name=\"%s\", "
319 "viewer-stream-id=%" PRIu64 ", stream-base-offset=%" PRIu64 ", stream-offset=%" PRIu64
320 ", stream-len=%" PRIu64,
dff1c223 321 lttng_live_stream->name.c_str(), lttng_live_stream->viewer_stream_id,
27a14e13 322 lttng_live_stream->base_offset, lttng_live_stream->offset, lttng_live_stream->len);
f93afbf9 323
7cdc2bab 324end:
4164020e
SM
325 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
326 ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream);
327 }
328 return ret;
7cdc2bab
MD
329}
330
331/*
14f28187
FD
332 * Creation of the message requires the ctf trace class to be created
333 * beforehand, but the live protocol gives us all streams (including metadata)
334 * at once. So we split it in three steps: getting streams, getting metadata
335 * (which creates the ctf trace class), and then creating the per-stream
336 * messages.
7cdc2bab 337 */
4164020e
SM
338static enum lttng_live_iterator_status
339lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
340 struct lttng_live_session *session)
7cdc2bab 341{
27a14e13 342 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
4164020e 343 enum lttng_live_iterator_status status;
4164020e
SM
344
345 if (!session->attached) {
27a14e13 346 BT_CLOGD("Attach to session: session-id=%" PRIu64, session->id);
4164020e
SM
347 enum lttng_live_viewer_status attach_status =
348 lttng_live_session_attach(session, lttng_live_msg_iter->self_msg_iter);
349 if (attach_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
350 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
351 /*
352 * Clear any causes appended in
353 * `lttng_live_attach_session()` as we want to
354 * return gracefully since the graph was
355 * cancelled.
356 */
357 bt_current_thread_clear_error();
a77756b9 358 return LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
4164020e 359 } else {
27a14e13 360 BT_CLOGE_APPEND_CAUSE("Error attaching to LTTng live session");
a77756b9 361 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
4164020e 362 }
4164020e
SM
363 }
364 }
365
27a14e13
SM
366 BT_CLOGD("Updating all data streams: "
367 "session-id=%" PRIu64 ", session-name=\"%s\"",
2991cc97 368 session->id, session->session_name.c_str());
4164020e
SM
369
370 status = lttng_live_session_get_new_streams(session, lttng_live_msg_iter->self_msg_iter);
a4f118a3
FD
371 switch (status) {
372 case LTTNG_LIVE_ITERATOR_STATUS_OK:
373 break;
374 case LTTNG_LIVE_ITERATOR_STATUS_END:
375 /*
376 * We received a `_END` from the `_get_new_streams()` function,
377 * which means no more data will ever be received from the data
378 * streams of this session. But it's possible that the metadata
379 * is incomplete.
380 * The live protocol guarantees that we receive all the
381 * metadata needed before we receive data streams needing it.
382 * But it's possible to receive metadata NOT needed by
383 * data streams after the session was closed. For example, this
384 * could happen if a new event is registered and the session is
385 * stopped before any tracepoint for that event is actually
386 * fired.
387 */
27a14e13 388 BT_CLOGD(
a4f118a3
FD
389 "Updating streams returned _END status. Override status to _OK in order fetch any remaining metadata:"
390 "session-id=%" PRIu64 ", session-name=\"%s\"",
2991cc97 391 session->id, session->session_name.c_str());
a77756b9 392 return LTTNG_LIVE_ITERATOR_STATUS_OK;
a4f118a3 393 default:
a77756b9 394 return status;
4164020e
SM
395 }
396
27a14e13
SM
397 BT_CLOGD("Updating metadata stream for session: "
398 "session-id=%" PRIu64 ", session-name=\"%s\"",
2991cc97 399 session->id, session->session_name.c_str());
a4f118a3 400
caec6d6f
SM
401 for (lttng_live_trace::UP& trace : session->traces) {
402 status = lttng_live_metadata_update(trace.get());
4164020e
SM
403 switch (status) {
404 case LTTNG_LIVE_ITERATOR_STATUS_END:
405 case LTTNG_LIVE_ITERATOR_STATUS_OK:
4164020e
SM
406 break;
407 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
408 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
a77756b9 409 return status;
4164020e 410 default:
27a14e13
SM
411 BT_CLOGE_APPEND_CAUSE("Error updating trace metadata: "
412 "stream-iter-status=%s, trace-id=%" PRIu64,
413 lttng_live_iterator_status_string(status), trace->id);
a77756b9 414 return status;
4164020e
SM
415 }
416 }
417
418 /*
419 * Now that we have the metadata we can initialize the downstream
420 * iterator.
421 */
a77756b9 422 return lttng_live_lazy_msg_init(session, lttng_live_msg_iter->self_msg_iter);
7cdc2bab
MD
423}
424
4164020e
SM
425static void
426lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 427{
27a14e13 428 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
4164020e 429
bef0ed48 430 for (lttng_live_session::UP& session : lttng_live_msg_iter->sessions) {
27a14e13
SM
431 BT_CLOGD("Force marking session as needing new streams: "
432 "session-id=%" PRIu64,
433 session->id);
4164020e 434 session->new_streams_needed = true;
caec6d6f 435 for (lttng_live_trace::UP& trace : session->traces) {
27a14e13
SM
436 BT_CLOGD("Force marking trace metadata state as needing an update: "
437 "session-id=%" PRIu64 ", trace-id=%" PRIu64,
438 session->id, trace->id);
4164020e
SM
439
440 BT_ASSERT(trace->metadata_stream_state != LTTNG_LIVE_METADATA_STREAM_STATE_CLOSED);
441
442 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
443 }
444 }
7cdc2bab
MD
445}
446
4164020e
SM
447static enum lttng_live_iterator_status
448lttng_live_iterator_handle_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 449{
4164020e 450 enum lttng_live_viewer_status viewer_status;
27a14e13 451 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
bef0ed48 452 uint64_t nr_sessions_opened = 0;
4164020e
SM
453 enum session_not_found_action sess_not_found_act =
454 lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
455
27a14e13
SM
456 BT_CLOGD("Update data and metadata of all sessions: "
457 "live-msg-iter-addr=%p",
458 lttng_live_msg_iter);
4164020e
SM
459 /*
460 * In a remotely distant future, we could add a "new
461 * session" flag to the protocol, which would tell us that we
462 * need to query for new sessions even though we have sessions
463 * currently ongoing.
464 */
bef0ed48 465 if (lttng_live_msg_iter->sessions.empty()) {
4164020e 466 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
27a14e13 467 BT_CLOGD(
4164020e 468 "No session found. Exiting in accordance with the `session-not-found-action` parameter");
a77756b9 469 return LTTNG_LIVE_ITERATOR_STATUS_END;
4164020e 470 } else {
27a14e13 471 BT_CLOGD(
4164020e
SM
472 "No session found. Try creating a new one in accordance with the `session-not-found-action` parameter");
473 /*
474 * Retry to create a viewer session for the requested
475 * session name.
476 */
477 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
478 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
479 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
27a14e13 480 BT_CLOGE_APPEND_CAUSE("Error creating LTTng live viewer session");
a77756b9 481 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
4164020e 482 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
a77756b9 483 return LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
4164020e
SM
484 } else {
485 bt_common_abort();
486 }
4164020e
SM
487 }
488 }
489 }
490
bef0ed48 491 for (lttng_live_session::UP& session : lttng_live_msg_iter->sessions) {
a77756b9
SM
492 enum lttng_live_iterator_status status =
493 lttng_live_get_session(lttng_live_msg_iter, session.get());
4164020e
SM
494 switch (status) {
495 case LTTNG_LIVE_ITERATOR_STATUS_OK:
4164020e 496 case LTTNG_LIVE_ITERATOR_STATUS_END:
a4f118a3
FD
497 /*
498 * A session returned `_END`. Other sessions may still
499 * be active so we override the status and continue
500 * looping if needed.
501 */
4164020e
SM
502 break;
503 default:
a77756b9 504 return status;
4164020e
SM
505 }
506 if (!session->closed) {
507 nr_sessions_opened++;
508 }
509 }
510
511 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE && nr_sessions_opened == 0) {
a77756b9 512 return LTTNG_LIVE_ITERATOR_STATUS_END;
4164020e 513 } else {
a77756b9 514 return LTTNG_LIVE_ITERATOR_STATUS_OK;
4164020e 515 }
7cdc2bab
MD
516}
517
4164020e
SM
518static enum lttng_live_iterator_status
519emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
1553d862
SM
520 struct lttng_live_stream_iterator *stream_iter,
521 nonstd::optional<bt2::ConstMessage::Shared>& message, uint64_t timestamp)
7cdc2bab 522{
27a14e13 523 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
4164020e
SM
524 bt_message *msg = NULL;
525
526 BT_ASSERT(stream_iter->trace->clock_class);
527
27a14e13
SM
528 BT_CLOGD("Emitting inactivity message for stream: ctf-stream-id=%" PRIu64
529 ", viewer-stream-id=%" PRIu64 ", timestamp=%" PRIu64,
530 stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id, timestamp);
4164020e
SM
531
532 msg = bt_message_message_iterator_inactivity_create(lttng_live_msg_iter->self_msg_iter,
533 stream_iter->trace->clock_class, timestamp);
534 if (!msg) {
27a14e13 535 BT_CLOGE_APPEND_CAUSE("Error emitting message iterator inactivity message");
a77756b9 536 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
4164020e
SM
537 }
538
1553d862
SM
539 message = bt2::ConstMessage::Shared::createWithoutRef(msg);
540
a77756b9 541 return LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
542}
543
4164020e
SM
544static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
545 struct lttng_live_msg_iter *lttng_live_msg_iter,
1553d862
SM
546 struct lttng_live_stream_iterator *lttng_live_stream,
547 nonstd::optional<bt2::ConstMessage::Shared>& message)
7cdc2bab 548{
4164020e
SM
549 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
550 return LTTNG_LIVE_ITERATOR_STATUS_OK;
551 }
552
553 /*
554 * Check if we already sent an inactivty message downstream for this
555 * `current_inactivity_ts` value.
556 */
557 if (lttng_live_stream->last_inactivity_ts.is_set &&
558 lttng_live_stream->current_inactivity_ts == lttng_live_stream->last_inactivity_ts.value) {
559 lttng_live_stream_iterator_set_state(lttng_live_stream,
560 LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA);
561
a77756b9 562 return LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
4164020e
SM
563 }
564
a77756b9
SM
565 lttng_live_iterator_status status = emit_inactivity_message(
566 lttng_live_msg_iter, lttng_live_stream, message, lttng_live_stream->current_inactivity_ts);
567 if (status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
568 return status;
569 }
4164020e
SM
570
571 lttng_live_stream->last_inactivity_ts.value = lttng_live_stream->current_inactivity_ts;
572 lttng_live_stream->last_inactivity_ts.is_set = true;
a77756b9
SM
573
574 return LTTNG_LIVE_ITERATOR_STATUS_OK;
14f28187
FD
575}
576
4164020e
SM
577static int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter,
578 struct lttng_live_msg_iter *lttng_live_msg_iter,
579 const bt_message *msg, int64_t last_msg_ts_ns, int64_t *ts_ns)
14f28187 580{
4164020e
SM
581 const bt_clock_snapshot *clock_snapshot = NULL;
582 int ret = 0;
27a14e13 583 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
4164020e
SM
584
585 BT_ASSERT_DBG(msg);
586 BT_ASSERT_DBG(ts_ns);
587
27a14e13
SM
588 BT_CLOGD("Getting message's timestamp: iter-data-addr=%p, msg-addr=%p, "
589 "last-msg-ts=%" PRId64,
590 lttng_live_msg_iter, msg, last_msg_ts_ns);
4164020e
SM
591
592 switch (bt_message_get_type(msg)) {
593 case BT_MESSAGE_TYPE_EVENT:
4164020e
SM
594 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(msg);
595 break;
596 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
4164020e
SM
597 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg);
598 break;
599 case BT_MESSAGE_TYPE_PACKET_END:
4164020e
SM
600 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(msg);
601 break;
602 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
4164020e
SM
603 clock_snapshot =
604 bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg);
605 break;
606 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
4164020e
SM
607 clock_snapshot =
608 bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg);
609 break;
610 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
611 clock_snapshot = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const(msg);
612 break;
613 default:
614 /* All the other messages have a higher priority */
a77756b9
SM
615 BT_CLOGD(
616 "Message has no timestamp, using the last message timestamp: iter-data-addr=%p, msg-addr=%p, "
617 "last-msg-ts=%" PRId64 ", ts=%" PRId64,
618 lttng_live_msg_iter, msg, last_msg_ts_ns, *ts_ns);
4164020e 619 *ts_ns = last_msg_ts_ns;
a77756b9 620 return 0;
4164020e
SM
621 }
622
4164020e
SM
623 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
624 if (ret) {
27a14e13
SM
625 BT_CLOGE_APPEND_CAUSE("Cannot get nanoseconds from Epoch of clock snapshot: "
626 "clock-snapshot-addr=%p",
627 clock_snapshot);
a77756b9 628 return -1;
4164020e
SM
629 }
630
a77756b9
SM
631 BT_CLOGD("Found message's timestamp: "
632 "iter-data-addr=%p, msg-addr=%p, "
633 "last-msg-ts=%" PRId64 ", ts=%" PRId64,
634 lttng_live_msg_iter, msg, last_msg_ts_ns, *ts_ns);
4164020e 635
a77756b9 636 return 0;
7cdc2bab
MD
637}
638
4164020e
SM
639static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
640 struct lttng_live_msg_iter *lttng_live_msg_iter,
1553d862
SM
641 struct lttng_live_stream_iterator *lttng_live_stream,
642 nonstd::optional<bt2::ConstMessage::Shared>& message)
7cdc2bab 643{
27a14e13 644 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
4164020e 645 enum ctf_msg_iter_status status;
4164020e 646
bef0ed48 647 for (lttng_live_session::UP& session : lttng_live_msg_iter->sessions) {
4164020e 648 if (session->new_streams_needed) {
27a14e13
SM
649 BT_CLOGD("Need an update for streams: "
650 "session-id=%" PRIu64,
651 session->id);
a77756b9 652 return LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
4164020e 653 }
caec6d6f 654 for (lttng_live_trace::UP& trace : session->traces) {
4164020e 655 if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
27a14e13
SM
656 BT_CLOGD("Need an update for metadata stream: "
657 "session-id=%" PRIu64 ", trace-id=%" PRIu64,
658 session->id, trace->id);
a77756b9 659 return LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
4164020e
SM
660 }
661 }
662 }
663
664 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
27a14e13
SM
665 BT_CLOGE_APPEND_CAUSE("Invalid state of live stream iterator"
666 "stream-iter-status=%s",
667 lttng_live_stream_state_string(lttng_live_stream->state));
a77756b9 668 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
4164020e
SM
669 }
670
1553d862
SM
671 const bt_message *msg;
672 status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), &msg);
4164020e
SM
673 switch (status) {
674 case CTF_MSG_ITER_STATUS_EOF:
a77756b9 675 return LTTNG_LIVE_ITERATOR_STATUS_END;
4164020e 676 case CTF_MSG_ITER_STATUS_OK:
1553d862 677 message = bt2::ConstMessage::Shared::createWithoutRef(msg);
a77756b9 678 return LTTNG_LIVE_ITERATOR_STATUS_OK;
4164020e
SM
679 case CTF_MSG_ITER_STATUS_AGAIN:
680 /*
681 * Continue immediately (end of packet). The next
682 * get_index may return AGAIN to delay the following
683 * attempt.
684 */
a77756b9 685 return LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
4164020e
SM
686 case CTF_MSG_ITER_STATUS_ERROR:
687 default:
27a14e13
SM
688 BT_CLOGE_APPEND_CAUSE("CTF message iterator failed to get next message: "
689 "msg-iter=%p, msg-iter-status=%s",
8469ed41
SM
690 lttng_live_stream->msg_iter.get(),
691 ctf_msg_iter_status_string(status));
a77756b9 692 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
4164020e 693 }
7cdc2bab
MD
694}
695
4164020e
SM
696static enum lttng_live_iterator_status
697lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
698 struct lttng_live_stream_iterator *stream_iter,
1553d862 699 nonstd::optional<bt2::ConstMessage::Shared>& curr_msg)
4a39caef 700{
27a14e13 701 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
4164020e 702
27a14e13
SM
703 BT_CLOGD("Closing live stream iterator: stream-name=\"%s\", "
704 "viewer-stream-id=%" PRIu64,
dff1c223 705 stream_iter->name.c_str(), stream_iter->viewer_stream_id);
4164020e
SM
706
707 /*
708 * The viewer has hung up on us so we are closing the stream. The
709 * `ctf_msg_iter` should simply realize that it needs to close the
710 * stream properly by emitting the necessary stream end message.
711 */
1553d862 712 const bt_message *msg;
4164020e 713 enum ctf_msg_iter_status status =
1553d862 714 ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), &msg);
4164020e
SM
715
716 if (status == CTF_MSG_ITER_STATUS_ERROR) {
27a14e13 717 BT_CLOGE_APPEND_CAUSE("Error getting the next message from CTF message iterator");
a77756b9 718 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
4164020e 719 } else if (status == CTF_MSG_ITER_STATUS_EOF) {
27a14e13 720 BT_CLOGI("Reached the end of the live stream iterator.");
a77756b9 721 return LTTNG_LIVE_ITERATOR_STATUS_END;
4164020e
SM
722 }
723
724 BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
4a39caef 725
1553d862
SM
726 curr_msg = bt2::ConstMessage::Shared::createWithoutRef(msg);
727
a77756b9 728 return LTTNG_LIVE_ITERATOR_STATUS_OK;
4a39caef
FD
729}
730
7cdc2bab
MD
731/*
732 * helper function:
733 * handle_no_data_streams()
734 * retry:
735 * - for each ACTIVE_NO_DATA stream:
736 * - query relayd for stream data, or quiescence info.
737 * - if need metadata, get metadata, goto retry.
738 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
739 * - if quiescent, move to QUIESCENT streams
740 * - if fetched data, move to ACTIVE_DATA streams
741 * (at this point each stream either has data, or is quiescent)
742 *
743 *
744 * iterator_next:
745 * handle_new_streams_and_metadata()
746 * - query relayd for known streams, add them as ACTIVE_NO_DATA
747 * - query relayd for metadata
748 *
749 * call handle_active_no_data_streams()
750 *
751 * handle_quiescent_streams()
752 * - if at least one stream is ACTIVE_DATA:
753 * - peek stream event with lowest timestamp -> next_ts
754 * - for each quiescent stream
755 * - if next_ts >= quiescent end
756 * - set state to ACTIVE_NO_DATA
757 * - else
758 * - for each quiescent stream
759 * - set state to ACTIVE_NO_DATA
760 *
761 * call handle_active_no_data_streams()
762 *
763 * handle_active_data_streams()
764 * - if at least one stream is ACTIVE_DATA:
765 * - get stream event with lowest timestamp from heap
d6e69534 766 * - make that stream event the current message.
7cdc2bab
MD
767 * - move this stream heap position to its next event
768 * - if we need to fetch data from relayd, move
769 * stream to ACTIVE_NO_DATA.
770 * - return OK
771 * - return AGAIN
772 *
773 * end criterion: ctrl-c on client. If relayd exits or the session
774 * closes on the relay daemon side, we keep on waiting for streams.
775 * Eventually handle --end timestamp (also an end criterion).
776 *
777 * When disconnected from relayd: try to re-connect endlessly.
778 */
4164020e
SM
779static enum lttng_live_iterator_status
780lttng_live_iterator_next_msg_on_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
781 struct lttng_live_stream_iterator *stream_iter,
1553d862 782 nonstd::optional<bt2::ConstMessage::Shared>& curr_msg)
7cdc2bab 783{
27a14e13 784 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
4164020e
SM
785 enum lttng_live_iterator_status live_status;
786
27a14e13
SM
787 BT_CLOGD("Advancing live stream iterator until next message if possible: "
788 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
dff1c223 789 stream_iter->name.c_str(), stream_iter->viewer_stream_id);
4164020e
SM
790
791 if (stream_iter->has_stream_hung_up) {
792 /*
793 * The stream has hung up and the stream was properly closed
794 * during the last call to the current function. Return _END
795 * status now so that this stream iterator is removed for the
796 * stream iterator list.
797 */
798 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
799 goto end;
800 }
4a39caef 801
7cdc2bab 802retry:
4164020e
SM
803 LTTNG_LIVE_LOGD_STREAM_ITER(stream_iter);
804
805 /*
806 * Make sure we have the most recent metadata and possibly some new
807 * streams.
808 */
809 live_status = lttng_live_iterator_handle_new_streams_and_metadata(lttng_live_msg_iter);
810 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
811 goto end;
812 }
813
814 live_status =
815 lttng_live_iterator_next_handle_one_no_data_stream(lttng_live_msg_iter, stream_iter);
816 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
817 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
818 /*
819 * We overwrite `live_status` since `curr_msg` is
820 * likely set to a valid message in this function.
821 */
822 live_status =
823 lttng_live_iterator_close_stream(lttng_live_msg_iter, stream_iter, curr_msg);
824 }
825 goto end;
826 }
827
828 live_status = lttng_live_iterator_next_handle_one_quiescent_stream(lttng_live_msg_iter,
829 stream_iter, curr_msg);
830 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1553d862 831 BT_ASSERT(!curr_msg);
4164020e
SM
832 goto end;
833 }
1553d862 834 if (curr_msg) {
4164020e
SM
835 goto end;
836 }
837 live_status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live_msg_iter,
838 stream_iter, curr_msg);
839 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1553d862 840 BT_ASSERT(!curr_msg);
4164020e 841 }
7cdc2bab
MD
842
843end:
4164020e 844 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) {
27a14e13 845 BT_CLOGD("Ask the relay daemon for an updated view of the data and metadata streams");
4164020e
SM
846 goto retry;
847 }
14f28187 848
27a14e13
SM
849 BT_CLOGD("Returning from advancing live stream iterator: status=%s, "
850 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
dff1c223 851 lttng_live_iterator_status_string(live_status), stream_iter->name.c_str(),
27a14e13 852 stream_iter->viewer_stream_id);
f93afbf9 853
4164020e 854 return live_status;
7cdc2bab
MD
855}
856
1553d862 857static bool is_discarded_packet_or_event_message(bt2::ConstMessage msg)
8ec4d5ff 858{
1553d862 859 const bt2::MessageType type = msg.type();
8ec4d5ff 860
1553d862
SM
861 return type == bt2::MessageType::DISCARDED_EVENTS ||
862 type == bt2::MessageType::DISCARDED_PACKETS;
8ec4d5ff
JG
863}
864
1553d862
SM
865static enum lttng_live_iterator_status adjust_discarded_packets_message(
866 bt_self_message_iterator *iter, const bt_stream *stream, const bt_message *msg_in,
867 nonstd::optional<bt2::ConstMessage::Shared>& msg_out, uint64_t new_begin_ts)
8ec4d5ff 868{
4164020e
SM
869 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
870 enum bt_property_availability availability;
871 const bt_clock_snapshot *clock_snapshot;
872 uint64_t end_ts;
873 uint64_t count;
874
875 clock_snapshot = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg_in);
876 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
877
878 availability = bt_message_discarded_packets_get_count(msg_in, &count);
879 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
880
1553d862 881 bt_message *msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
4164020e 882 iter, stream, new_begin_ts, end_ts);
1553d862 883 if (!msg) {
4164020e
SM
884 status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
885 goto end;
886 }
887
1553d862
SM
888 bt_message_discarded_packets_set_count(msg, count);
889 msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
890
8ec4d5ff 891end:
4164020e 892 return status;
8ec4d5ff
JG
893}
894
1553d862
SM
895static enum lttng_live_iterator_status adjust_discarded_events_message(
896 bt_self_message_iterator *iter, const bt_stream *stream, const bt_message *msg_in,
897 nonstd::optional<bt2::ConstMessage::Shared>& msg_out, uint64_t new_begin_ts)
8ec4d5ff 898{
4164020e
SM
899 enum bt_property_availability availability;
900 const bt_clock_snapshot *clock_snapshot;
901 uint64_t end_ts;
902 uint64_t count;
903
904 clock_snapshot = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg_in);
905 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
906
907 availability = bt_message_discarded_events_get_count(msg_in, &count);
908 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
909
1553d862 910 bt_message *msg = bt_message_discarded_events_create_with_default_clock_snapshots(
4164020e 911 iter, stream, new_begin_ts, end_ts);
1553d862 912 if (!msg) {
a77756b9 913 return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
4164020e
SM
914 }
915
1553d862
SM
916 bt_message_discarded_events_set_count(msg, count);
917 msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
918
a77756b9 919 return LTTNG_LIVE_ITERATOR_STATUS_OK;
8ec4d5ff
JG
920}
921
4164020e
SM
922static enum lttng_live_iterator_status
923handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
924 struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns,
1553d862 925 bt2::ConstMessage::Shared late_msg)
285951be 926{
27a14e13 927 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
4164020e
SM
928 const bt_clock_class *clock_class;
929 const bt_stream_class *stream_class;
930 enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status;
931 int64_t last_inactivity_ts_ns;
4164020e 932 enum lttng_live_iterator_status adjust_status;
1553d862 933 nonstd::optional<bt2::ConstMessage::Shared> adjusted_message;
4164020e
SM
934
935 /*
936 * The timestamp of the current message is before the last message sent
937 * by this component. We CANNOT send it as is.
938 *
939 * The only expected scenario in which that could happen is the
940 * following, everything else is a bug in this component, relay deamon,
941 * or CTF parser.
942 *
943 * Expected scenario: The CTF message iterator emitted discarded
944 * packets and discarded events with synthesized beginning and end
945 * timestamps from the bounds of the last known packet and the newly
946 * decoded packet header. The CTF message iterator is not aware of
947 * stream inactivity beacons. Hence, we have to adjust the beginning
948 * timestamp of those types of messages if a stream signalled its
949 * inactivity up until _after_ the last known packet's beginning
950 * timestamp.
951 *
952 * Otherwise, the monotonicity guarantee of message timestamps would
953 * not be preserved.
954 *
955 * In short, the only scenario in which it's okay and fixable to
956 * received a late message is when:
957 * 1. the late message is a discarded packets or discarded events
958 * message,
959 * 2. this stream produced an inactivity message downstream, and
960 * 3. the timestamp of the late message is within the inactivity
961 * timespan we sent downstream through the inactivity message.
962 */
963
27a14e13
SM
964 BT_CLOGD("Handling late message on live stream iterator: "
965 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
dff1c223 966 stream_iter->name.c_str(), stream_iter->viewer_stream_id);
4164020e
SM
967
968 if (!stream_iter->last_inactivity_ts.is_set) {
27a14e13
SM
969 BT_CLOGE_APPEND_CAUSE("Invalid live stream state: "
970 "have a late message when no inactivity message "
971 "was ever sent for that stream.");
a77756b9 972 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
4164020e
SM
973 }
974
1553d862
SM
975 if (!is_discarded_packet_or_event_message(*late_msg)) {
976 BT_CLOGE_APPEND_CAUSE(
977 "Invalid live stream state: "
978 "have a late message that is not a packet discarded or "
979 "event discarded message: late-msg-type=%s",
980 bt_common_message_type_string(bt_message_get_type(late_msg->libObjPtr())));
a77756b9 981 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
4164020e
SM
982 }
983
5f16c381 984 stream_class = bt_stream_borrow_class_const((*stream_iter->stream)->libObjPtr());
4164020e
SM
985 clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class);
986
987 ts_ns_status = bt_clock_class_cycles_to_ns_from_origin(
988 clock_class, stream_iter->last_inactivity_ts.value, &last_inactivity_ts_ns);
989 if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) {
27a14e13
SM
990 BT_CLOGE_APPEND_CAUSE("Error converting last "
991 "inactivity message timestamp to nanoseconds");
a77756b9 992 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
4164020e
SM
993 }
994
995 if (last_inactivity_ts_ns <= late_msg_ts_ns) {
27a14e13
SM
996 BT_CLOGE_APPEND_CAUSE("Invalid live stream state: "
997 "have a late message that is none included in a stream "
998 "inactivity timespan: last-inactivity-ts-ns=%" PRIu64
999 "late-msg-ts-ns=%" PRIu64,
1000 last_inactivity_ts_ns, late_msg_ts_ns);
a77756b9 1001 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
4164020e
SM
1002 }
1003
1004 /*
1005 * We now know that it's okay for this message to be late, we can now
1006 * adjust its timestamp to ensure monotonicity.
1007 */
27a14e13
SM
1008 BT_CLOGD("Adjusting the timestamp of late message: late-msg-type=%s, "
1009 "msg-new-ts-ns=%" PRIu64,
1553d862 1010 bt_common_message_type_string(bt_message_get_type(late_msg->libObjPtr())),
27a14e13 1011 stream_iter->last_inactivity_ts.value);
1553d862
SM
1012 switch (late_msg->type()) {
1013 case bt2::MessageType::DISCARDED_EVENTS:
4164020e 1014 adjust_status = adjust_discarded_events_message(
1553d862
SM
1015 lttng_live_msg_iter->self_msg_iter, (*stream_iter->stream)->libObjPtr(),
1016 late_msg->libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
4164020e 1017 break;
1553d862 1018 case bt2::MessageType::DISCARDED_PACKETS:
4164020e 1019 adjust_status = adjust_discarded_packets_message(
1553d862
SM
1020 lttng_live_msg_iter->self_msg_iter, (*stream_iter->stream)->libObjPtr(),
1021 late_msg->libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
4164020e
SM
1022 break;
1023 default:
1024 bt_common_abort();
1025 }
1026
1027 if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
a77756b9 1028 return adjust_status;
4164020e
SM
1029 }
1030
1031 BT_ASSERT_DBG(adjusted_message);
1032 stream_iter->current_msg = adjusted_message;
1033 stream_iter->current_msg_ts_ns = last_inactivity_ts_ns;
cefd84f8 1034
a77756b9 1035 return LTTNG_LIVE_ITERATOR_STATUS_OK;
285951be
FD
1036}
1037
4164020e
SM
1038static enum lttng_live_iterator_status
1039next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
1040 struct lttng_live_trace *live_trace,
1041 struct lttng_live_stream_iterator **youngest_trace_stream_iter)
7cdc2bab 1042{
4164020e 1043 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
27a14e13 1044 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
4164020e
SM
1045 int64_t youngest_candidate_msg_ts = INT64_MAX;
1046 uint64_t stream_iter_idx;
1047
1048 BT_ASSERT_DBG(live_trace);
4164020e 1049
27a14e13
SM
1050 BT_CLOGD("Finding the next stream iterator for trace: "
1051 "trace-id=%" PRIu64,
1052 live_trace->id);
4164020e
SM
1053 /*
1054 * Update the current message of every stream iterators of this trace.
1055 * The current msg of every stream must have a timestamp equal or
1056 * larger than the last message returned by this iterator. We must
1057 * ensure monotonicity.
1058 */
1059 stream_iter_idx = 0;
2dbef7b6 1060 while (stream_iter_idx < live_trace->stream_iterators.size()) {
4164020e 1061 bool stream_iter_is_ended = false;
2dbef7b6
SM
1062 lttng_live_stream_iterator *stream_iter =
1063 live_trace->stream_iterators[stream_iter_idx].get();
4164020e
SM
1064
1065 /*
1066 * If there is no current message for this stream, go fetch
1067 * one.
1068 */
1069 while (!stream_iter->current_msg) {
1553d862 1070 nonstd::optional<bt2::ConstMessage::Shared> msg;
4164020e
SM
1071 int64_t curr_msg_ts_ns = INT64_MAX;
1072
a77756b9 1073 lttng_live_iterator_status stream_iter_status =
1553d862 1074 lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, msg);
4164020e
SM
1075
1076 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1077 stream_iter_is_ended = true;
1078 break;
1079 }
1080
1081 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
a77756b9 1082 return stream_iter_status;
4164020e
SM
1083 }
1084
1085 BT_ASSERT_DBG(msg);
1086
27a14e13
SM
1087 BT_CLOGD("Live stream iterator returned message: msg-type=%s, "
1088 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
1553d862 1089 bt_common_message_type_string(bt_message_get_type((*msg)->libObjPtr())),
dff1c223 1090 stream_iter->name.c_str(), stream_iter->viewer_stream_id);
4164020e
SM
1091
1092 /*
1093 * Get the timestamp in nanoseconds from origin of this
1094 * messsage.
1095 */
1553d862 1096 live_get_msg_ts_ns(stream_iter, lttng_live_msg_iter, (*msg)->libObjPtr(),
4164020e
SM
1097 lttng_live_msg_iter->last_msg_ts_ns, &curr_msg_ts_ns);
1098
1099 /*
1100 * Check if the message of the current live stream
1101 * iterator occurred at the exact same time or after the
1102 * last message returned by this component's message
1103 * iterator. If not, we need to handle it with care.
1104 */
1105 if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
1553d862 1106 stream_iter->current_msg = std::move(*msg);
4164020e
SM
1107 stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
1108 } else {
1109 /*
1110 * We received a message from the past. This
1111 * may be fixable but it can also be an error.
1112 */
1553d862
SM
1113 stream_iter_status = handle_late_message(lttng_live_msg_iter, stream_iter,
1114 curr_msg_ts_ns, std::move(*msg));
4164020e 1115 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
27a14e13
SM
1116 BT_CLOGE_APPEND_CAUSE("Late message could not be handled correctly: "
1117 "lttng-live-msg-iter-addr=%p, "
1118 "stream-name=\"%s\", "
1119 "curr-msg-ts=%" PRId64 ", last-msg-ts=%" PRId64,
dff1c223 1120 lttng_live_msg_iter, stream_iter->name.c_str(),
27a14e13 1121 curr_msg_ts_ns, lttng_live_msg_iter->last_msg_ts_ns);
a77756b9 1122 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
4164020e
SM
1123 }
1124 }
1125 }
1126
1127 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1128
1129 if (!stream_iter_is_ended) {
1130 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1131 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1132 /*
1133 * Update the current best candidate message
1134 * for the stream iterator of this live trace
1135 * to be forwarded downstream.
1136 */
1137 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1138 youngest_candidate_stream_iter = stream_iter;
1139 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1140 /*
1141 * Order the messages in an arbitrary but
1142 * deterministic way.
1143 */
1144 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1145 int ret = common_muxing_compare_messages(
1553d862
SM
1146 (*stream_iter->current_msg)->libObjPtr(),
1147 (*youngest_candidate_stream_iter->current_msg)->libObjPtr());
4164020e
SM
1148 if (ret < 0) {
1149 /*
1150 * The `youngest_candidate_stream_iter->current_msg`
1151 * should go first. Update the next
1152 * iterator and the current timestamp.
1153 */
1154 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1155 youngest_candidate_stream_iter = stream_iter;
1156 } else if (ret == 0) {
1157 /*
1158 * Unable to pick which one should go
1159 * first.
1160 */
27a14e13 1161 BT_CLOGW(
4164020e
SM
1162 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1163 "stream-iter-addr=%p"
1164 "stream-iter-addr=%p",
1165 stream_iter, youngest_candidate_stream_iter);
1166 }
1167 }
1168
1169 stream_iter_idx++;
1170 } else {
1171 /*
1172 * The live stream iterator has ended. That
1173 * iterator is removed from the array, but
1174 * there is no need to increment
1175 * stream_iter_idx as
1176 * g_ptr_array_remove_index_fast replaces the
1177 * removed element with the array's last
1178 * element.
1179 */
2dbef7b6 1180 bt2_common::vectorFastRemove(live_trace->stream_iterators, stream_iter_idx);
4164020e
SM
1181 }
1182 }
1183
1184 if (youngest_candidate_stream_iter) {
1185 *youngest_trace_stream_iter = youngest_candidate_stream_iter;
a77756b9 1186 return LTTNG_LIVE_ITERATOR_STATUS_OK;
4164020e
SM
1187 } else {
1188 /*
1189 * The only case where we don't have a candidate for this trace
1190 * is if we reached the end of all the iterators.
1191 */
2dbef7b6 1192 BT_ASSERT(live_trace->stream_iterators.empty());
a77756b9 1193 return LTTNG_LIVE_ITERATOR_STATUS_END;
4164020e 1194 }
14f28187
FD
1195}
1196
4164020e
SM
1197static enum lttng_live_iterator_status
1198next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
1199 struct lttng_live_session *session,
1200 struct lttng_live_stream_iterator **youngest_session_stream_iter)
14f28187 1201{
27a14e13 1202 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
4164020e
SM
1203 enum lttng_live_iterator_status stream_iter_status;
1204 uint64_t trace_idx = 0;
1205 int64_t youngest_candidate_msg_ts = INT64_MAX;
1206 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
1207
27a14e13
SM
1208 BT_CLOGD("Finding the next stream iterator for session: "
1209 "session-id=%" PRIu64,
1210 session->id);
4164020e
SM
1211 /*
1212 * Make sure we are attached to the session and look for new streams
1213 * and metadata.
1214 */
1215 stream_iter_status = lttng_live_get_session(lttng_live_msg_iter, session);
1216 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
1217 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE &&
1218 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) {
a77756b9 1219 return stream_iter_status;
4164020e
SM
1220 }
1221
caec6d6f 1222 while (trace_idx < session->traces.size()) {
4164020e
SM
1223 bool trace_is_ended = false;
1224 struct lttng_live_stream_iterator *stream_iter;
caec6d6f 1225 lttng_live_trace *trace = session->traces[trace_idx].get();
4164020e
SM
1226
1227 stream_iter_status =
1228 next_stream_iterator_for_trace(lttng_live_msg_iter, trace, &stream_iter);
1229 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1230 /*
1231 * All the live stream iterators for this trace are
1232 * ENDed. Remove the trace from this session.
1233 */
1234 trace_is_ended = true;
1235 } else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
a77756b9 1236 return stream_iter_status;
4164020e
SM
1237 }
1238
1239 if (!trace_is_ended) {
1240 BT_ASSERT_DBG(stream_iter);
1241
1242 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1243 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1244 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1245 youngest_candidate_stream_iter = stream_iter;
1246 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1247 /*
1248 * Order the messages in an arbitrary but
1249 * deterministic way.
1250 */
1251 int ret = common_muxing_compare_messages(
1553d862
SM
1252 (*stream_iter->current_msg)->libObjPtr(),
1253 (*youngest_candidate_stream_iter->current_msg)->libObjPtr());
4164020e
SM
1254 if (ret < 0) {
1255 /*
1256 * The `youngest_candidate_stream_iter->current_msg`
1257 * should go first. Update the next iterator
1258 * and the current timestamp.
1259 */
1260 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1261 youngest_candidate_stream_iter = stream_iter;
1262 } else if (ret == 0) {
1263 /* Unable to pick which one should go first. */
27a14e13 1264 BT_CLOGW(
4164020e
SM
1265 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1266 "stream-iter-addr=%p"
1267 "stream-iter-addr=%p",
1268 stream_iter, youngest_candidate_stream_iter);
1269 }
1270 }
1271 trace_idx++;
1272 } else {
1273 /*
1274 * trace_idx is not incremented since
caec6d6f 1275 * vectorFastRemove replaces the
4164020e
SM
1276 * element at trace_idx with the array's last element.
1277 */
caec6d6f 1278 bt2_common::vectorFastRemove(session->traces, trace_idx);
4164020e
SM
1279 }
1280 }
1281 if (youngest_candidate_stream_iter) {
1282 *youngest_session_stream_iter = youngest_candidate_stream_iter;
a77756b9 1283 return LTTNG_LIVE_ITERATOR_STATUS_OK;
4164020e
SM
1284 } else {
1285 /*
1286 * The only cases where we don't have a candidate for this
1287 * trace is:
1288 * 1. if we reached the end of all the iterators of all the
1289 * traces of this session,
1290 * 2. if we never had live stream iterator in the first place.
1291 *
1292 * In either cases, we return END.
1293 */
caec6d6f 1294 BT_ASSERT(session->traces.empty());
a77756b9 1295 return LTTNG_LIVE_ITERATOR_STATUS_END;
4164020e 1296 }
14f28187
FD
1297}
1298
4164020e 1299static inline void put_messages(bt_message_array_const msgs, uint64_t count)
14f28187 1300{
4164020e 1301 uint64_t i;
14f28187 1302
4164020e
SM
1303 for (i = 0; i < count; i++) {
1304 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
1305 }
7cdc2bab
MD
1306}
1307
d3eb6e8f 1308BT_HIDDEN
4164020e
SM
1309bt_message_iterator_class_next_method_status
1310lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array_const msgs,
1311 uint64_t capacity, uint64_t *count)
d3eb6e8f 1312{
4164020e
SM
1313 struct lttng_live_msg_iter *lttng_live_msg_iter =
1314 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_it);
27a14e13 1315 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
4164020e 1316
d2606006
SM
1317 try {
1318 bt_message_iterator_class_next_method_status status;
1319 enum lttng_live_viewer_status viewer_status;
4164020e 1320
d2606006
SM
1321 struct lttng_live_component *lttng_live = lttng_live_msg_iter->lttng_live_comp;
1322 enum lttng_live_iterator_status stream_iter_status;
4164020e 1323
d2606006 1324 *count = 0;
4164020e 1325
d2606006 1326 BT_ASSERT_DBG(lttng_live_msg_iter);
4164020e 1327
d2606006 1328 if (G_UNLIKELY(lttng_live_msg_iter->was_interrupted)) {
4164020e 1329 /*
d2606006
SM
1330 * The iterator was interrupted in a previous call to the
1331 * `_next()` method. We currently do not support generating
1332 * messages after such event. The babeltrace2 CLI should never
1333 * be running the graph after being interrupted. So this check
1334 * is to prevent other graph users from using this live
1335 * iterator in an messed up internal state.
4164020e 1336 */
d2606006
SM
1337 BT_CLOGE_APPEND_CAUSE(
1338 "Message iterator was interrupted during a previous call to the `next()` and currently does not support continuing after such event.");
a77756b9 1339 return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
4164020e 1340 }
4164020e 1341
d2606006
SM
1342 /*
1343 * Clear all the invalid message reference that might be left over in
1344 * the output array.
1345 */
1346 memset(msgs, 0, capacity * sizeof(*msgs));
4164020e 1347
d2606006
SM
1348 /*
1349 * If no session are exposed on the relay found at the url provided by
1350 * the user, session count will be 0. In this case, we return status
1351 * end to return gracefully.
1352 */
bef0ed48 1353 if (lttng_live_msg_iter->sessions.empty()) {
d2606006 1354 if (lttng_live->params.sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
a77756b9 1355 return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
d2606006
SM
1356 } else {
1357 /*
1358 * The are no more active session for this session
1359 * name. Retry to create a viewer session for the
1360 * requested session name.
1361 */
1362 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
1363 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1364 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
d2606006 1365 BT_CLOGE_APPEND_CAUSE("Error creating LTTng live viewer session");
a77756b9 1366 return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
d2606006 1367 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
a77756b9 1368 return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
d2606006
SM
1369 } else {
1370 bt_common_abort();
1371 }
4164020e 1372 }
4164020e 1373 }
d2606006 1374 }
4164020e 1375
d2606006
SM
1376 if (lttng_live_msg_iter->active_stream_iter == 0) {
1377 lttng_live_force_new_streams_and_metadata(lttng_live_msg_iter);
1378 }
1379
1380 /*
1381 * Here the muxing of message is done.
1382 *
1383 * We need to iterate over all the streams of all the traces of all the
1384 * viewer sessions in order to get the message with the smallest
1385 * timestamp. In this case, a session is a viewer session and there is
1386 * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
1387 * kernel). Each viewer session can have multiple traces, for example,
1388 * 64bit UST viewer sessions could have multiple per-pid traces.
1389 *
1390 * We iterate over the streams of each traces to update and see what is
1391 * their next message's timestamp. From those timestamps, we select the
1392 * message with the smallest timestamp as the best candidate message
1393 * for that trace and do the same thing across all the sessions.
1394 *
1395 * We then compare the timestamp of best candidate message of all the
1396 * sessions to pick the message with the smallest timestamp and we
1397 * return it.
1398 */
1399 while (*count < capacity) {
1400 struct lttng_live_stream_iterator *youngest_stream_iter = NULL,
1401 *candidate_stream_iter = NULL;
1402 int64_t youngest_msg_ts_ns = INT64_MAX;
1403
bef0ed48
SM
1404 uint64_t session_idx = 0;
1405 while (session_idx < lttng_live_msg_iter->sessions.size()) {
1406 lttng_live_session *session = lttng_live_msg_iter->sessions[session_idx].get();
d2606006
SM
1407
1408 /* Find the best candidate message to send downstream. */
1409 stream_iter_status = next_stream_iterator_for_session(lttng_live_msg_iter, session,
1410 &candidate_stream_iter);
4164020e 1411
4164020e 1412 /*
d2606006
SM
1413 * If we receive an END status, it means that either:
1414 * - Those traces never had active streams (UST with no
1415 * data produced yet),
1416 * - All live stream iterators have ENDed.
4164020e 1417 */
d2606006 1418 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
caec6d6f 1419 if (session->closed && session->traces.empty()) {
d2606006
SM
1420 /*
1421 * Remove the session from the list.
1422 * session_idx is not modified since
1423 * g_ptr_array_remove_index_fast
1424 * replaces the the removed element with
1425 * the array's last element.
1426 */
bef0ed48 1427 bt2_common::vectorFastRemove(lttng_live_msg_iter->sessions, session_idx);
d2606006
SM
1428 } else {
1429 session_idx++;
1430 }
1431 continue;
1432 }
1433
1434 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1435 goto return_status;
1436 }
1437
1438 if (G_UNLIKELY(youngest_stream_iter == NULL) ||
1439 candidate_stream_iter->current_msg_ts_ns < youngest_msg_ts_ns) {
4164020e
SM
1440 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1441 youngest_stream_iter = candidate_stream_iter;
d2606006
SM
1442 } else if (candidate_stream_iter->current_msg_ts_ns == youngest_msg_ts_ns) {
1443 /*
1444 * The currently selected message to be sent
1445 * downstream next has the exact same timestamp
1446 * that of the current candidate message. We
1447 * must break the tie in a predictable manner.
1448 */
1449 BT_CLOGD_STR(
1450 "Two of the next message candidates have the same timestamps, pick one deterministically.");
1451 /*
1452 * Order the messages in an arbitrary but
1453 * deterministic way.
1454 */
1553d862
SM
1455 int ret = common_muxing_compare_messages(
1456 (*candidate_stream_iter->current_msg)->libObjPtr(),
1457 (*youngest_stream_iter->current_msg)->libObjPtr());
d2606006
SM
1458 if (ret < 0) {
1459 /*
1460 * The `candidate_stream_iter->current_msg`
1461 * should go first. Update the next
1462 * iterator and the current timestamp.
1463 */
1464 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1465 youngest_stream_iter = candidate_stream_iter;
1466 } else if (ret == 0) {
1467 /* Unable to pick which one should go first. */
1468 BT_CLOGW(
1469 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1470 "next-stream-iter-addr=%p"
1471 "candidate-stream-iter-addr=%p",
1472 youngest_stream_iter, candidate_stream_iter);
1473 }
4164020e 1474 }
4164020e 1475
d2606006
SM
1476 session_idx++;
1477 }
4164020e 1478
d2606006
SM
1479 if (!youngest_stream_iter) {
1480 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1481 goto return_status;
1482 }
4164020e 1483
d2606006
SM
1484 BT_ASSERT_DBG(youngest_stream_iter->current_msg);
1485 /* Ensure monotonicity. */
1486 BT_ASSERT_DBG(lttng_live_msg_iter->last_msg_ts_ns <=
1487 youngest_stream_iter->current_msg_ts_ns);
4164020e 1488
d2606006
SM
1489 /*
1490 * Insert the next message to the message batch. This will set
1491 * stream iterator current messsage to NULL so that next time
1492 * we fetch the next message of that stream iterator
1493 */
1553d862
SM
1494 msgs[*count] = youngest_stream_iter->current_msg->release().libObjPtr();
1495 youngest_stream_iter->current_msg.reset();
d2606006 1496 (*count)++;
4164020e 1497
d2606006
SM
1498 /* Update the last timestamp in nanoseconds sent downstream. */
1499 lttng_live_msg_iter->last_msg_ts_ns = youngest_msg_ts_ns;
1500 youngest_stream_iter->current_msg_ts_ns = INT64_MAX;
4164020e 1501
d2606006
SM
1502 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1503 }
f79c2d7a
FD
1504
1505return_status:
d2606006
SM
1506 switch (stream_iter_status) {
1507 case LTTNG_LIVE_ITERATOR_STATUS_OK:
1508 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
1509 /*
1510 * If we gathered messages, return _OK even if the graph was
1511 * interrupted. This allows for the components downstream to at
1512 * least get the thoses messages. If the graph was indeed
1513 * interrupted there should not be another _next() call as the
1514 * application will tear down the graph. This component class
1515 * doesn't support restarting after an interruption.
1516 */
1517 if (*count > 0) {
1518 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
1519 } else {
1520 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1521 }
1522 break;
1523 case LTTNG_LIVE_ITERATOR_STATUS_END:
1524 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1525 break;
1526 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
1527 BT_CLOGE_APPEND_CAUSE("Memory error preparing the next batch of messages: "
1528 "live-iter-status=%s",
1529 lttng_live_iterator_status_string(stream_iter_status));
1530 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
1531 break;
1532 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
1533 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
1534 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
1535 BT_CLOGE_APPEND_CAUSE("Error preparing the next batch of messages: "
1536 "live-iter-status=%s",
1537 lttng_live_iterator_status_string(stream_iter_status));
1538
1539 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1540 /* Put all existing messages on error. */
1541 put_messages(msgs, *count);
1542 break;
1543 default:
1544 bt_common_abort();
4164020e 1545 }
14f28187 1546
d2606006
SM
1547 return status;
1548 } catch (const std::bad_alloc&) {
1549 return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
1550 } catch (const bt2_common::Error&) {
1551 BT_CLOGE_APPEND_CAUSE("Failed to fetch next messages");
1552 return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1553 }
7cdc2bab 1554}
41a2b7ae 1555
e1fe0d2a 1556static lttng_live_msg_iter::UP
4164020e
SM
1557lttng_live_msg_iter_create(struct lttng_live_component *lttng_live_comp,
1558 bt_self_message_iterator *self_msg_it)
4f74db52 1559{
e1fe0d2a
SM
1560 lttng_live_msg_iter::UP lttng_live_msg_iter =
1561 bt2_common::makeUnique<struct lttng_live_msg_iter>(lttng_live_comp->logCfg);
4164020e
SM
1562 lttng_live_msg_iter->self_comp = lttng_live_comp->self_comp;
1563 lttng_live_msg_iter->lttng_live_comp = lttng_live_comp;
1564 lttng_live_msg_iter->self_msg_iter = self_msg_it;
1565
1566 lttng_live_msg_iter->active_stream_iter = 0;
1567 lttng_live_msg_iter->last_msg_ts_ns = INT64_MIN;
1568 lttng_live_msg_iter->was_interrupted = false;
4f74db52 1569
4164020e 1570 return lttng_live_msg_iter;
4f74db52
FD
1571}
1572
7cdc2bab 1573BT_HIDDEN
4164020e
SM
1574bt_message_iterator_class_initialize_method_status
1575lttng_live_msg_iter_init(bt_self_message_iterator *self_msg_it,
1576 bt_self_message_iterator_configuration *config,
1577 bt_self_component_port_output *self_port)
7cdc2bab 1578{
4164020e 1579 bt_self_component *self_comp = bt_self_message_iterator_borrow_component(self_msg_it);
d2606006
SM
1580 lttng_live_component *lttng_live =
1581 (lttng_live_component *) bt_self_component_get_data(self_comp);
27a14e13 1582 const bt2_common::LogCfg& logCfg = lttng_live->logCfg;
4164020e 1583
d2606006 1584 try {
d2606006 1585 enum lttng_live_viewer_status viewer_status;
4164020e 1586
d2606006
SM
1587 /* There can be only one downstream iterator at the same time. */
1588 BT_ASSERT(!lttng_live->has_msg_iter);
1589 lttng_live->has_msg_iter = true;
4164020e 1590
e1fe0d2a
SM
1591 lttng_live_msg_iter::UP lttng_live_msg_iter =
1592 lttng_live_msg_iter_create(lttng_live, self_msg_it);
d2606006
SM
1593 if (!lttng_live_msg_iter) {
1594 BT_CLOGE_APPEND_CAUSE("Failed to create lttng_live_msg_iter");
e1fe0d2a 1595 return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
4164020e 1596 }
10265ebe 1597
3e03a79d 1598 viewer_status = live_viewer_connection_create(lttng_live->params.url.c_str(), false,
e1fe0d2a 1599 lttng_live_msg_iter.get(), logCfg,
772808ca 1600 lttng_live_msg_iter->viewer_connection);
d2606006
SM
1601 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1602 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1603 BT_CLOGE_APPEND_CAUSE("Failed to create viewer connection");
1604 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1605 /*
1606 * Interruption in the _iter_init() method is not
1607 * supported. Return an error.
1608 */
1609 BT_CLOGE_APPEND_CAUSE("Interrupted while creating viewer connection");
1610 }
4164020e 1611
e1fe0d2a 1612 return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
4164020e 1613 }
10265ebe 1614
e1fe0d2a 1615 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter.get());
d2606006
SM
1616 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1617 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1618 BT_CLOGE_APPEND_CAUSE("Failed to create viewer session");
1619 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1620 /*
1621 * Interruption in the _iter_init() method is not
1622 * supported. Return an error.
1623 */
1624 BT_CLOGE_APPEND_CAUSE("Interrupted when creating viewer session");
1625 }
4164020e 1626
e1fe0d2a 1627 return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
4164020e 1628 }
4164020e 1629
bef0ed48 1630 if (lttng_live_msg_iter->sessions.empty()) {
d2606006
SM
1631 switch (lttng_live->params.sess_not_found_act) {
1632 case SESSION_NOT_FOUND_ACTION_CONTINUE:
1633 BT_CLOGI(
1634 "Unable to connect to the requested live viewer session. Keep trying to connect because of "
1635 "%s=\"%s\" component parameter: url=\"%s\"",
1636 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR,
3e03a79d 1637 lttng_live->params.url.c_str());
d2606006
SM
1638 break;
1639 case SESSION_NOT_FOUND_ACTION_FAIL:
1640 BT_CLOGE_APPEND_CAUSE(
1641 "Unable to connect to the requested live viewer session. Fail the message iterator initialization because of %s=\"%s\" "
1642 "component parameter: url =\"%s\"",
1643 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_FAIL_STR,
3e03a79d 1644 lttng_live->params.url.c_str());
e1fe0d2a 1645 return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
d2606006
SM
1646 case SESSION_NOT_FOUND_ACTION_END:
1647 BT_CLOGI(
1648 "Unable to connect to the requested live viewer session. End gracefully at the first _next() "
1649 "call because of %s=\"%s\" component parameter: "
1650 "url=\"%s\"",
1651 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_END_STR,
3e03a79d 1652 lttng_live->params.url.c_str());
d2606006
SM
1653 break;
1654 default:
1655 bt_common_abort();
1656 }
1657 }
1658
e1fe0d2a
SM
1659 bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter.release());
1660 return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK;
d2606006
SM
1661 } catch (const std::bad_alloc&) {
1662 return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1663 } catch (const bt2_common::Error&) {
1664 BT_CLOGE_APPEND_CAUSE("Failed to initialize iterator");
1665 return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1666 }
7cdc2bab
MD
1667}
1668
80aff5ef 1669static struct bt_param_validation_map_value_entry_descr list_sessions_params[] = {
88730e42
SM
1670 {URL_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
1671 bt_param_validation_value_descr::makeString()},
4164020e
SM
1672 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
1673
a5b34e0c
SM
1674static bt2::Value::Shared lttng_live_query_list_sessions(bt2::ConstMapValue params,
1675 const bt2_common::LogCfg& logCfg)
7cdc2bab 1676{
4164020e 1677 const char *url;
772808ca 1678 live_viewer_connection::UP viewer_connection;
4164020e
SM
1679 enum lttng_live_viewer_status viewer_status;
1680 enum bt_param_validation_status validation_status;
1681 gchar *validate_error = NULL;
1682
34fab92e
SM
1683 validation_status =
1684 bt_param_validation_validate(params.libObjPtr(), list_sessions_params, &validate_error);
4164020e 1685 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
a5b34e0c 1686 throw bt2_common::MemoryError {};
4164020e 1687 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
a5b34e0c
SM
1688 bt2_common::GCharUP errorFreer {validate_error};
1689 BT_CLOGE_APPEND_CAUSE_AND_THROW(bt2::Error, "%s", validate_error);
4164020e
SM
1690 }
1691
34fab92e 1692 url = params[URL_PARAM]->asString().value().c_str();
4164020e 1693
772808ca 1694 viewer_status = live_viewer_connection_create(url, true, NULL, logCfg, viewer_connection);
4164020e
SM
1695 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1696 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
a5b34e0c 1697 BT_CLOGE_APPEND_CAUSE_AND_THROW(bt2::Error, "Failed to create viewer connection");
4164020e 1698 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
a5b34e0c 1699 throw bt2_common::TryAgain {};
4164020e
SM
1700 } else {
1701 bt_common_abort();
1702 }
4164020e
SM
1703 }
1704
a5b34e0c 1705 return live_viewer_connection_list_sessions(viewer_connection.get());
7cdc2bab
MD
1706}
1707
a5b34e0c
SM
1708static bt2::Value::Shared lttng_live_query_support_info(bt2::ConstMapValue params,
1709 const bt2_common::LogCfg& logCfg)
312df793 1710{
34fab92e 1711 nonstd::optional<bt2::ConstValue> inputValue;
4164020e 1712 struct bt_common_lttng_live_url_parts parts = {0};
81aa8979 1713 bt_common_lttng_live_url_parts_deleter partsDeleter {parts};
4164020e
SM
1714
1715 /* Used by the logging macros */
1716 __attribute__((unused)) bt_self_component *self_comp = NULL;
1717
34fab92e
SM
1718 nonstd::optional<bt2::ConstValue> typeValue = params["type"];
1719 if (!typeValue) {
a5b34e0c 1720 BT_CLOGE_APPEND_CAUSE_AND_THROW(bt2::Error, "Missing expected `type` parameter.");
4164020e
SM
1721 }
1722
34fab92e 1723 if (!typeValue->isString()) {
a5b34e0c 1724 BT_CLOGE_APPEND_CAUSE_AND_THROW(bt2::Error, "`type` parameter is not a string value.");
4164020e
SM
1725 }
1726
34fab92e 1727 if (typeValue->asString().value() != "string") {
4164020e 1728 /* We don't handle file system paths */
a5b34e0c 1729 return bt2::RealValue::create();
4164020e
SM
1730 }
1731
34fab92e
SM
1732 inputValue = params["input"];
1733 if (!inputValue) {
a5b34e0c 1734 BT_CLOGE_APPEND_CAUSE_AND_THROW(bt2::Error, "Missing expected `input` parameter.");
4164020e
SM
1735 }
1736
34fab92e 1737 if (!inputValue->isString()) {
a5b34e0c 1738 BT_CLOGE_APPEND_CAUSE_AND_THROW(bt2::Error, "`input` parameter is not a string value.");
4164020e
SM
1739 }
1740
34fab92e 1741 parts = bt_common_parse_lttng_live_url(inputValue->asString().value().c_str(), NULL, 0);
4164020e
SM
1742 if (parts.session_name) {
1743 /*
1744 * Looks pretty much like an LTTng live URL: we got the
1745 * session name part, which forms a complete URL.
1746 */
a5b34e0c 1747 return bt2::RealValue::create(.75);
4164020e 1748 }
312df793 1749
a5b34e0c 1750 return bt2::RealValue::create();
312df793
PP
1751}
1752
7cdc2bab 1753BT_HIDDEN
4164020e
SM
1754bt_component_class_query_method_status lttng_live_query(bt_self_component_class_source *comp_class,
1755 bt_private_query_executor *priv_query_exec,
1756 const char *object, const bt_value *params,
1757 __attribute__((unused)) void *method_data,
1758 const bt_value **result)
7cdc2bab 1759{
4164020e
SM
1760 bt_self_component_class *self_comp_class =
1761 bt_self_component_class_source_as_self_component_class(comp_class);
1762 bt_logging_level log_level = bt_query_executor_get_logging_level(
1763 bt_private_query_executor_as_query_executor_const(priv_query_exec));
27a14e13 1764 bt2_common::LogCfg logCfg(log_level, *self_comp_class);
4164020e 1765
d2606006 1766 try {
34fab92e 1767 bt2::ConstMapValue paramsObj(params);
32845257 1768 nonstd::optional<bt2::Value::Shared> resultObj;
d2606006
SM
1769
1770 if (strcmp(object, "sessions") == 0) {
a5b34e0c 1771 resultObj = lttng_live_query_list_sessions(paramsObj, logCfg);
d2606006 1772 } else if (strcmp(object, "babeltrace.support-info") == 0) {
a5b34e0c 1773 resultObj = lttng_live_query_support_info(paramsObj, logCfg);
d2606006
SM
1774 } else {
1775 BT_CLOGI("Unknown query object `%s`", object);
32845257
SM
1776 return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT;
1777 }
1778
a5b34e0c 1779 *result = resultObj->release().libObjPtr();
14f28187 1780
a5b34e0c
SM
1781 return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
1782 } catch (const bt2_common::TryAgain&) {
1783 return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
d2606006
SM
1784 } catch (const std::bad_alloc&) {
1785 return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1786 } catch (const bt2_common::Error&) {
1787 BT_CLOGE_APPEND_CAUSE("Failed to execute query");
1788 return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1789 }
7cdc2bab
MD
1790}
1791
7cdc2bab 1792BT_HIDDEN
14f28187 1793void lttng_live_component_finalize(bt_self_component_source *component)
7cdc2bab 1794{
eb6655ff
SM
1795 lttng_live_component::UP {(lttng_live_component *) bt_self_component_get_data(
1796 bt_self_component_source_as_self_component(component))};
7cdc2bab
MD
1797}
1798
4164020e
SM
1799static enum session_not_found_action
1800parse_session_not_found_action_param(const bt_value *no_session_param)
14f28187 1801{
4164020e
SM
1802 enum session_not_found_action action;
1803 const char *no_session_act_str = bt_value_string_get(no_session_param);
1804
1805 if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_CONTINUE_STR) == 0) {
1806 action = SESSION_NOT_FOUND_ACTION_CONTINUE;
1807 } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_FAIL_STR) == 0) {
1808 action = SESSION_NOT_FOUND_ACTION_FAIL;
1809 } else {
1810 BT_ASSERT(strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0);
1811 action = SESSION_NOT_FOUND_ACTION_END;
1812 }
1813
1814 return action;
14f28187
FD
1815}
1816
88730e42
SM
1817static bt_param_validation_value_descr inputs_elem_descr =
1818 bt_param_validation_value_descr::makeString();
80aff5ef
SM
1819
1820static const char *sess_not_found_action_choices[] = {
4164020e
SM
1821 SESS_NOT_FOUND_ACTION_CONTINUE_STR,
1822 SESS_NOT_FOUND_ACTION_FAIL_STR,
1823 SESS_NOT_FOUND_ACTION_END_STR,
80aff5ef
SM
1824};
1825
1826static struct bt_param_validation_map_value_entry_descr params_descr[] = {
88730e42
SM
1827 {INPUTS_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
1828 bt_param_validation_value_descr::makeArray(1, 1, inputs_elem_descr)},
1829 {SESS_NOT_FOUND_ACTION_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
1830 bt_param_validation_value_descr::makeString(sess_not_found_action_choices)},
4164020e
SM
1831 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
1832
1833static bt_component_class_initialize_method_status
27a14e13 1834lttng_live_component_create(const bt_value *params, bt_self_component *self_comp,
eb6655ff 1835 const bt2_common::LogCfg& logCfg, lttng_live_component::UP& component)
7cdc2bab 1836{
4164020e
SM
1837 const bt_value *inputs_value;
1838 const bt_value *url_value;
1839 const bt_value *value;
1840 const char *url;
1841 enum bt_param_validation_status validation_status;
1842 gchar *validation_error = NULL;
4164020e
SM
1843
1844 validation_status = bt_param_validation_validate(params, params_descr, &validation_error);
1845 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
eb6655ff 1846 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
4164020e 1847 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
0507910c 1848 bt2_common::GCharUP errorFreer {validation_error};
27a14e13 1849 BT_CLOGE_APPEND_CAUSE("%s", validation_error);
eb6655ff 1850 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
4164020e
SM
1851 }
1852
eb6655ff 1853 lttng_live_component::UP lttng_live = bt2_common::makeUnique<lttng_live_component>(logCfg);
4164020e
SM
1854 lttng_live->self_comp = self_comp;
1855 lttng_live->max_query_size = MAX_QUERY_SIZE;
1856 lttng_live->has_msg_iter = false;
1857
1858 inputs_value = bt_value_map_borrow_entry_value_const(params, INPUTS_PARAM);
1859 url_value = bt_value_array_borrow_element_by_index_const(inputs_value, 0);
1860 url = bt_value_string_get(url_value);
1861
3e03a79d 1862 lttng_live->params.url = url;
4164020e
SM
1863
1864 value = bt_value_map_borrow_entry_value_const(params, SESS_NOT_FOUND_ACTION_PARAM);
1865 if (value) {
1866 lttng_live->params.sess_not_found_act = parse_session_not_found_action_param(value);
1867 } else {
27a14e13
SM
1868 BT_CLOGI("Optional `%s` parameter is missing: "
1869 "defaulting to `%s`.",
1870 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR);
4164020e
SM
1871 lttng_live->params.sess_not_found_act = SESSION_NOT_FOUND_ACTION_CONTINUE;
1872 }
1873
eb6655ff
SM
1874 component = std::move(lttng_live);
1875 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
d3e4dcd8
PP
1876}
1877
f3bc2010 1878BT_HIDDEN
4164020e
SM
1879bt_component_class_initialize_method_status
1880lttng_live_component_init(bt_self_component_source *self_comp_src,
1881 bt_self_component_source_configuration *config, const bt_value *params,
1882 __attribute__((unused)) void *init_method_data)
f3bc2010 1883{
4164020e
SM
1884 bt_self_component *self_comp = bt_self_component_source_as_self_component(self_comp_src);
1885 bt_logging_level log_level =
1886 bt_component_get_logging_level(bt_self_component_as_component(self_comp));
27a14e13 1887 bt2_common::LogCfg logCfg(log_level, *self_comp);
4164020e 1888
d2606006 1889 try {
eb6655ff 1890 lttng_live_component::UP lttng_live;
d2606006
SM
1891 bt_component_class_initialize_method_status ret;
1892 bt_self_component_add_port_status add_port_status;
4164020e 1893
eb6655ff 1894 ret = lttng_live_component_create(params, self_comp, logCfg, lttng_live);
d2606006 1895 if (ret != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
eb6655ff 1896 return ret;
d2606006 1897 }
4164020e 1898
d2606006
SM
1899 add_port_status =
1900 bt_self_component_source_add_output_port(self_comp_src, "out", NULL, NULL);
1901 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
1902 ret = (bt_component_class_initialize_method_status) add_port_status;
eb6655ff 1903 return ret;
d2606006
SM
1904 }
1905
eb6655ff 1906 bt_self_component_set_data(self_comp, lttng_live.release());
7cdc2bab 1907
eb6655ff 1908 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
d2606006
SM
1909 } catch (const std::bad_alloc&) {
1910 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1911 } catch (const bt2_common::Error&) {
1912 BT_CLOGE_APPEND_CAUSE("Failed to initialize component");
1913 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1914 }
d85ef162 1915}
This page took 0.210561 seconds and 5 git commands to generate.