Cleanup: src.ctf.lttng-live: remove usage of `bt_object`
[babeltrace.git] / src / plugins / ctf / lttng-live / lttng-live.c
CommitLineData
f3bc2010
JG
1/*
2 * lttng-live.c
3 *
4 * Babeltrace CTF LTTng-live Client Component
5 *
14f28187 6 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
f3bc2010 7 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7cdc2bab 8 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
f3bc2010
JG
9 *
10 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
11 *
12 * Permission is hereby granted, free of charge, to any person obtaining a copy
13 * of this software and associated documentation files (the "Software"), to deal
14 * in the Software without restriction, including without limitation the rights
15 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
16 * copies of the Software, and to permit persons to whom the Software is
17 * furnished to do so, subject to the following conditions:
18 *
19 * The above copyright notice and this permission notice shall be included in
20 * all copies or substantial portions of the Software.
21 *
22 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
23 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
24 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
25 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
26 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
27 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
28 * SOFTWARE.
29 */
30
2ece7dd0 31#define BT_COMP_LOG_SELF_COMP self_comp
c01594de 32#define BT_LOG_OUTPUT_LEVEL log_level
350ad6c1 33#define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE"
d9c39b0a 34#include "logging/comp-logging.h"
020bc26f 35
14f28187
FD
36#include <glib.h>
37#include <inttypes.h>
c4f23e30 38#include <stdbool.h>
14f28187
FD
39#include <unistd.h>
40
578e048b 41#include "common/assert.h"
3fadfbc0 42#include <babeltrace2/babeltrace.h>
578e048b 43#include "compat/compiler.h"
3fadfbc0 44#include <babeltrace2/types.h>
f3bc2010 45
376fc2bd 46#include "plugins/common/muxing/muxing.h"
80aff5ef 47#include "plugins/common/param-validation/param-validation.h"
376fc2bd 48
7cdc2bab
MD
49#include "data-stream.h"
50#include "metadata.h"
14f28187 51#include "lttng-live.h"
7cdc2bab 52
14f28187
FD
53#define MAX_QUERY_SIZE (256*1024)
54#define URL_PARAM "url"
312df793 55#define INPUTS_PARAM "inputs"
14f28187
FD
56#define SESS_NOT_FOUND_ACTION_PARAM "session-not-found-action"
57#define SESS_NOT_FOUND_ACTION_CONTINUE_STR "continue"
58#define SESS_NOT_FOUND_ACTION_FAIL_STR "fail"
59#define SESS_NOT_FOUND_ACTION_END_STR "end"
7cdc2bab 60
2ece7dd0 61#define print_dbg(fmt, ...) BT_COMP_LOGD(fmt, ## __VA_ARGS__)
7cdc2bab 62
14f28187
FD
63static
64const char *print_live_iterator_status(enum lttng_live_iterator_status status)
65{
66 switch (status) {
67 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
68 return "LTTNG_LIVE_ITERATOR_STATUS_CONTINUE";
69 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
70 return "LTTNG_LIVE_ITERATOR_STATUS_AGAIN";
71 case LTTNG_LIVE_ITERATOR_STATUS_END:
72 return "LTTNG_LIVE_ITERATOR_STATUS_END";
73 case LTTNG_LIVE_ITERATOR_STATUS_OK:
74 return "LTTNG_LIVE_ITERATOR_STATUS_OK";
75 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
76 return "LTTNG_LIVE_ITERATOR_STATUS_INVAL";
77 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
78 return "LTTNG_LIVE_ITERATOR_STATUS_ERROR";
79 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
80 return "LTTNG_LIVE_ITERATOR_STATUS_NOMEM";
81 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
82 return "LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED";
83 default:
84 abort();
85 }
86}
87
88static
89const char *print_state(struct lttng_live_stream_iterator *s)
7cdc2bab
MD
90{
91 switch (s->state) {
92 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
93 return "ACTIVE_NO_DATA";
94 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
95 return "QUIESCENT_NO_DATA";
96 case LTTNG_LIVE_STREAM_QUIESCENT:
97 return "QUIESCENT";
98 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
99 return "ACTIVE_DATA";
100 case LTTNG_LIVE_STREAM_EOF:
101 return "EOF";
102 default:
103 return "ERROR";
104 }
105}
7cdc2bab 106
14f28187
FD
/*
 * Logs, at the debug level, the state name of `live_stream_iter` (as
 * returned by print_state()) together with its `last_inactivity_ts` and
 * `current_inactivity_ts` fields.
 *
 * This macro expands to exactly one statement WITHOUT a trailing
 * semicolon: the caller writes `print_stream_state(iter);`. This keeps
 * the macro safe to use as the sole body of an unbraced `if`/`else`.
 *
 * `live_stream_iter` is evaluated more than once: do not pass an
 * expression with side effects.
 *
 * NOTE(review): BT_COMP_LOGD() presumably needs `log_level` and
 * `self_comp` in the caller's scope (see the BT_LOG_OUTPUT_LEVEL and
 * BT_COMP_LOG_SELF_COMP definitions at the top of this file) -- confirm.
 */
#define print_stream_state(live_stream_iter)				\
	do {								\
		BT_COMP_LOGD("stream state %s last_inact_ts %" PRId64	\
			", curr_inact_ts %" PRId64,			\
			print_state(live_stream_iter),			\
			(live_stream_iter)->last_inactivity_ts,		\
			(live_stream_iter)->current_inactivity_ts);	\
	} while (0)
6f79a7cf
MD
115
116BT_HIDDEN
9b4f9b42 117bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter)
6f79a7cf 118{
14f28187 119 bool ret;
6f79a7cf 120
9b4f9b42 121 if (!msg_iter) {
14f28187
FD
122 ret = false;
123 goto end;
7cdc2bab 124 }
7cdc2bab 125
9b4f9b42
PP
126 ret = bt_self_message_iterator_is_interrupted(
127 msg_iter->self_msg_iter);
4bf0e537 128
14f28187
FD
129end:
130 return ret;
7cdc2bab
MD
131}
132
133static
134struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session,
135 uint64_t trace_id)
d3e4dcd8 136{
14f28187
FD
137 uint64_t trace_idx;
138 struct lttng_live_trace *ret_trace = NULL;
7cdc2bab 139
14f28187
FD
140 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
141 struct lttng_live_trace *trace =
142 g_ptr_array_index(session->traces, trace_idx);
7cdc2bab 143 if (trace->id == trace_id) {
14f28187
FD
144 ret_trace = trace;
145 goto end;
7cdc2bab
MD
146 }
147 }
14f28187
FD
148
149end:
150 return ret_trace;
d3eb6e8f
PP
151}
152
7cdc2bab 153static
14f28187 154void lttng_live_destroy_trace(struct lttng_live_trace *trace)
7cdc2bab 155{
c01594de 156 bt_logging_level log_level = trace->log_level;
2ece7dd0 157 bt_self_component *self_comp = trace->self_comp;
c01594de 158
2ece7dd0 159 BT_COMP_LOGD("Destroy lttng_live_trace");
7cdc2bab 160
14f28187
FD
161 BT_ASSERT(trace->stream_iterators);
162 g_ptr_array_free(trace->stream_iterators, TRUE);
5bd230f4 163
14f28187
FD
164 BT_TRACE_PUT_REF_AND_RESET(trace->trace);
165 BT_TRACE_CLASS_PUT_REF_AND_RESET(trace->trace_class);
5bd230f4 166
7cdc2bab 167 lttng_live_metadata_fini(trace);
7cdc2bab
MD
168 g_free(trace);
169}
170
171static
172struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
173 uint64_t trace_id)
174{
175 struct lttng_live_trace *trace = NULL;
c01594de 176 bt_logging_level log_level = session->log_level;
2ece7dd0 177 bt_self_component *self_comp = session->self_comp;
7cdc2bab
MD
178
179 trace = g_new0(struct lttng_live_trace, 1);
180 if (!trace) {
1419db2b
FD
181 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
182 "Failed to allocate live trace");
7cdc2bab
MD
183 goto error;
184 }
c01594de 185 trace->log_level = session->log_level;
2ece7dd0 186 trace->self_comp = session->self_comp;
7cdc2bab
MD
187 trace->session = session;
188 trace->id = trace_id;
14f28187
FD
189 trace->trace_class = NULL;
190 trace->trace = NULL;
191 trace->stream_iterators = g_ptr_array_new_with_free_func(
0f1979c3 192 (GDestroyNotify) lttng_live_stream_iterator_destroy);
14f28187 193 BT_ASSERT(trace->stream_iterators);
7cdc2bab 194 trace->new_metadata_needed = true;
14f28187
FD
195 g_ptr_array_add(session->traces, trace);
196
2ece7dd0 197 BT_COMP_LOGI("Create trace");
7cdc2bab
MD
198 goto end;
199error:
200 g_free(trace);
201 trace = NULL;
202end:
203 return trace;
204}
205
206BT_HIDDEN
14f28187
FD
207struct lttng_live_trace *lttng_live_borrow_trace(
208 struct lttng_live_session *session, uint64_t trace_id)
7cdc2bab
MD
209{
210 struct lttng_live_trace *trace;
211
212 trace = lttng_live_find_trace(session, trace_id);
213 if (trace) {
14f28187 214 goto end;
7cdc2bab 215 }
7cdc2bab 216
14f28187
FD
217 /* The session is the owner of the newly created trace. */
218 trace = lttng_live_create_trace(session, trace_id);
7cdc2bab 219
14f28187
FD
220end:
221 return trace;
7cdc2bab
MD
222}
223
224BT_HIDDEN
14f28187 225int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
06994c71
MD
226 uint64_t session_id, const char *hostname,
227 const char *session_name)
7cdc2bab
MD
228{
229 int ret = 0;
14f28187 230 struct lttng_live_session *session;
c01594de 231 bt_logging_level log_level = lttng_live_msg_iter->log_level;
2ece7dd0 232 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
7cdc2bab 233
14f28187
FD
234 session = g_new0(struct lttng_live_session, 1);
235 if (!session) {
1419db2b
FD
236 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
237 "Failed to allocate live session");
7cdc2bab
MD
238 goto error;
239 }
240
c01594de 241 session->log_level = lttng_live_msg_iter->log_level;
2ece7dd0 242 session->self_comp = lttng_live_msg_iter->self_comp;
14f28187
FD
243 session->id = session_id;
244 session->traces = g_ptr_array_new_with_free_func(
0f1979c3 245 (GDestroyNotify) lttng_live_destroy_trace);
14f28187
FD
246 BT_ASSERT(session->traces);
247 session->lttng_live_msg_iter = lttng_live_msg_iter;
248 session->new_streams_needed = true;
249 session->hostname = g_string_new(hostname);
250 BT_ASSERT(session->hostname);
251
252 session->session_name = g_string_new(session_name);
253 BT_ASSERT(session->session_name);
7cdc2bab 254
2ece7dd0 255 BT_COMP_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s",
14f28187
FD
256 session->id, hostname, session_name);
257 g_ptr_array_add(lttng_live_msg_iter->sessions, session);
7cdc2bab
MD
258 goto end;
259error:
1419db2b 260 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error adding session");
14f28187 261 g_free(session);
7cdc2bab
MD
262 ret = -1;
263end:
264 return ret;
265}
266
267static
268void lttng_live_destroy_session(struct lttng_live_session *session)
269{
c01594de 270 bt_logging_level log_level;
2ece7dd0 271 bt_self_component *self_comp;
14f28187
FD
272
273 if (!session) {
274 goto end;
275 }
7cdc2bab 276
c01594de 277 log_level = session->log_level;
2ece7dd0
PP
278 self_comp = session->self_comp;
279 BT_COMP_LOGD("Destroy lttng live session");
7cdc2bab
MD
280 if (session->id != -1ULL) {
281 if (lttng_live_detach_session(session)) {
0757916e
FD
282 if (!lttng_live_graph_is_canceled(
283 session->lttng_live_msg_iter)) {
4c66436f 284 /* Old relayd cannot detach sessions. */
2ece7dd0 285 BT_COMP_LOGD("Unable to detach lttng live session %" PRIu64,
4c66436f
MD
286 session->id);
287 }
7cdc2bab
MD
288 }
289 session->id = -1ULL;
290 }
14f28187
FD
291
292 if (session->traces) {
293 g_ptr_array_free(session->traces, TRUE);
7cdc2bab 294 }
14f28187 295
06994c71
MD
296 if (session->hostname) {
297 g_string_free(session->hostname, TRUE);
298 }
299 if (session->session_name) {
300 g_string_free(session->session_name, TRUE);
301 }
7cdc2bab 302 g_free(session);
14f28187
FD
303
304end:
305 return;
7cdc2bab
MD
306}
307
14f28187
FD
308static
309void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 310{
14f28187
FD
311 if (!lttng_live_msg_iter) {
312 goto end;
7cdc2bab 313 }
7cdc2bab 314
14f28187
FD
315 if (lttng_live_msg_iter->sessions) {
316 g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE);
7cdc2bab 317 }
14f28187 318
b9e6ec43
FD
319 if (lttng_live_msg_iter->viewer_connection) {
320 live_viewer_connection_destroy(lttng_live_msg_iter->viewer_connection);
321 }
14f28187
FD
322 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp);
323 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter);
324
325 /* All stream iterators must be destroyed at this point. */
326 BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0);
327 lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false;
328
329 g_free(lttng_live_msg_iter);
330
331end:
332 return;
333}
334
335BT_HIDDEN
336void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
337{
338 struct lttng_live_msg_iter *lttng_live_msg_iter;
339
340 BT_ASSERT(self_msg_iter);
341
342 lttng_live_msg_iter = bt_self_message_iterator_get_data(self_msg_iter);
343 BT_ASSERT(lttng_live_msg_iter);
344 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
7cdc2bab
MD
345}
346
347static
14f28187 348enum lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
7cdc2bab
MD
349 struct lttng_live_stream_iterator *lttng_live_stream)
350{
c01594de 351 bt_logging_level log_level = lttng_live_stream->log_level;
2ece7dd0 352 bt_self_component *self_comp = lttng_live_stream->self_comp;
c01594de 353
7cdc2bab
MD
354 switch (lttng_live_stream->state) {
355 case LTTNG_LIVE_STREAM_QUIESCENT:
356 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
357 break;
358 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
359 /* Invalid state. */
2ece7dd0 360 BT_COMP_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
087bc060 361 abort();
7cdc2bab
MD
362 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
363 /* Invalid state. */
2ece7dd0 364 BT_COMP_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
087bc060 365 abort();
7cdc2bab
MD
366 case LTTNG_LIVE_STREAM_EOF:
367 break;
368 }
14f28187 369 return LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
370}
371
372/*
373 * For active no data stream, fetch next data. It can be either:
374 * - quiescent: need to put it in the prio heap at quiescent end
375 * timestamp,
376 * - have data: need to wire up first event into the prio heap,
377 * - have no data on this stream at this point: need to retry (AGAIN) or
378 * return EOF.
379 */
380static
14f28187
FD
381enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
382 struct lttng_live_msg_iter *lttng_live_msg_iter,
7cdc2bab
MD
383 struct lttng_live_stream_iterator *lttng_live_stream)
384{
c01594de 385 bt_logging_level log_level = lttng_live_msg_iter->log_level;
2ece7dd0 386 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
0f1979c3 387 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab 388 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
0f1979c3 389 struct packet_index index;
7cdc2bab
MD
390
391 if (lttng_live_stream->trace->new_metadata_needed) {
14f28187 392 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
7cdc2bab
MD
393 goto end;
394 }
395 if (lttng_live_stream->trace->session->new_streams_needed) {
14f28187 396 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
7cdc2bab
MD
397 goto end;
398 }
14f28187
FD
399 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA &&
400 lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
7cdc2bab
MD
401 goto end;
402 }
14f28187
FD
403 ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream,
404 &index);
405 if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
7cdc2bab
MD
406 goto end;
407 }
98b15851 408 BT_ASSERT_DBG(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
7cdc2bab 409 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
14f28187
FD
410 uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts,
411 curr_inact_ts = lttng_live_stream->current_inactivity_ts;
412
413 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA &&
414 last_inact_ts == curr_inact_ts) {
415 ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
7cdc2bab
MD
416 print_stream_state(lttng_live_stream);
417 } else {
14f28187 418 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
7cdc2bab
MD
419 }
420 goto end;
421 }
422 lttng_live_stream->base_offset = index.offset;
423 lttng_live_stream->offset = index.offset;
424 lttng_live_stream->len = index.packet_size / CHAR_BIT;
425end:
14f28187
FD
426 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
427 ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream);
7cdc2bab
MD
428 }
429 return ret;
430}
431
432/*
14f28187
FD
433 * Creation of the message requires the ctf trace class to be created
434 * beforehand, but the live protocol gives us all streams (including metadata)
435 * at once. So we split it in three steps: getting streams, getting metadata
436 * (which creates the ctf trace class), and then creating the per-stream
437 * messages.
7cdc2bab
MD
438 */
439static
14f28187
FD
440enum lttng_live_iterator_status lttng_live_get_session(
441 struct lttng_live_msg_iter *lttng_live_msg_iter,
7cdc2bab
MD
442 struct lttng_live_session *session)
443{
1419db2b
FD
444 bt_logging_level log_level = lttng_live_msg_iter->log_level;
445 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
14f28187
FD
446 enum lttng_live_iterator_status status;
447 uint64_t trace_idx;
7cdc2bab 448
14f28187 449 if (!session->attached) {
eee8e741
FD
450 enum lttng_live_attach_session_status attach_status =
451 lttng_live_attach_session(session);
452 if (attach_status != LTTNG_LIVE_ATTACH_SESSION_STATUS_OK) {
0757916e 453 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
1419db2b
FD
454 /*
455 * Clear any causes appended in
456 * `lttng_live_attach_session()` as we want to
457 * return gracefully since the graph was
458 * cancelled.
459 */
460 bt_current_thread_clear_error();
14f28187
FD
461 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
462 } else {
463 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1419db2b
FD
464 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
465 "Error attaching to LTTng live session");
14f28187
FD
466 }
467 goto end;
4c66436f 468 }
7cdc2bab 469 }
14f28187 470
7cdc2bab 471 status = lttng_live_get_new_streams(session);
14f28187
FD
472 if (status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
473 status != LTTNG_LIVE_ITERATOR_STATUS_END) {
474 goto end;
7cdc2bab 475 }
c28512ab
FD
476 trace_idx = 0;
477 while (trace_idx < session->traces->len) {
14f28187
FD
478 struct lttng_live_trace *trace =
479 g_ptr_array_index(session->traces, trace_idx);
480
7cdc2bab 481 status = lttng_live_metadata_update(trace);
c28512ab
FD
482 switch (status) {
483 case LTTNG_LIVE_ITERATOR_STATUS_OK:
484 trace_idx++;
485 break;
486 case LTTNG_LIVE_ITERATOR_STATUS_END:
487 /*
488 * The trace has ended. Remove it of the array an
489 * continue the iteration.
490 * We can remove the trace safely when using the
491 * g_ptr_array_remove_index_fast because it replaces
492 * the element at trace_idx with the array's last
493 * element. trace_idx is not incremented because of
494 * that.
495 */
496 (void) g_ptr_array_remove_index_fast(session->traces,
497 trace_idx);
498 break;
499 default:
14f28187 500 goto end;
7cdc2bab
MD
501 }
502 }
14f28187
FD
503 status = lttng_live_lazy_msg_init(session);
504
505end:
506 return status;
7cdc2bab
MD
507}
508
509BT_HIDDEN
14f28187 510void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 511{
14f28187 512 uint64_t session_idx;
7cdc2bab 513
14f28187
FD
514 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
515 session_idx++) {
516 struct lttng_live_session *session =
517 g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
7cdc2bab
MD
518 session->new_streams_needed = true;
519 }
520}
521
522static
14f28187 523void lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 524{
14f28187 525 uint64_t session_idx, trace_idx;
7cdc2bab 526
14f28187
FD
527 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
528 session_idx++) {
529 struct lttng_live_session *session =
530 g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
7cdc2bab 531 session->new_streams_needed = true;
14f28187
FD
532 for (trace_idx = 0; trace_idx < session->traces->len;
533 trace_idx++) {
534 struct lttng_live_trace *trace =
535 g_ptr_array_index(session->traces, trace_idx);
7cdc2bab
MD
536 trace->new_metadata_needed = true;
537 }
538 }
539}
540
541static
14f28187
FD
542enum lttng_live_iterator_status
543lttng_live_iterator_handle_new_streams_and_metadata(
544 struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 545{
0f1979c3 546 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
1419db2b
FD
547 bt_logging_level log_level = lttng_live_msg_iter->log_level;
548 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
14f28187
FD
549 uint64_t session_idx = 0, nr_sessions_opened = 0;
550 struct lttng_live_session *session;
551 enum session_not_found_action sess_not_found_act =
552 lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
553
7cdc2bab 554 /*
14f28187 555 * In a remotely distant future, we could add a "new
7cdc2bab
MD
556 * session" flag to the protocol, which would tell us that we
557 * need to query for new sessions even though we have sessions
558 * currently ongoing.
559 */
14f28187
FD
560 if (lttng_live_msg_iter->sessions->len == 0) {
561 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
562 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
563 goto end;
564 } else {
565 /*
566 * Retry to create a viewer session for the requested
567 * session name.
568 */
569 if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
570 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1419db2b
FD
571 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
572 "Error creating LTTng live viewer session");
14f28187
FD
573 goto end;
574 }
575 }
7cdc2bab 576 }
14f28187
FD
577
578 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
579 session_idx++) {
580 session = g_ptr_array_index(lttng_live_msg_iter->sessions,
581 session_idx);
582 ret = lttng_live_get_session(lttng_live_msg_iter, session);
7cdc2bab 583 switch (ret) {
14f28187 584 case LTTNG_LIVE_ITERATOR_STATUS_OK:
7cdc2bab 585 break;
14f28187
FD
586 case LTTNG_LIVE_ITERATOR_STATUS_END:
587 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
588 break;
589 default:
590 goto end;
591 }
592 if (!session->closed) {
593 nr_sessions_opened++;
594 }
595 }
596end:
14f28187
FD
597 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK &&
598 sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE &&
599 nr_sessions_opened == 0) {
600 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
7cdc2bab
MD
601 }
602 return ret;
603}
604
605static
14f28187
FD
606enum lttng_live_iterator_status emit_inactivity_message(
607 struct lttng_live_msg_iter *lttng_live_msg_iter,
608 struct lttng_live_stream_iterator *stream_iter,
609 bt_message **message, uint64_t timestamp)
7cdc2bab 610{
0f1979c3 611 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
1419db2b
FD
612 bt_logging_level log_level = lttng_live_msg_iter->log_level;
613 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
14f28187 614 bt_message *msg = NULL;
7cdc2bab 615
14f28187
FD
616 BT_ASSERT(stream_iter->trace->clock_class);
617
618 msg = bt_message_message_iterator_inactivity_create(
0f1979c3
FD
619 lttng_live_msg_iter->self_msg_iter,
620 stream_iter->trace->clock_class, timestamp);
d6e69534 621 if (!msg) {
1419db2b
FD
622 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
623 "Error emitting message iterator inactivity message");
7cdc2bab
MD
624 goto error;
625 }
14f28187 626
d6e69534 627 *message = msg;
7cdc2bab 628end:
7cdc2bab
MD
629 return ret;
630
631error:
14f28187 632 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
d6e69534 633 bt_message_put_ref(msg);
7cdc2bab
MD
634 goto end;
635}
636
637static
14f28187
FD
638enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
639 struct lttng_live_msg_iter *lttng_live_msg_iter,
7cdc2bab 640 struct lttng_live_stream_iterator *lttng_live_stream,
14f28187 641 bt_message **message)
7cdc2bab 642{
0f1979c3 643 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
644
645 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
14f28187 646 return LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
647 }
648
14f28187
FD
649 if (lttng_live_stream->current_inactivity_ts ==
650 lttng_live_stream->last_inactivity_ts) {
7cdc2bab 651 lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
14f28187
FD
652 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
653 goto end;
654 }
655
656 ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream,
0f1979c3 657 message, lttng_live_stream->current_inactivity_ts);
14f28187
FD
658
659 lttng_live_stream->last_inactivity_ts =
0f1979c3 660 lttng_live_stream->current_inactivity_ts;
14f28187
FD
661end:
662 return ret;
663}
664
665static
666int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter,
667 struct lttng_live_msg_iter *lttng_live_msg_iter,
668 const bt_message *msg, int64_t last_msg_ts_ns,
669 int64_t *ts_ns)
670{
671 const bt_clock_class *clock_class = NULL;
672 const bt_clock_snapshot *clock_snapshot = NULL;
673 int ret = 0;
c01594de 674 bt_logging_level log_level = lttng_live_msg_iter->log_level;
2ece7dd0 675 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
14f28187 676
98b15851
PP
677 BT_ASSERT_DBG(msg);
678 BT_ASSERT_DBG(ts_ns);
14f28187 679
2ece7dd0 680 BT_COMP_LOGD("Getting message's timestamp: iter-data-addr=%p, msg-addr=%p, "
14f28187
FD
681 "last-msg-ts=%" PRId64, lttng_live_msg_iter, msg,
682 last_msg_ts_ns);
683
684 switch (bt_message_get_type(msg)) {
685 case BT_MESSAGE_TYPE_EVENT:
0f1979c3 686 clock_class = bt_message_event_borrow_stream_class_default_clock_class_const(
14f28187 687 msg);
98b15851 688 BT_ASSERT_DBG(clock_class);
14f28187 689
0cbc2c33
PP
690 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(
691 msg);
14f28187
FD
692 break;
693 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
0f1979c3 694 clock_class = bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
14f28187
FD
695 msg);
696 BT_ASSERT(clock_class);
697
0cbc2c33
PP
698 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
699 msg);
14f28187
FD
700 break;
701 case BT_MESSAGE_TYPE_PACKET_END:
0f1979c3 702 clock_class = bt_message_packet_end_borrow_stream_class_default_clock_class_const(
14f28187
FD
703 msg);
704 BT_ASSERT(clock_class);
705
0cbc2c33
PP
706 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(
707 msg);
14f28187
FD
708 break;
709 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
0f1979c3 710 clock_class = bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
14f28187
FD
711 msg);
712 BT_ASSERT(clock_class);
713
9b24b6aa 714 clock_snapshot = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
0cbc2c33 715 msg);
14f28187
FD
716 break;
717 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
0f1979c3 718 clock_class = bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
14f28187
FD
719 msg);
720 BT_ASSERT(clock_class);
721
9b24b6aa 722 clock_snapshot = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
0cbc2c33 723 msg);
14f28187 724 break;
14f28187 725 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
0f1979c3
FD
726 clock_snapshot = bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
727 msg);
14f28187
FD
728 break;
729 default:
730 /* All the other messages have a higher priority */
2ece7dd0 731 BT_COMP_LOGD_STR("Message has no timestamp: using the last message timestamp.");
14f28187 732 *ts_ns = last_msg_ts_ns;
7cdc2bab
MD
733 goto end;
734 }
735
14f28187 736 clock_class = bt_clock_snapshot_borrow_clock_class_const(clock_snapshot);
98b15851 737 BT_ASSERT_DBG(clock_class);
14f28187
FD
738
739 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
740 if (ret) {
1419db2b
FD
741 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
742 "Cannot get nanoseconds from Epoch of clock snapshot: "
14f28187
FD
743 "clock-snapshot-addr=%p", clock_snapshot);
744 goto error;
745 }
746
747 goto end;
748
14f28187
FD
749error:
750 ret = -1;
7cdc2bab 751
7cdc2bab 752end:
14f28187 753 if (ret == 0) {
2ece7dd0 754 BT_COMP_LOGD("Found message's timestamp: "
14f28187
FD
755 "iter-data-addr=%p, msg-addr=%p, "
756 "last-msg-ts=%" PRId64 ", ts=%" PRId64,
757 lttng_live_msg_iter, msg, last_msg_ts_ns, *ts_ns);
758 }
759
7cdc2bab
MD
760 return ret;
761}
762
763static
14f28187
FD
764enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
765 struct lttng_live_msg_iter *lttng_live_msg_iter,
7cdc2bab 766 struct lttng_live_stream_iterator *lttng_live_stream,
14f28187 767 bt_message **message)
7cdc2bab 768{
14f28187 769 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
c01594de 770 bt_logging_level log_level = lttng_live_msg_iter->log_level;
2ece7dd0 771 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
0f1979c3
FD
772 enum bt_msg_iter_status status;
773 uint64_t session_idx, trace_idx;
7cdc2bab 774
14f28187
FD
775 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
776 session_idx++) {
777 struct lttng_live_session *session =
778 g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
7cdc2bab
MD
779
780 if (session->new_streams_needed) {
14f28187
FD
781 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
782 goto end;
7cdc2bab 783 }
14f28187
FD
784 for (trace_idx = 0; trace_idx < session->traces->len;
785 trace_idx++) {
786 struct lttng_live_trace *trace =
787 g_ptr_array_index(session->traces, trace_idx);
7cdc2bab 788 if (trace->new_metadata_needed) {
14f28187
FD
789 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
790 goto end;
7cdc2bab
MD
791 }
792 }
793 }
794
795 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
14f28187
FD
796 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
797 goto end;
7cdc2bab 798 }
14f28187
FD
799
800 status = bt_msg_iter_get_next_message(lttng_live_stream->msg_iter,
801 lttng_live_msg_iter->self_msg_iter, message);
7cdc2bab 802 switch (status) {
d6e69534 803 case BT_MSG_ITER_STATUS_EOF:
14f28187 804 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
7cdc2bab 805 break;
d6e69534 806 case BT_MSG_ITER_STATUS_OK:
14f28187 807 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab 808 break;
d6e69534 809 case BT_MSG_ITER_STATUS_AGAIN:
7cdc2bab
MD
810 /*
811 * Continue immediately (end of packet). The next
812 * get_index may return AGAIN to delay the following
813 * attempt.
814 */
14f28187 815 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
7cdc2bab 816 break;
d6e69534 817 case BT_MSG_ITER_STATUS_INVAL:
7cdc2bab 818 /* No argument provided by the user, so don't return INVAL. */
d6e69534 819 case BT_MSG_ITER_STATUS_ERROR:
7cdc2bab 820 default:
14f28187 821 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1419db2b
FD
822 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
823 "CTF message iterator return an error or failed: "
824 "msg_iter=%p", lttng_live_stream->msg_iter);
7cdc2bab
MD
825 break;
826 }
14f28187
FD
827
828end:
7cdc2bab
MD
829 return ret;
830}
831
4a39caef
FD
832static
833enum lttng_live_iterator_status lttng_live_iterator_close_stream(
834 struct lttng_live_msg_iter *lttng_live_msg_iter,
835 struct lttng_live_stream_iterator *stream_iter,
836 bt_message **curr_msg)
837{
838 enum lttng_live_iterator_status live_status =
839 LTTNG_LIVE_ITERATOR_STATUS_OK;
1419db2b
FD
840 bt_logging_level log_level = lttng_live_msg_iter->log_level;
841 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
4a39caef
FD
842 /*
843 * The viewer has hung up on us so we are closing the stream. The
844 * `bt_msg_iter` should simply realize that it needs to close the
845 * stream properly by emitting the necessary stream end message.
846 */
0f1979c3
FD
847 enum bt_msg_iter_status status = bt_msg_iter_get_next_message(
848 stream_iter->msg_iter, lttng_live_msg_iter->self_msg_iter,
849 curr_msg);
4a39caef
FD
850
851 if (status == BT_MSG_ITER_STATUS_ERROR) {
1419db2b
FD
852 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
853 "Error getting the next message from CTF message iterator");
4a39caef
FD
854 live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
855 goto end;
856 }
857
858 BT_ASSERT(status == BT_MSG_ITER_STATUS_OK);
859
860end:
861 return live_status;
862}
863
7cdc2bab
MD
864/*
865 * helper function:
866 * handle_no_data_streams()
867 * retry:
868 * - for each ACTIVE_NO_DATA stream:
869 * - query relayd for stream data, or quiescence info.
870 * - if need metadata, get metadata, goto retry.
871 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
872 * - if quiescent, move to QUIESCENT streams
873 * - if fetched data, move to ACTIVE_DATA streams
874 * (at this point each stream either has data, or is quiescent)
875 *
876 *
877 * iterator_next:
878 * handle_new_streams_and_metadata()
879 * - query relayd for known streams, add them as ACTIVE_NO_DATA
880 * - query relayd for metadata
881 *
882 * call handle_active_no_data_streams()
883 *
884 * handle_quiescent_streams()
885 * - if at least one stream is ACTIVE_DATA:
886 * - peek stream event with lowest timestamp -> next_ts
887 * - for each quiescent stream
888 * - if next_ts >= quiescent end
889 * - set state to ACTIVE_NO_DATA
890 * - else
891 * - for each quiescent stream
892 * - set state to ACTIVE_NO_DATA
893 *
894 * call handle_active_no_data_streams()
895 *
896 * handle_active_data_streams()
897 * - if at least one stream is ACTIVE_DATA:
898 * - get stream event with lowest timestamp from heap
d6e69534 899 * - make that stream event the current message.
7cdc2bab
MD
900 * - move this stream heap position to its next event
901 * - if we need to fetch data from relayd, move
902 * stream to ACTIVE_NO_DATA.
903 * - return OK
904 * - return AGAIN
905 *
906 * end criterion: ctrl-c on client. If relayd exits or the session
907 * closes on the relay daemon side, we keep on waiting for streams.
908 * Eventually handle --end timestamp (also an end criterion).
909 *
910 * When disconnected from relayd: try to re-connect endlessly.
911 */
912static
a31f4d8b 913enum lttng_live_iterator_status lttng_live_iterator_next_msg_on_stream(
14f28187
FD
914 struct lttng_live_msg_iter *lttng_live_msg_iter,
915 struct lttng_live_stream_iterator *stream_iter,
916 bt_message **curr_msg)
7cdc2bab 917{
c01594de 918 bt_logging_level log_level = lttng_live_msg_iter->log_level;
2ece7dd0 919 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
14f28187 920 enum lttng_live_iterator_status live_status;
7cdc2bab 921
4a39caef
FD
922 if (stream_iter->has_stream_hung_up) {
923 /*
924 * The stream has hung up and the stream was properly closed
925 * during the last call to the current function. Return _END
926 * status now so that this stream iterator is removed for the
927 * stream iterator list.
928 */
929 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
930 goto end;
931 }
932
7cdc2bab
MD
933retry:
934 print_stream_state(stream_iter);
14f28187 935 live_status = lttng_live_iterator_handle_new_streams_and_metadata(
0f1979c3 936 lttng_live_msg_iter);
14f28187 937 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
7cdc2bab
MD
938 goto end;
939 }
14f28187 940 live_status = lttng_live_iterator_next_handle_one_no_data_stream(
0f1979c3 941 lttng_live_msg_iter, stream_iter);
4a39caef 942
14f28187 943 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
4a39caef
FD
944 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
945 /*
946 * We overwrite `live_status` since `curr_msg` is
947 * likely set to a valid message in this function.
948 */
949 live_status = lttng_live_iterator_close_stream(
950 lttng_live_msg_iter, stream_iter, curr_msg);
951 }
7cdc2bab
MD
952 goto end;
953 }
14f28187 954 live_status = lttng_live_iterator_next_handle_one_quiescent_stream(
0f1979c3 955 lttng_live_msg_iter, stream_iter, curr_msg);
14f28187 956 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
5084732e 957 BT_ASSERT(!*curr_msg);
7cdc2bab
MD
958 goto end;
959 }
14f28187 960 if (*curr_msg) {
7cdc2bab
MD
961 goto end;
962 }
14f28187 963 live_status = lttng_live_iterator_next_handle_one_active_data_stream(
0f1979c3 964 lttng_live_msg_iter, stream_iter, curr_msg);
14f28187 965 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
5084732e 966 BT_ASSERT(!*curr_msg);
7cdc2bab
MD
967 }
968
969end:
14f28187 970 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) {
7cdc2bab 971 goto retry;
7cdc2bab 972 }
14f28187
FD
973
974 return live_status;
7cdc2bab
MD
975}
976
977static
14f28187
FD
978enum lttng_live_iterator_status next_stream_iterator_for_trace(
979 struct lttng_live_msg_iter *lttng_live_msg_iter,
980 struct lttng_live_trace *live_trace,
a31f4d8b 981 struct lttng_live_stream_iterator **youngest_trace_stream_iter)
7cdc2bab 982{
a31f4d8b 983 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
0f1979c3
FD
984 bt_logging_level log_level = lttng_live_msg_iter->log_level;
985 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
14f28187 986 enum lttng_live_iterator_status stream_iter_status;;
a31f4d8b 987 int64_t youngest_candidate_msg_ts = INT64_MAX;
14f28187 988 uint64_t stream_iter_idx;
7cdc2bab 989
98b15851
PP
990 BT_ASSERT_DBG(live_trace);
991 BT_ASSERT_DBG(live_trace->stream_iterators);
14f28187
FD
992 /*
993 * Update the current message of every stream iterators of this trace.
994 * The current msg of every stream must have a timestamp equal or
995 * larger than the last message returned by this iterator. We must
996 * ensure monotonicity.
997 */
998 stream_iter_idx = 0;
999 while (stream_iter_idx < live_trace->stream_iterators->len) {
1000 bool stream_iter_is_ended = false;
1001 struct lttng_live_stream_iterator *stream_iter =
1002 g_ptr_array_index(live_trace->stream_iterators,
0f1979c3 1003 stream_iter_idx);
14f28187
FD
1004
1005 /*
1006 * Find if there is are now current message for this stream
1007 * iterator get it.
1008 */
1009 while (!stream_iter->current_msg) {
1010 bt_message *msg = NULL;
1011 int64_t curr_msg_ts_ns = INT64_MAX;
a31f4d8b 1012 stream_iter_status = lttng_live_iterator_next_msg_on_stream(
0f1979c3 1013 lttng_live_msg_iter, stream_iter, &msg);
14f28187 1014
2ece7dd0 1015 BT_COMP_LOGD("live stream iterator returned status :%s",
0f1979c3 1016 print_live_iterator_status(stream_iter_status));
14f28187
FD
1017 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1018 stream_iter_is_ended = true;
1019 break;
1020 }
1021
1022 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1023 goto end;
1024 }
1025
98b15851 1026 BT_ASSERT_DBG(msg);
14f28187
FD
1027
1028 /*
1029 * Get the timestamp in nanoseconds from origin of this
1030 * messsage.
1031 */
1032 live_get_msg_ts_ns(stream_iter, lttng_live_msg_iter,
1033 msg, lttng_live_msg_iter->last_msg_ts_ns,
1034 &curr_msg_ts_ns);
1035
1036 /*
1037 * Check if the message of the current live stream
1038 * iterator occured at the exact same time or after the
1039 * last message returned by this component's message
1040 * iterator. If not, we return an error.
1041 */
1042 if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
1043 stream_iter->current_msg = msg;
1044 stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
1045 } else {
1046 /*
1047 * We received a message in the past. To ensure
1048 * monotonicity, we can't send it forward.
1049 */
1419db2b
FD
1050 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1051 "Message's timestamp is less than "
14f28187
FD
1052 "lttng-live's message iterator's last "
1053 "returned timestamp: "
1054 "lttng-live-msg-iter-addr=%p, ts=%" PRId64 ", "
1055 "last-msg-ts=%" PRId64,
1056 lttng_live_msg_iter, curr_msg_ts_ns,
1057 lttng_live_msg_iter->last_msg_ts_ns);
1058 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1059 goto end;
1060 }
1061 }
1062
98b15851 1063 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
07afb6bc
FD
1064
1065 if (!stream_iter_is_ended) {
a31f4d8b
FD
1066 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1067 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
07afb6bc
FD
1068 /*
1069 * Update the current best candidate message
1070 * for the stream iterator of this live trace
1071 * to be forwarded downstream.
1072 */
a31f4d8b
FD
1073 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1074 youngest_candidate_stream_iter = stream_iter;
1075 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
07afb6bc
FD
1076 /*
1077 * Order the messages in an arbitrary but
1078 * deterministic way.
1079 */
98b15851 1080 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
07afb6bc
FD
1081 int ret = common_muxing_compare_messages(
1082 stream_iter->current_msg,
a31f4d8b 1083 youngest_candidate_stream_iter->current_msg);
07afb6bc
FD
1084 if (ret < 0) {
1085 /*
a31f4d8b 1086 * The `youngest_candidate_stream_iter->current_msg`
07afb6bc
FD
1087 * should go first. Update the next
1088 * iterator and the current timestamp.
1089 */
a31f4d8b
FD
1090 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1091 youngest_candidate_stream_iter = stream_iter;
07afb6bc
FD
1092 } else if (ret == 0) {
1093 /*
1094 * Unable to pick which one should go
1095 * first.
1096 */
1097 BT_COMP_LOGW("Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1098 "stream-iter-addr=%p"
1099 "stream-iter-addr=%p",
1100 stream_iter,
a31f4d8b 1101 youngest_candidate_stream_iter);
07afb6bc
FD
1102 }
1103 }
14f28187 1104
07afb6bc
FD
1105 stream_iter_idx++;
1106 } else {
14f28187 1107 /*
fba9b58e
JG
1108 * The live stream iterator has ended. That
1109 * iterator is removed from the array, but
1110 * there is no need to increment
1111 * stream_iter_idx as
1112 * g_ptr_array_remove_index_fast replaces the
1113 * removed element with the array's last
1114 * element.
14f28187 1115 */
fba9b58e
JG
1116 g_ptr_array_remove_index_fast(
1117 live_trace->stream_iterators,
14f28187 1118 stream_iter_idx);
14f28187
FD
1119 }
1120 }
1121
a31f4d8b
FD
1122 if (youngest_candidate_stream_iter) {
1123 *youngest_trace_stream_iter = youngest_candidate_stream_iter;
14f28187
FD
1124 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1125 } else {
1126 /*
1127 * The only case where we don't have a candidate for this trace
1128 * is if we reached the end of all the iterators.
1129 */
1130 BT_ASSERT(live_trace->stream_iterators->len == 0);
1131 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1132 }
1133
1134end:
1135 return stream_iter_status;
1136}
1137
1138static
1139enum lttng_live_iterator_status next_stream_iterator_for_session(
1140 struct lttng_live_msg_iter *lttng_live_msg_iter,
1141 struct lttng_live_session *session,
a31f4d8b 1142 struct lttng_live_stream_iterator **youngest_session_stream_iter)
14f28187 1143{
07afb6bc
FD
1144 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
1145 bt_logging_level log_level = lttng_live_msg_iter->log_level;
14f28187
FD
1146 enum lttng_live_iterator_status stream_iter_status;
1147 uint64_t trace_idx = 0;
a31f4d8b
FD
1148 int64_t youngest_candidate_msg_ts = INT64_MAX;
1149 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
14f28187
FD
1150
1151 /*
1152 * Make sure we are attached to the session and look for new streams
1153 * and metadata.
1154 */
1155 stream_iter_status = lttng_live_get_session(lttng_live_msg_iter, session);
1156 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
1157 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE &&
1158 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) {
7cdc2bab
MD
1159 goto end;
1160 }
14f28187 1161
98b15851 1162 BT_ASSERT_DBG(session->traces);
14f28187 1163
14f28187
FD
1164 while (trace_idx < session->traces->len) {
1165 bool trace_is_ended = false;
1166 struct lttng_live_stream_iterator *stream_iter;
1167 struct lttng_live_trace *trace =
1168 g_ptr_array_index(session->traces, trace_idx);
1169
1170 stream_iter_status = next_stream_iterator_for_trace(
1171 lttng_live_msg_iter, trace, &stream_iter);
1172 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1173 /*
1174 * All the live stream iterators for this trace are
1175 * ENDed. Remove the trace from this session.
1176 */
1177 trace_is_ended = true;
1178 } else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1179 goto end;
1180 }
1181
1182 if (!trace_is_ended) {
98b15851 1183 BT_ASSERT_DBG(stream_iter);
14f28187 1184
a31f4d8b
FD
1185 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1186 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1187 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1188 youngest_candidate_stream_iter = stream_iter;
1189 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
07afb6bc
FD
1190 /*
1191 * Order the messages in an arbitrary but
1192 * deterministic way.
1193 */
1194 int ret = common_muxing_compare_messages(
1195 stream_iter->current_msg,
a31f4d8b 1196 youngest_candidate_stream_iter->current_msg);
07afb6bc
FD
1197 if (ret < 0) {
1198 /*
a31f4d8b 1199 * The `youngest_candidate_stream_iter->current_msg`
07afb6bc
FD
1200 * should go first. Update the next iterator
1201 * and the current timestamp.
1202 */
a31f4d8b
FD
1203 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1204 youngest_candidate_stream_iter = stream_iter;
07afb6bc
FD
1205 } else if (ret == 0) {
1206 /* Unable to pick which one should go first. */
1207 BT_COMP_LOGW("Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1208 "stream-iter-addr=%p" "stream-iter-addr=%p",
a31f4d8b 1209 stream_iter, youngest_candidate_stream_iter);
07afb6bc 1210 }
14f28187
FD
1211 }
1212 trace_idx++;
1213 } else {
ba90bce7
JG
1214 /*
1215 * trace_idx is not incremented since
1216 * g_ptr_array_remove_index_fast replaces the
1217 * element at trace_idx with the array's last element.
1218 */
1219 g_ptr_array_remove_index_fast(session->traces,
1220 trace_idx);
14f28187
FD
1221 }
1222 }
a31f4d8b
FD
1223 if (youngest_candidate_stream_iter) {
1224 *youngest_session_stream_iter = youngest_candidate_stream_iter;
14f28187 1225 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab 1226 } else {
14f28187
FD
1227 /*
1228 * The only cases where we don't have a candidate for this
1229 * trace is:
1230 * 1. if we reached the end of all the iterators of all the
1231 * traces of this session,
1232 * 2. if we never had live stream iterator in the first place.
1233 *
1234 * In either cases, we return END.
1235 */
1236 BT_ASSERT(session->traces->len == 0);
1237 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
7cdc2bab
MD
1238 }
1239end:
14f28187
FD
1240 return stream_iter_status;
1241}
1242
1243static inline
1244void put_messages(bt_message_array_const msgs, uint64_t count)
1245{
1246 uint64_t i;
1247
1248 for (i = 0; i < count; i++) {
1249 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
7cdc2bab 1250 }
7cdc2bab
MD
1251}
1252
d3eb6e8f 1253BT_HIDDEN
d24d5663 1254bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next(
14f28187
FD
1255 bt_self_message_iterator *self_msg_it,
1256 bt_message_array_const msgs, uint64_t capacity,
1257 uint64_t *count)
d3eb6e8f 1258{
d24d5663 1259 bt_component_class_message_iterator_next_method_status status;
14f28187
FD
1260 struct lttng_live_msg_iter *lttng_live_msg_iter =
1261 bt_self_message_iterator_get_data(self_msg_it);
1262 struct lttng_live_component *lttng_live =
1263 lttng_live_msg_iter->lttng_live_comp;
376fc2bd
FD
1264 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
1265 bt_logging_level log_level = lttng_live_msg_iter->log_level;
14f28187
FD
1266 enum lttng_live_iterator_status stream_iter_status;
1267 uint64_t session_idx;
1268
1269 *count = 0;
1270
98b15851 1271 BT_ASSERT_DBG(lttng_live_msg_iter);
14f28187
FD
1272
1273 /*
1274 * Clear all the invalid message reference that might be left over in
1275 * the output array.
1276 */
1277 memset(msgs, 0, capacity * sizeof(*msgs));
1278
1279 /*
1280 * If no session are exposed on the relay found at the url provided by
1281 * the user, session count will be 0. In this case, we return status
1282 * end to return gracefully.
1283 */
1284 if (lttng_live_msg_iter->sessions->len == 0) {
1285 if (lttng_live->params.sess_not_found_act !=
1286 SESSION_NOT_FOUND_ACTION_CONTINUE) {
d24d5663 1287 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END;
14f28187
FD
1288 goto no_session;
1289 } else {
1290 /*
1291 * The are no more active session for this session
1292 * name. Retry to create a viewer session for the
1293 * requested session name.
1294 */
1295 if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
d24d5663 1296 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
14f28187
FD
1297 goto no_session;
1298 }
1299 }
1300 }
1301
1302 if (lttng_live_msg_iter->active_stream_iter == 0) {
1303 lttng_live_force_new_streams_and_metadata(lttng_live_msg_iter);
1304 }
1305
1306 /*
1307 * Here the muxing of message is done.
1308 *
1309 * We need to iterate over all the streams of all the traces of all the
1310 * viewer sessions in order to get the message with the smallest
1311 * timestamp. In this case, a session is a viewer session and there is
1312 * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
1313 * kernel). Each viewer session can have multiple traces, for example,
1314 * 64bit UST viewer sessions could have multiple per-pid traces.
1315 *
1316 * We iterate over the streams of each traces to update and see what is
1317 * their next message's timestamp. From those timestamps, we select the
1318 * message with the smallest timestamp as the best candidate message
1319 * for that trace and do the same thing across all the sessions.
1320 *
1321 * We then compare the timestamp of best candidate message of all the
1322 * sessions to pick the message with the smallest timestamp and we
1323 * return it.
1324 */
1325 while (*count < capacity) {
a31f4d8b
FD
1326 struct lttng_live_stream_iterator *youngest_stream_iter = NULL,
1327 *candidate_stream_iter = NULL;
1328 int64_t youngest_msg_ts_ns = INT64_MAX;
14f28187 1329
98b15851 1330 BT_ASSERT_DBG(lttng_live_msg_iter->sessions);
14f28187 1331 session_idx = 0;
14f28187
FD
1332 while (session_idx < lttng_live_msg_iter->sessions->len) {
1333 struct lttng_live_session *session =
1334 g_ptr_array_index(lttng_live_msg_iter->sessions,
1335 session_idx);
1336
1337 /* Find the best candidate message to send downstream. */
1338 stream_iter_status = next_stream_iterator_for_session(
1339 lttng_live_msg_iter, session,
1340 &candidate_stream_iter);
1341
1342 /* If we receive an END status, it means that either:
1343 * - Those traces never had active streams (UST with no
1344 * data produced yet),
1345 * - All live stream iterators have ENDed.*/
1346 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1347 if (session->closed && session->traces->len == 0) {
1348 /*
7d632b96
JG
1349 * Remove the session from the list.
1350 * session_idx is not modified since
1351 * g_ptr_array_remove_index_fast
1352 * replaces the the removed element with
1353 * the array's last element.
14f28187
FD
1354 */
1355 g_ptr_array_remove_index_fast(
1356 lttng_live_msg_iter->sessions,
1357 session_idx);
14f28187
FD
1358 } else {
1359 session_idx++;
1360 }
1361 continue;
1362 }
1363
1364 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1365 goto end;
1366 }
1367
a31f4d8b 1368 if (G_UNLIKELY(youngest_stream_iter == NULL) ||
1d314141 1369 candidate_stream_iter->current_msg_ts_ns < youngest_msg_ts_ns) {
a31f4d8b
FD
1370 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1371 youngest_stream_iter = candidate_stream_iter;
1372 } else if (candidate_stream_iter->current_msg_ts_ns == youngest_msg_ts_ns) {
376fc2bd
FD
1373 /*
1374 * The currently selected message to be sent
1375 * downstream next has the exact same timestamp
1376 * that of the current candidate message. We
1377 * must break the tie in a predictable manner.
1378 */
1379 BT_COMP_LOGD_STR("Two of the next message candidates have the same timestamps, pick one deterministically.");
1380 /*
1381 * Order the messages in an arbitrary but
07afb6bc 1382 * deterministic way.
376fc2bd 1383 */
07afb6bc
FD
1384 int ret = common_muxing_compare_messages(
1385 candidate_stream_iter->current_msg,
a31f4d8b 1386 youngest_stream_iter->current_msg);
376fc2bd
FD
1387 if (ret < 0) {
1388 /*
1389 * The `candidate_stream_iter->current_msg`
1390 * should go first. Update the next
1391 * iterator and the current timestamp.
1392 */
a31f4d8b
FD
1393 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1394 youngest_stream_iter = candidate_stream_iter;
376fc2bd
FD
1395 } else if (ret == 0) {
1396 /* Unable to pick which one should go first. */
1397 BT_COMP_LOGW("Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1398 "next-stream-iter-addr=%p" "candidate-stream-iter-addr=%p",
a31f4d8b 1399 youngest_stream_iter, candidate_stream_iter);
376fc2bd 1400 }
14f28187
FD
1401 }
1402
1403 session_idx++;
1404 }
1405
a31f4d8b 1406 if (!youngest_stream_iter) {
14f28187
FD
1407 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1408 goto end;
1409 }
1410
98b15851 1411 BT_ASSERT_DBG(youngest_stream_iter->current_msg);
14f28187 1412 /* Ensure monotonicity. */
98b15851 1413 BT_ASSERT_DBG(lttng_live_msg_iter->last_msg_ts_ns <=
a31f4d8b 1414 youngest_stream_iter->current_msg_ts_ns);
14f28187
FD
1415
1416 /*
1417 * Insert the next message to the message batch. This will set
1418 * stream iterator current messsage to NULL so that next time
1419 * we fetch the next message of that stream iterator
1420 */
a31f4d8b 1421 BT_MESSAGE_MOVE_REF(msgs[*count], youngest_stream_iter->current_msg);
14f28187
FD
1422 (*count)++;
1423
1424 /* Update the last timestamp in nanoseconds sent downstream. */
a31f4d8b
FD
1425 lttng_live_msg_iter->last_msg_ts_ns = youngest_msg_ts_ns;
1426 youngest_stream_iter->current_msg_ts_ns = INT64_MAX;
14f28187
FD
1427
1428 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1429 }
1430end:
1431 switch (stream_iter_status) {
1432 case LTTNG_LIVE_ITERATOR_STATUS_OK:
1433 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
1434 if (*count > 0) {
1435 /*
1436 * We received a again status but we have some messages
1437 * to send downstream. We send them and return OK for
1438 * now. On the next call we return again if there are
1439 * still no new message to send.
1440 */
d24d5663 1441 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
14f28187 1442 } else {
d24d5663 1443 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_AGAIN;
14f28187 1444 }
7cdc2bab 1445 break;
14f28187 1446 case LTTNG_LIVE_ITERATOR_STATUS_END:
d24d5663 1447 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END;
7cdc2bab 1448 break;
14f28187 1449 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
d24d5663 1450 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_MEMORY_ERROR;
14f28187
FD
1451 break;
1452 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
1453 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
1454 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
d24d5663 1455 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
14f28187
FD
1456 /* Put all existing messages on error. */
1457 put_messages(msgs, *count);
7cdc2bab 1458 break;
14f28187
FD
1459 default:
1460 abort();
7cdc2bab 1461 }
14f28187
FD
1462
1463no_session:
1464 return status;
7cdc2bab 1465}
41a2b7ae 1466
7cdc2bab 1467BT_HIDDEN
21a9f056 1468bt_component_class_message_iterator_initialize_method_status lttng_live_msg_iter_init(
14f28187 1469 bt_self_message_iterator *self_msg_it,
8d8b141d 1470 bt_self_message_iterator_configuration *config,
14f28187
FD
1471 bt_self_component_source *self_comp_src,
1472 bt_self_component_port_output *self_port)
7cdc2bab 1473{
21a9f056
FD
1474 bt_component_class_message_iterator_initialize_method_status ret =
1475 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_OK;
14f28187 1476 bt_self_component *self_comp =
d24d5663 1477 bt_self_component_source_as_self_component(self_comp_src);
14f28187
FD
1478 struct lttng_live_component *lttng_live;
1479 struct lttng_live_msg_iter *lttng_live_msg_iter;
c01594de 1480 bt_logging_level log_level;
14f28187
FD
1481
1482 BT_ASSERT(self_msg_it);
1483
1484 lttng_live = bt_self_component_get_data(self_comp);
c01594de 1485 log_level = lttng_live->log_level;
2ece7dd0 1486 self_comp = lttng_live->self_comp;
14f28187
FD
1487
1488 /* There can be only one downstream iterator at the same time. */
1489 BT_ASSERT(!lttng_live->has_msg_iter);
1490 lttng_live->has_msg_iter = true;
1491
1492 lttng_live_msg_iter = g_new0(struct lttng_live_msg_iter, 1);
1493 if (!lttng_live_msg_iter) {
21a9f056 1494 ret = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_ERROR;
14f28187 1495 goto end;
7cdc2bab 1496 }
14f28187 1497
c01594de 1498 lttng_live_msg_iter->log_level = lttng_live->log_level;
2ece7dd0 1499 lttng_live_msg_iter->self_comp = lttng_live->self_comp;
14f28187
FD
1500 lttng_live_msg_iter->lttng_live_comp = lttng_live;
1501 lttng_live_msg_iter->self_msg_iter = self_msg_it;
1502
1503 lttng_live_msg_iter->active_stream_iter = 0;
1504 lttng_live_msg_iter->last_msg_ts_ns = INT64_MIN;
1505 lttng_live_msg_iter->sessions = g_ptr_array_new_with_free_func(
1506 (GDestroyNotify) lttng_live_destroy_session);
1507 BT_ASSERT(lttng_live_msg_iter->sessions);
1508
1509 lttng_live_msg_iter->viewer_connection =
1510 live_viewer_connection_create(lttng_live->params.url->str, false,
1419db2b 1511 lttng_live_msg_iter, self_comp, NULL, log_level);
14f28187
FD
1512 if (!lttng_live_msg_iter->viewer_connection) {
1513 goto error;
1514 }
1515
1516 if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
1517 goto error;
1518 }
1519 if (lttng_live_msg_iter->sessions->len == 0) {
1520 switch (lttng_live->params.sess_not_found_act) {
1521 case SESSION_NOT_FOUND_ACTION_CONTINUE:
2ece7dd0 1522 BT_COMP_LOGI("Unable to connect to the requested live viewer "
14f28187
FD
1523 "session. Keep trying to connect because of "
1524 "%s=\"%s\" component parameter: url=\"%s\"",
1525 SESS_NOT_FOUND_ACTION_PARAM,
1526 SESS_NOT_FOUND_ACTION_CONTINUE_STR,
1527 lttng_live->params.url->str);
1528 break;
1529 case SESSION_NOT_FOUND_ACTION_FAIL:
1419db2b 1530 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Unable to connect to the requested live viewer "
14f28187
FD
1531 "session. Fail the message iterator"
1532 "initialization because of %s=\"%s\" "
1533 "component parameter: url =\"%s\"",
1534 SESS_NOT_FOUND_ACTION_PARAM,
1535 SESS_NOT_FOUND_ACTION_FAIL_STR,
1536 lttng_live->params.url->str);
7cdc2bab 1537 goto error;
14f28187 1538 case SESSION_NOT_FOUND_ACTION_END:
2ece7dd0 1539 BT_COMP_LOGI("Unable to connect to the requested live viewer "
14f28187
FD
1540 "session. End gracefully at the first _next() "
1541 "call because of %s=\"%s\" component parameter: "
1542 "url=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM,
1543 SESS_NOT_FOUND_ACTION_END_STR,
1544 lttng_live->params.url->str);
1545 break;
bce81ff2 1546 default:
80aff5ef 1547 abort();
7cdc2bab 1548 }
7cdc2bab
MD
1549 }
1550
14f28187
FD
1551 bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter);
1552
1553 goto end;
1554error:
21a9f056 1555 ret = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_ERROR;
14f28187 1556 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
7cdc2bab 1557end:
41a2b7ae 1558 return ret;
7cdc2bab
MD
1559}
1560
80aff5ef
SM
1561static struct bt_param_validation_map_value_entry_descr list_sessions_params[] = {
1562 { URL_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY, { .type = BT_VALUE_TYPE_STRING } },
1563 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END
1564};
1565
7cdc2bab 1566static
d24d5663
PP
1567bt_component_class_query_method_status lttng_live_query_list_sessions(
1568 const bt_value *params, const bt_value **result,
80aff5ef 1569 bt_self_component_class *self_comp_class,
d24d5663 1570 bt_logging_level log_level)
7cdc2bab 1571{
80aff5ef 1572 bt_component_class_query_method_status status;
14f28187 1573 const bt_value *url_value = NULL;
7cdc2bab 1574 const char *url;
14f28187 1575 struct live_viewer_connection *viewer_connection = NULL;
80aff5ef
SM
1576 enum bt_param_validation_status validation_status;
1577 gchar *validate_error = NULL;
7cdc2bab 1578
80aff5ef
SM
1579 validation_status = bt_param_validation_validate(params,
1580 list_sessions_params, &validate_error);
1581 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
1582 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
7cdc2bab 1583 goto error;
80aff5ef 1584 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
a635e507 1585 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1419db2b
FD
1586 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "%s",
1587 validate_error);
7cdc2bab
MD
1588 goto error;
1589 }
1590
80aff5ef 1591 url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
14f28187
FD
1592 url = bt_value_string_get(url_value);
1593
550004b4 1594 viewer_connection = live_viewer_connection_create(url, true, NULL,
1419db2b 1595 NULL, self_comp_class, log_level);
7cdc2bab 1596 if (!viewer_connection) {
1419db2b
FD
1597 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
1598 "Failed to create viewer connection");
80aff5ef 1599 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
7cdc2bab
MD
1600 goto error;
1601 }
1602
14f28187 1603 status = live_viewer_connection_list_sessions(viewer_connection,
80aff5ef 1604 result);
d24d5663 1605 if (status != BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK) {
1419db2b
FD
1606 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
1607 "Failed to list viewer sessions");
c7eee084
PP
1608 goto error;
1609 }
1610
7cdc2bab 1611 goto end;
c7eee084 1612
7cdc2bab 1613error:
14f28187 1614 BT_VALUE_PUT_REF_AND_RESET(*result);
c7eee084 1615
14f28187 1616 if (status >= 0) {
d24d5663 1617 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
c7eee084
PP
1618 }
1619
7cdc2bab
MD
1620end:
1621 if (viewer_connection) {
14f28187 1622 live_viewer_connection_destroy(viewer_connection);
7cdc2bab 1623 }
80aff5ef
SM
1624
1625 g_free(validate_error);
1626
14f28187 1627 return status;
7cdc2bab
MD
1628}
1629
312df793
PP
1630static
1631bt_component_class_query_method_status lttng_live_query_support_info(
1632 const bt_value *params, const bt_value **result,
1419db2b 1633 bt_self_component_class *self_comp_class,
312df793
PP
1634 bt_logging_level log_level)
1635{
1636 bt_component_class_query_method_status status =
1637 BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
1638 const bt_value *input_type_value;
1639 const bt_value *input_value;
1640 double weight = 0;
1641 struct bt_common_lttng_live_url_parts parts = { 0 };
1642
1643 /* Used by the logging macros */
1644 __attribute__((unused)) bt_self_component *self_comp = NULL;
1645
1646 *result = NULL;
1647 input_type_value = bt_value_map_borrow_entry_value_const(params,
1648 "type");
1649 if (!input_type_value) {
1419db2b
FD
1650 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
1651 "Missing expected `type` parameter.");
312df793
PP
1652 goto error;
1653 }
1654
1655 if (!bt_value_is_string(input_type_value)) {
1419db2b
FD
1656 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
1657 "`type` parameter is not a string value.");
312df793
PP
1658 goto error;
1659 }
1660
1661 if (strcmp(bt_value_string_get(input_type_value), "string") != 0) {
1662 /* We don't handle file system paths */
1663 goto create_result;
1664 }
1665
1666 input_value = bt_value_map_borrow_entry_value_const(params, "input");
1667 if (!input_value) {
1419db2b
FD
1668 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
1669 "Missing expected `input` parameter.");
312df793
PP
1670 goto error;
1671 }
1672
1673 if (!bt_value_is_string(input_value)) {
1419db2b
FD
1674 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
1675 "`input` parameter is not a string value.");
312df793
PP
1676 goto error;
1677 }
1678
1679 parts = bt_common_parse_lttng_live_url(bt_value_string_get(input_value),
1680 NULL, 0);
1681 if (parts.session_name) {
1682 /*
1683 * Looks pretty much like an LTTng live URL: we got the
1684 * session name part, which forms a complete URL.
1685 */
1686 weight = .75;
1687 }
1688
1689create_result:
1690 *result = bt_value_real_create_init(weight);
1691 if (!*result) {
1692 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1693 goto error;
1694 }
1695
1696 goto end;
1697
1698error:
1699 if (status >= 0) {
1700 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1701 }
1702
1703 BT_ASSERT(!*result);
1704
1705end:
1706 bt_common_destroy_lttng_live_url_parts(&parts);
1707 return status;
1708}
1709
7cdc2bab 1710BT_HIDDEN
d24d5663
PP
1711bt_component_class_query_method_status lttng_live_query(
1712 bt_self_component_class_source *comp_class,
3c729b9a 1713 bt_private_query_executor *priv_query_exec,
14f28187 1714 const char *object, const bt_value *params,
7c14d641 1715 __attribute__((unused)) void *method_data,
3c729b9a 1716 const bt_value **result)
7cdc2bab 1717{
d24d5663
PP
1718 bt_component_class_query_method_status status =
1719 BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
2ece7dd0 1720 bt_self_component *self_comp = NULL;
80aff5ef
SM
1721 bt_self_component_class *self_comp_class =
1722 bt_self_component_class_source_as_self_component_class(comp_class);
3c729b9a
PP
1723 bt_logging_level log_level = bt_query_executor_get_logging_level(
1724 bt_private_query_executor_as_query_executor_const(
1725 priv_query_exec));
c7eee084 1726
7cdc2bab 1727 if (strcmp(object, "sessions") == 0) {
c01594de 1728 status = lttng_live_query_list_sessions(params, result,
80aff5ef 1729 self_comp_class, log_level);
312df793
PP
1730 } else if (strcmp(object, "babeltrace.support-info") == 0) {
1731 status = lttng_live_query_support_info(params, result,
1419db2b 1732 self_comp_class, log_level);
14f28187 1733 } else {
32a09ecd 1734 BT_COMP_LOGI("Unknown query object `%s`", object);
76b6c2f7 1735 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT;
14f28187 1736 goto end;
7cdc2bab 1737 }
14f28187
FD
1738
1739end:
1740 return status;
7cdc2bab
MD
1741}
1742
1743static
1744void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
1745{
19bdff20
FD
1746 if (!lttng_live) {
1747 return;
1748 }
14f28187
FD
1749 if (lttng_live->params.url) {
1750 g_string_free(lttng_live->params.url, TRUE);
7cdc2bab
MD
1751 }
1752 g_free(lttng_live);
1753}
1754
1755BT_HIDDEN
14f28187 1756void lttng_live_component_finalize(bt_self_component_source *component)
7cdc2bab 1757{
14f28187 1758 void *data = bt_self_component_get_data(
0f1979c3 1759 bt_self_component_source_as_self_component(component));
7cdc2bab
MD
1760
1761 if (!data) {
1762 return;
1763 }
1764 lttng_live_component_destroy_data(data);
1765}
1766
1767static
14f28187
FD
1768enum session_not_found_action parse_session_not_found_action_param(
1769 const bt_value *no_session_param)
1770{
1771 enum session_not_found_action action;
80aff5ef
SM
1772 const char *no_session_act_str = bt_value_string_get(no_session_param);
1773
14f28187
FD
1774 if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_CONTINUE_STR) == 0) {
1775 action = SESSION_NOT_FOUND_ACTION_CONTINUE;
1776 } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_FAIL_STR) == 0) {
1777 action = SESSION_NOT_FOUND_ACTION_FAIL;
14f28187 1778 } else {
80aff5ef
SM
1779 BT_ASSERT(strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0);
1780 action = SESSION_NOT_FOUND_ACTION_END;
14f28187
FD
1781 }
1782
1783 return action;
1784}
1785
80aff5ef
SM
1786static struct bt_param_validation_value_descr inputs_elem_descr = {
1787 .type = BT_VALUE_TYPE_STRING,
1788};
1789
1790static const char *sess_not_found_action_choices[] = {
1791 SESS_NOT_FOUND_ACTION_CONTINUE_STR,
1792 SESS_NOT_FOUND_ACTION_FAIL_STR,
1793 SESS_NOT_FOUND_ACTION_END_STR,
1794};
1795
1796static struct bt_param_validation_map_value_entry_descr params_descr[] = {
1797 { INPUTS_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY, { BT_VALUE_TYPE_ARRAY, .array = {
1798 .min_length = 1,
1799 .max_length = 1,
1800 .element_type = &inputs_elem_descr,
1801 } } },
1802 { SESS_NOT_FOUND_ACTION_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { BT_VALUE_TYPE_STRING, .string = {
1803 .choices = sess_not_found_action_choices,
1804 } } },
1805 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END
1806};
1807
70384700 1808static
80aff5ef
SM
1809bt_component_class_initialize_method_status lttng_live_component_create(
1810 const bt_value *params,
1811 bt_logging_level log_level,
1812 bt_self_component *self_comp,
1813 struct lttng_live_component **component)
7cdc2bab 1814{
80aff5ef 1815 struct lttng_live_component *lttng_live = NULL;
312df793
PP
1816 const bt_value *inputs_value;
1817 const bt_value *url_value;
1818 const bt_value *value;
7cdc2bab 1819 const char *url;
80aff5ef
SM
1820 enum bt_param_validation_status validation_status;
1821 gchar *validation_error = NULL;
1822 bt_component_class_initialize_method_status status;
1823
1824 validation_status = bt_param_validation_validate(params, params_descr,
1825 &validation_error);
1826 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
1827 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1828 goto error;
1829 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
1830 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "%s", validation_error);
1831 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1832 goto error;
1833 }
7cdc2bab
MD
1834
1835 lttng_live = g_new0(struct lttng_live_component, 1);
1836 if (!lttng_live) {
80aff5ef 1837 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
7cdc2bab
MD
1838 goto end;
1839 }
c01594de 1840 lttng_live->log_level = log_level;
2ece7dd0 1841 lttng_live->self_comp = self_comp;
7cdc2bab 1842 lttng_live->max_query_size = MAX_QUERY_SIZE;
14f28187
FD
1843 lttng_live->has_msg_iter = false;
1844
80aff5ef
SM
1845 inputs_value =
1846 bt_value_map_borrow_entry_value_const(params, INPUTS_PARAM);
1847 url_value =
1848 bt_value_array_borrow_element_by_index_const(inputs_value, 0);
312df793 1849 url = bt_value_string_get(url_value);
80aff5ef 1850
14f28187
FD
1851 lttng_live->params.url = g_string_new(url);
1852 if (!lttng_live->params.url) {
80aff5ef 1853 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
7cdc2bab
MD
1854 goto error;
1855 }
14f28187
FD
1856
1857 value = bt_value_map_borrow_entry_value_const(params,
1858 SESS_NOT_FOUND_ACTION_PARAM);
312df793 1859 if (value) {
14f28187
FD
1860 lttng_live->params.sess_not_found_act =
1861 parse_session_not_found_action_param(value);
14f28187 1862 } else {
312df793
PP
1863 BT_COMP_LOGI("Optional `%s` parameter is missing: "
1864 "defaulting to `%s`.",
14f28187
FD
1865 SESS_NOT_FOUND_ACTION_PARAM,
1866 SESS_NOT_FOUND_ACTION_CONTINUE_STR);
1867 lttng_live->params.sess_not_found_act =
1868 SESSION_NOT_FOUND_ACTION_CONTINUE;
7cdc2bab 1869 }
4c66436f 1870
80aff5ef 1871 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
7cdc2bab
MD
1872 goto end;
1873
1874error:
1875 lttng_live_component_destroy_data(lttng_live);
1876 lttng_live = NULL;
1877end:
80aff5ef
SM
1878 g_free(validation_error);
1879
1880 *component = lttng_live;
1881 return status;
d3e4dcd8
PP
1882}
1883
f3bc2010 1884BT_HIDDEN
21a9f056 1885bt_component_class_initialize_method_status lttng_live_component_init(
c01594de 1886 bt_self_component_source *self_comp_src,
59225a3e
SM
1887 bt_self_component_source_configuration *config,
1888 const bt_value *params,
1889 __attribute__((unused)) void *init_method_data)
f3bc2010 1890{
7cdc2bab 1891 struct lttng_live_component *lttng_live;
80aff5ef 1892 bt_component_class_initialize_method_status ret;
c01594de
PP
1893 bt_self_component *self_comp =
1894 bt_self_component_source_as_self_component(self_comp_src);
1895 bt_logging_level log_level = bt_component_get_logging_level(
1896 bt_self_component_as_component(self_comp));
d24d5663 1897 bt_self_component_add_port_status add_port_status;
7cdc2bab 1898
80aff5ef
SM
1899 ret = lttng_live_component_create(params, log_level, self_comp, &lttng_live);
1900 if (ret != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
19bdff20 1901 goto error;
7cdc2bab
MD
1902 }
1903
d24d5663
PP
1904 add_port_status = bt_self_component_source_add_output_port(
1905 self_comp_src, "out", NULL, NULL);
80aff5ef
SM
1906 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
1907 ret = (int) add_port_status;
1908 goto end;
147337a3 1909 }
7cdc2bab 1910
c01594de 1911 bt_self_component_set_data(self_comp, lttng_live);
19bdff20 1912 goto end;
7cdc2bab 1913
19bdff20
FD
1914error:
1915 lttng_live_component_destroy_data(lttng_live);
1916 lttng_live = NULL;
7cdc2bab
MD
1917end:
1918 return ret;
d85ef162 1919}
This page took 0.164595 seconds and 4 git commands to generate.