Cleanup: src.ctf.lttng-live: remove usage of `bt_object`
[babeltrace.git] / src / plugins / ctf / lttng-live / lttng-live.c
1 /*
2 * lttng-live.c
3 *
4 * Babeltrace CTF LTTng-live Client Component
5 *
6 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
7 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
8 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
9 *
10 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
11 *
12 * Permission is hereby granted, free of charge, to any person obtaining a copy
13 * of this software and associated documentation files (the "Software"), to deal
14 * in the Software without restriction, including without limitation the rights
15 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
16 * copies of the Software, and to permit persons to whom the Software is
17 * furnished to do so, subject to the following conditions:
18 *
19 * The above copyright notice and this permission notice shall be included in
20 * all copies or substantial portions of the Software.
21 *
22 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
23 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
24 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
25 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
26 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
27 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
28 * SOFTWARE.
29 */
30
31 #define BT_COMP_LOG_SELF_COMP self_comp
32 #define BT_LOG_OUTPUT_LEVEL log_level
33 #define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE"
34 #include "logging/comp-logging.h"
35
36 #include <glib.h>
37 #include <inttypes.h>
38 #include <stdbool.h>
39 #include <unistd.h>
40
41 #include "common/assert.h"
42 #include <babeltrace2/babeltrace.h>
43 #include "compat/compiler.h"
44 #include <babeltrace2/types.h>
45
46 #include "plugins/common/muxing/muxing.h"
47 #include "plugins/common/param-validation/param-validation.h"
48
49 #include "data-stream.h"
50 #include "metadata.h"
51 #include "lttng-live.h"
52
53 #define MAX_QUERY_SIZE (256*1024)
54 #define URL_PARAM "url"
55 #define INPUTS_PARAM "inputs"
56 #define SESS_NOT_FOUND_ACTION_PARAM "session-not-found-action"
57 #define SESS_NOT_FOUND_ACTION_CONTINUE_STR "continue"
58 #define SESS_NOT_FOUND_ACTION_FAIL_STR "fail"
59 #define SESS_NOT_FOUND_ACTION_END_STR "end"
60
61 #define print_dbg(fmt, ...) BT_COMP_LOGD(fmt, ## __VA_ARGS__)
62
63 static
64 const char *print_live_iterator_status(enum lttng_live_iterator_status status)
65 {
66 switch (status) {
67 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
68 return "LTTNG_LIVE_ITERATOR_STATUS_CONTINUE";
69 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
70 return "LTTNG_LIVE_ITERATOR_STATUS_AGAIN";
71 case LTTNG_LIVE_ITERATOR_STATUS_END:
72 return "LTTNG_LIVE_ITERATOR_STATUS_END";
73 case LTTNG_LIVE_ITERATOR_STATUS_OK:
74 return "LTTNG_LIVE_ITERATOR_STATUS_OK";
75 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
76 return "LTTNG_LIVE_ITERATOR_STATUS_INVAL";
77 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
78 return "LTTNG_LIVE_ITERATOR_STATUS_ERROR";
79 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
80 return "LTTNG_LIVE_ITERATOR_STATUS_NOMEM";
81 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
82 return "LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED";
83 default:
84 abort();
85 }
86 }
87
88 static
89 const char *print_state(struct lttng_live_stream_iterator *s)
90 {
91 switch (s->state) {
92 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
93 return "ACTIVE_NO_DATA";
94 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
95 return "QUIESCENT_NO_DATA";
96 case LTTNG_LIVE_STREAM_QUIESCENT:
97 return "QUIESCENT";
98 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
99 return "ACTIVE_DATA";
100 case LTTNG_LIVE_STREAM_EOF:
101 return "EOF";
102 default:
103 return "ERROR";
104 }
105 }
106
107 #define print_stream_state(live_stream_iter) \
108 do { \
109 BT_COMP_LOGD("stream state %s last_inact_ts %" PRId64 \
110 ", curr_inact_ts %" PRId64, \
111 print_state(live_stream_iter), \
112 live_stream_iter->last_inactivity_ts, \
113 live_stream_iter->current_inactivity_ts); \
114 } while (0);
115
116 BT_HIDDEN
117 bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter)
118 {
119 bool ret;
120
121 if (!msg_iter) {
122 ret = false;
123 goto end;
124 }
125
126 ret = bt_self_message_iterator_is_interrupted(
127 msg_iter->self_msg_iter);
128
129 end:
130 return ret;
131 }
132
133 static
134 struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session,
135 uint64_t trace_id)
136 {
137 uint64_t trace_idx;
138 struct lttng_live_trace *ret_trace = NULL;
139
140 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
141 struct lttng_live_trace *trace =
142 g_ptr_array_index(session->traces, trace_idx);
143 if (trace->id == trace_id) {
144 ret_trace = trace;
145 goto end;
146 }
147 }
148
149 end:
150 return ret_trace;
151 }
152
153 static
154 void lttng_live_destroy_trace(struct lttng_live_trace *trace)
155 {
156 bt_logging_level log_level = trace->log_level;
157 bt_self_component *self_comp = trace->self_comp;
158
159 BT_COMP_LOGD("Destroy lttng_live_trace");
160
161 BT_ASSERT(trace->stream_iterators);
162 g_ptr_array_free(trace->stream_iterators, TRUE);
163
164 BT_TRACE_PUT_REF_AND_RESET(trace->trace);
165 BT_TRACE_CLASS_PUT_REF_AND_RESET(trace->trace_class);
166
167 lttng_live_metadata_fini(trace);
168 g_free(trace);
169 }
170
171 static
172 struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
173 uint64_t trace_id)
174 {
175 struct lttng_live_trace *trace = NULL;
176 bt_logging_level log_level = session->log_level;
177 bt_self_component *self_comp = session->self_comp;
178
179 trace = g_new0(struct lttng_live_trace, 1);
180 if (!trace) {
181 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
182 "Failed to allocate live trace");
183 goto error;
184 }
185 trace->log_level = session->log_level;
186 trace->self_comp = session->self_comp;
187 trace->session = session;
188 trace->id = trace_id;
189 trace->trace_class = NULL;
190 trace->trace = NULL;
191 trace->stream_iterators = g_ptr_array_new_with_free_func(
192 (GDestroyNotify) lttng_live_stream_iterator_destroy);
193 BT_ASSERT(trace->stream_iterators);
194 trace->new_metadata_needed = true;
195 g_ptr_array_add(session->traces, trace);
196
197 BT_COMP_LOGI("Create trace");
198 goto end;
199 error:
200 g_free(trace);
201 trace = NULL;
202 end:
203 return trace;
204 }
205
206 BT_HIDDEN
207 struct lttng_live_trace *lttng_live_borrow_trace(
208 struct lttng_live_session *session, uint64_t trace_id)
209 {
210 struct lttng_live_trace *trace;
211
212 trace = lttng_live_find_trace(session, trace_id);
213 if (trace) {
214 goto end;
215 }
216
217 /* The session is the owner of the newly created trace. */
218 trace = lttng_live_create_trace(session, trace_id);
219
220 end:
221 return trace;
222 }
223
224 BT_HIDDEN
225 int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
226 uint64_t session_id, const char *hostname,
227 const char *session_name)
228 {
229 int ret = 0;
230 struct lttng_live_session *session;
231 bt_logging_level log_level = lttng_live_msg_iter->log_level;
232 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
233
234 session = g_new0(struct lttng_live_session, 1);
235 if (!session) {
236 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
237 "Failed to allocate live session");
238 goto error;
239 }
240
241 session->log_level = lttng_live_msg_iter->log_level;
242 session->self_comp = lttng_live_msg_iter->self_comp;
243 session->id = session_id;
244 session->traces = g_ptr_array_new_with_free_func(
245 (GDestroyNotify) lttng_live_destroy_trace);
246 BT_ASSERT(session->traces);
247 session->lttng_live_msg_iter = lttng_live_msg_iter;
248 session->new_streams_needed = true;
249 session->hostname = g_string_new(hostname);
250 BT_ASSERT(session->hostname);
251
252 session->session_name = g_string_new(session_name);
253 BT_ASSERT(session->session_name);
254
255 BT_COMP_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s",
256 session->id, hostname, session_name);
257 g_ptr_array_add(lttng_live_msg_iter->sessions, session);
258 goto end;
259 error:
260 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error adding session");
261 g_free(session);
262 ret = -1;
263 end:
264 return ret;
265 }
266
267 static
268 void lttng_live_destroy_session(struct lttng_live_session *session)
269 {
270 bt_logging_level log_level;
271 bt_self_component *self_comp;
272
273 if (!session) {
274 goto end;
275 }
276
277 log_level = session->log_level;
278 self_comp = session->self_comp;
279 BT_COMP_LOGD("Destroy lttng live session");
280 if (session->id != -1ULL) {
281 if (lttng_live_detach_session(session)) {
282 if (!lttng_live_graph_is_canceled(
283 session->lttng_live_msg_iter)) {
284 /* Old relayd cannot detach sessions. */
285 BT_COMP_LOGD("Unable to detach lttng live session %" PRIu64,
286 session->id);
287 }
288 }
289 session->id = -1ULL;
290 }
291
292 if (session->traces) {
293 g_ptr_array_free(session->traces, TRUE);
294 }
295
296 if (session->hostname) {
297 g_string_free(session->hostname, TRUE);
298 }
299 if (session->session_name) {
300 g_string_free(session->session_name, TRUE);
301 }
302 g_free(session);
303
304 end:
305 return;
306 }
307
308 static
309 void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter)
310 {
311 if (!lttng_live_msg_iter) {
312 goto end;
313 }
314
315 if (lttng_live_msg_iter->sessions) {
316 g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE);
317 }
318
319 if (lttng_live_msg_iter->viewer_connection) {
320 live_viewer_connection_destroy(lttng_live_msg_iter->viewer_connection);
321 }
322 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp);
323 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter);
324
325 /* All stream iterators must be destroyed at this point. */
326 BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0);
327 lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false;
328
329 g_free(lttng_live_msg_iter);
330
331 end:
332 return;
333 }
334
335 BT_HIDDEN
336 void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
337 {
338 struct lttng_live_msg_iter *lttng_live_msg_iter;
339
340 BT_ASSERT(self_msg_iter);
341
342 lttng_live_msg_iter = bt_self_message_iterator_get_data(self_msg_iter);
343 BT_ASSERT(lttng_live_msg_iter);
344 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
345 }
346
347 static
348 enum lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
349 struct lttng_live_stream_iterator *lttng_live_stream)
350 {
351 bt_logging_level log_level = lttng_live_stream->log_level;
352 bt_self_component *self_comp = lttng_live_stream->self_comp;
353
354 switch (lttng_live_stream->state) {
355 case LTTNG_LIVE_STREAM_QUIESCENT:
356 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
357 break;
358 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
359 /* Invalid state. */
360 BT_COMP_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
361 abort();
362 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
363 /* Invalid state. */
364 BT_COMP_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
365 abort();
366 case LTTNG_LIVE_STREAM_EOF:
367 break;
368 }
369 return LTTNG_LIVE_ITERATOR_STATUS_OK;
370 }
371
372 /*
373 * For active no data stream, fetch next data. It can be either:
374 * - quiescent: need to put it in the prio heap at quiescent end
375 * timestamp,
376 * - have data: need to wire up first event into the prio heap,
377 * - have no data on this stream at this point: need to retry (AGAIN) or
378 * return EOF.
379 */
380 static
381 enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
382 struct lttng_live_msg_iter *lttng_live_msg_iter,
383 struct lttng_live_stream_iterator *lttng_live_stream)
384 {
385 bt_logging_level log_level = lttng_live_msg_iter->log_level;
386 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
387 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
388 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
389 struct packet_index index;
390
391 if (lttng_live_stream->trace->new_metadata_needed) {
392 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
393 goto end;
394 }
395 if (lttng_live_stream->trace->session->new_streams_needed) {
396 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
397 goto end;
398 }
399 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA &&
400 lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
401 goto end;
402 }
403 ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream,
404 &index);
405 if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
406 goto end;
407 }
408 BT_ASSERT_DBG(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
409 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
410 uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts,
411 curr_inact_ts = lttng_live_stream->current_inactivity_ts;
412
413 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA &&
414 last_inact_ts == curr_inact_ts) {
415 ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
416 print_stream_state(lttng_live_stream);
417 } else {
418 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
419 }
420 goto end;
421 }
422 lttng_live_stream->base_offset = index.offset;
423 lttng_live_stream->offset = index.offset;
424 lttng_live_stream->len = index.packet_size / CHAR_BIT;
425 end:
426 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
427 ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream);
428 }
429 return ret;
430 }
431
432 /*
433 * Creation of the message requires the ctf trace class to be created
434 * beforehand, but the live protocol gives us all streams (including metadata)
435 * at once. So we split it in three steps: getting streams, getting metadata
436 * (which creates the ctf trace class), and then creating the per-stream
437 * messages.
438 */
439 static
440 enum lttng_live_iterator_status lttng_live_get_session(
441 struct lttng_live_msg_iter *lttng_live_msg_iter,
442 struct lttng_live_session *session)
443 {
444 bt_logging_level log_level = lttng_live_msg_iter->log_level;
445 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
446 enum lttng_live_iterator_status status;
447 uint64_t trace_idx;
448
449 if (!session->attached) {
450 enum lttng_live_attach_session_status attach_status =
451 lttng_live_attach_session(session);
452 if (attach_status != LTTNG_LIVE_ATTACH_SESSION_STATUS_OK) {
453 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
454 /*
455 * Clear any causes appended in
456 * `lttng_live_attach_session()` as we want to
457 * return gracefully since the graph was
458 * cancelled.
459 */
460 bt_current_thread_clear_error();
461 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
462 } else {
463 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
464 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
465 "Error attaching to LTTng live session");
466 }
467 goto end;
468 }
469 }
470
471 status = lttng_live_get_new_streams(session);
472 if (status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
473 status != LTTNG_LIVE_ITERATOR_STATUS_END) {
474 goto end;
475 }
476 trace_idx = 0;
477 while (trace_idx < session->traces->len) {
478 struct lttng_live_trace *trace =
479 g_ptr_array_index(session->traces, trace_idx);
480
481 status = lttng_live_metadata_update(trace);
482 switch (status) {
483 case LTTNG_LIVE_ITERATOR_STATUS_OK:
484 trace_idx++;
485 break;
486 case LTTNG_LIVE_ITERATOR_STATUS_END:
487 /*
488 * The trace has ended. Remove it of the array an
489 * continue the iteration.
490 * We can remove the trace safely when using the
491 * g_ptr_array_remove_index_fast because it replaces
492 * the element at trace_idx with the array's last
493 * element. trace_idx is not incremented because of
494 * that.
495 */
496 (void) g_ptr_array_remove_index_fast(session->traces,
497 trace_idx);
498 break;
499 default:
500 goto end;
501 }
502 }
503 status = lttng_live_lazy_msg_init(session);
504
505 end:
506 return status;
507 }
508
509 BT_HIDDEN
510 void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter)
511 {
512 uint64_t session_idx;
513
514 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
515 session_idx++) {
516 struct lttng_live_session *session =
517 g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
518 session->new_streams_needed = true;
519 }
520 }
521
522 static
523 void lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
524 {
525 uint64_t session_idx, trace_idx;
526
527 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
528 session_idx++) {
529 struct lttng_live_session *session =
530 g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
531 session->new_streams_needed = true;
532 for (trace_idx = 0; trace_idx < session->traces->len;
533 trace_idx++) {
534 struct lttng_live_trace *trace =
535 g_ptr_array_index(session->traces, trace_idx);
536 trace->new_metadata_needed = true;
537 }
538 }
539 }
540
541 static
542 enum lttng_live_iterator_status
543 lttng_live_iterator_handle_new_streams_and_metadata(
544 struct lttng_live_msg_iter *lttng_live_msg_iter)
545 {
546 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
547 bt_logging_level log_level = lttng_live_msg_iter->log_level;
548 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
549 uint64_t session_idx = 0, nr_sessions_opened = 0;
550 struct lttng_live_session *session;
551 enum session_not_found_action sess_not_found_act =
552 lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
553
554 /*
555 * In a remotely distant future, we could add a "new
556 * session" flag to the protocol, which would tell us that we
557 * need to query for new sessions even though we have sessions
558 * currently ongoing.
559 */
560 if (lttng_live_msg_iter->sessions->len == 0) {
561 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
562 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
563 goto end;
564 } else {
565 /*
566 * Retry to create a viewer session for the requested
567 * session name.
568 */
569 if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
570 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
571 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
572 "Error creating LTTng live viewer session");
573 goto end;
574 }
575 }
576 }
577
578 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
579 session_idx++) {
580 session = g_ptr_array_index(lttng_live_msg_iter->sessions,
581 session_idx);
582 ret = lttng_live_get_session(lttng_live_msg_iter, session);
583 switch (ret) {
584 case LTTNG_LIVE_ITERATOR_STATUS_OK:
585 break;
586 case LTTNG_LIVE_ITERATOR_STATUS_END:
587 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
588 break;
589 default:
590 goto end;
591 }
592 if (!session->closed) {
593 nr_sessions_opened++;
594 }
595 }
596 end:
597 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK &&
598 sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE &&
599 nr_sessions_opened == 0) {
600 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
601 }
602 return ret;
603 }
604
605 static
606 enum lttng_live_iterator_status emit_inactivity_message(
607 struct lttng_live_msg_iter *lttng_live_msg_iter,
608 struct lttng_live_stream_iterator *stream_iter,
609 bt_message **message, uint64_t timestamp)
610 {
611 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
612 bt_logging_level log_level = lttng_live_msg_iter->log_level;
613 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
614 bt_message *msg = NULL;
615
616 BT_ASSERT(stream_iter->trace->clock_class);
617
618 msg = bt_message_message_iterator_inactivity_create(
619 lttng_live_msg_iter->self_msg_iter,
620 stream_iter->trace->clock_class, timestamp);
621 if (!msg) {
622 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
623 "Error emitting message iterator inactivity message");
624 goto error;
625 }
626
627 *message = msg;
628 end:
629 return ret;
630
631 error:
632 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
633 bt_message_put_ref(msg);
634 goto end;
635 }
636
637 static
638 enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
639 struct lttng_live_msg_iter *lttng_live_msg_iter,
640 struct lttng_live_stream_iterator *lttng_live_stream,
641 bt_message **message)
642 {
643 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
644
645 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
646 return LTTNG_LIVE_ITERATOR_STATUS_OK;
647 }
648
649 if (lttng_live_stream->current_inactivity_ts ==
650 lttng_live_stream->last_inactivity_ts) {
651 lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
652 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
653 goto end;
654 }
655
656 ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream,
657 message, lttng_live_stream->current_inactivity_ts);
658
659 lttng_live_stream->last_inactivity_ts =
660 lttng_live_stream->current_inactivity_ts;
661 end:
662 return ret;
663 }
664
665 static
666 int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter,
667 struct lttng_live_msg_iter *lttng_live_msg_iter,
668 const bt_message *msg, int64_t last_msg_ts_ns,
669 int64_t *ts_ns)
670 {
671 const bt_clock_class *clock_class = NULL;
672 const bt_clock_snapshot *clock_snapshot = NULL;
673 int ret = 0;
674 bt_logging_level log_level = lttng_live_msg_iter->log_level;
675 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
676
677 BT_ASSERT_DBG(msg);
678 BT_ASSERT_DBG(ts_ns);
679
680 BT_COMP_LOGD("Getting message's timestamp: iter-data-addr=%p, msg-addr=%p, "
681 "last-msg-ts=%" PRId64, lttng_live_msg_iter, msg,
682 last_msg_ts_ns);
683
684 switch (bt_message_get_type(msg)) {
685 case BT_MESSAGE_TYPE_EVENT:
686 clock_class = bt_message_event_borrow_stream_class_default_clock_class_const(
687 msg);
688 BT_ASSERT_DBG(clock_class);
689
690 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(
691 msg);
692 break;
693 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
694 clock_class = bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
695 msg);
696 BT_ASSERT(clock_class);
697
698 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
699 msg);
700 break;
701 case BT_MESSAGE_TYPE_PACKET_END:
702 clock_class = bt_message_packet_end_borrow_stream_class_default_clock_class_const(
703 msg);
704 BT_ASSERT(clock_class);
705
706 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(
707 msg);
708 break;
709 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
710 clock_class = bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
711 msg);
712 BT_ASSERT(clock_class);
713
714 clock_snapshot = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
715 msg);
716 break;
717 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
718 clock_class = bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
719 msg);
720 BT_ASSERT(clock_class);
721
722 clock_snapshot = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
723 msg);
724 break;
725 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
726 clock_snapshot = bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
727 msg);
728 break;
729 default:
730 /* All the other messages have a higher priority */
731 BT_COMP_LOGD_STR("Message has no timestamp: using the last message timestamp.");
732 *ts_ns = last_msg_ts_ns;
733 goto end;
734 }
735
736 clock_class = bt_clock_snapshot_borrow_clock_class_const(clock_snapshot);
737 BT_ASSERT_DBG(clock_class);
738
739 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
740 if (ret) {
741 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
742 "Cannot get nanoseconds from Epoch of clock snapshot: "
743 "clock-snapshot-addr=%p", clock_snapshot);
744 goto error;
745 }
746
747 goto end;
748
749 error:
750 ret = -1;
751
752 end:
753 if (ret == 0) {
754 BT_COMP_LOGD("Found message's timestamp: "
755 "iter-data-addr=%p, msg-addr=%p, "
756 "last-msg-ts=%" PRId64 ", ts=%" PRId64,
757 lttng_live_msg_iter, msg, last_msg_ts_ns, *ts_ns);
758 }
759
760 return ret;
761 }
762
763 static
764 enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
765 struct lttng_live_msg_iter *lttng_live_msg_iter,
766 struct lttng_live_stream_iterator *lttng_live_stream,
767 bt_message **message)
768 {
769 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
770 bt_logging_level log_level = lttng_live_msg_iter->log_level;
771 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
772 enum bt_msg_iter_status status;
773 uint64_t session_idx, trace_idx;
774
775 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
776 session_idx++) {
777 struct lttng_live_session *session =
778 g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
779
780 if (session->new_streams_needed) {
781 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
782 goto end;
783 }
784 for (trace_idx = 0; trace_idx < session->traces->len;
785 trace_idx++) {
786 struct lttng_live_trace *trace =
787 g_ptr_array_index(session->traces, trace_idx);
788 if (trace->new_metadata_needed) {
789 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
790 goto end;
791 }
792 }
793 }
794
795 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
796 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
797 goto end;
798 }
799
800 status = bt_msg_iter_get_next_message(lttng_live_stream->msg_iter,
801 lttng_live_msg_iter->self_msg_iter, message);
802 switch (status) {
803 case BT_MSG_ITER_STATUS_EOF:
804 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
805 break;
806 case BT_MSG_ITER_STATUS_OK:
807 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
808 break;
809 case BT_MSG_ITER_STATUS_AGAIN:
810 /*
811 * Continue immediately (end of packet). The next
812 * get_index may return AGAIN to delay the following
813 * attempt.
814 */
815 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
816 break;
817 case BT_MSG_ITER_STATUS_INVAL:
818 /* No argument provided by the user, so don't return INVAL. */
819 case BT_MSG_ITER_STATUS_ERROR:
820 default:
821 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
822 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
823 "CTF message iterator return an error or failed: "
824 "msg_iter=%p", lttng_live_stream->msg_iter);
825 break;
826 }
827
828 end:
829 return ret;
830 }
831
832 static
833 enum lttng_live_iterator_status lttng_live_iterator_close_stream(
834 struct lttng_live_msg_iter *lttng_live_msg_iter,
835 struct lttng_live_stream_iterator *stream_iter,
836 bt_message **curr_msg)
837 {
838 enum lttng_live_iterator_status live_status =
839 LTTNG_LIVE_ITERATOR_STATUS_OK;
840 bt_logging_level log_level = lttng_live_msg_iter->log_level;
841 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
842 /*
843 * The viewer has hung up on us so we are closing the stream. The
844 * `bt_msg_iter` should simply realize that it needs to close the
845 * stream properly by emitting the necessary stream end message.
846 */
847 enum bt_msg_iter_status status = bt_msg_iter_get_next_message(
848 stream_iter->msg_iter, lttng_live_msg_iter->self_msg_iter,
849 curr_msg);
850
851 if (status == BT_MSG_ITER_STATUS_ERROR) {
852 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
853 "Error getting the next message from CTF message iterator");
854 live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
855 goto end;
856 }
857
858 BT_ASSERT(status == BT_MSG_ITER_STATUS_OK);
859
860 end:
861 return live_status;
862 }
863
864 /*
865 * helper function:
866 * handle_no_data_streams()
867 * retry:
868 * - for each ACTIVE_NO_DATA stream:
869 * - query relayd for stream data, or quiescence info.
870 * - if need metadata, get metadata, goto retry.
871 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
872 * - if quiescent, move to QUIESCENT streams
873 * - if fetched data, move to ACTIVE_DATA streams
874 * (at this point each stream either has data, or is quiescent)
875 *
876 *
877 * iterator_next:
878 * handle_new_streams_and_metadata()
879 * - query relayd for known streams, add them as ACTIVE_NO_DATA
880 * - query relayd for metadata
881 *
882 * call handle_active_no_data_streams()
883 *
884 * handle_quiescent_streams()
885 * - if at least one stream is ACTIVE_DATA:
886 * - peek stream event with lowest timestamp -> next_ts
887 * - for each quiescent stream
888 * - if next_ts >= quiescent end
889 * - set state to ACTIVE_NO_DATA
890 * - else
891 * - for each quiescent stream
892 * - set state to ACTIVE_NO_DATA
893 *
894 * call handle_active_no_data_streams()
895 *
896 * handle_active_data_streams()
897 * - if at least one stream is ACTIVE_DATA:
898 * - get stream event with lowest timestamp from heap
899 * - make that stream event the current message.
900 * - move this stream heap position to its next event
901 * - if we need to fetch data from relayd, move
902 * stream to ACTIVE_NO_DATA.
903 * - return OK
904 * - return AGAIN
905 *
906 * end criterion: ctrl-c on client. If relayd exits or the session
907 * closes on the relay daemon side, we keep on waiting for streams.
908 * Eventually handle --end timestamp (also an end criterion).
909 *
910 * When disconnected from relayd: try to re-connect endlessly.
911 */
912 static
913 enum lttng_live_iterator_status lttng_live_iterator_next_msg_on_stream(
914 struct lttng_live_msg_iter *lttng_live_msg_iter,
915 struct lttng_live_stream_iterator *stream_iter,
916 bt_message **curr_msg)
917 {
918 bt_logging_level log_level = lttng_live_msg_iter->log_level;
919 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
920 enum lttng_live_iterator_status live_status;
921
922 if (stream_iter->has_stream_hung_up) {
923 /*
924 * The stream has hung up and the stream was properly closed
925 * during the last call to the current function. Return _END
926 * status now so that this stream iterator is removed for the
927 * stream iterator list.
928 */
929 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
930 goto end;
931 }
932
933 retry:
934 print_stream_state(stream_iter);
935 live_status = lttng_live_iterator_handle_new_streams_and_metadata(
936 lttng_live_msg_iter);
937 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
938 goto end;
939 }
940 live_status = lttng_live_iterator_next_handle_one_no_data_stream(
941 lttng_live_msg_iter, stream_iter);
942
943 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
944 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
945 /*
946 * We overwrite `live_status` since `curr_msg` is
947 * likely set to a valid message in this function.
948 */
949 live_status = lttng_live_iterator_close_stream(
950 lttng_live_msg_iter, stream_iter, curr_msg);
951 }
952 goto end;
953 }
954 live_status = lttng_live_iterator_next_handle_one_quiescent_stream(
955 lttng_live_msg_iter, stream_iter, curr_msg);
956 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
957 BT_ASSERT(!*curr_msg);
958 goto end;
959 }
960 if (*curr_msg) {
961 goto end;
962 }
963 live_status = lttng_live_iterator_next_handle_one_active_data_stream(
964 lttng_live_msg_iter, stream_iter, curr_msg);
965 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
966 BT_ASSERT(!*curr_msg);
967 }
968
969 end:
970 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) {
971 goto retry;
972 }
973
974 return live_status;
975 }
976
977 static
978 enum lttng_live_iterator_status next_stream_iterator_for_trace(
979 struct lttng_live_msg_iter *lttng_live_msg_iter,
980 struct lttng_live_trace *live_trace,
981 struct lttng_live_stream_iterator **youngest_trace_stream_iter)
982 {
983 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
984 bt_logging_level log_level = lttng_live_msg_iter->log_level;
985 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
986 enum lttng_live_iterator_status stream_iter_status;;
987 int64_t youngest_candidate_msg_ts = INT64_MAX;
988 uint64_t stream_iter_idx;
989
990 BT_ASSERT_DBG(live_trace);
991 BT_ASSERT_DBG(live_trace->stream_iterators);
992 /*
993 * Update the current message of every stream iterators of this trace.
994 * The current msg of every stream must have a timestamp equal or
995 * larger than the last message returned by this iterator. We must
996 * ensure monotonicity.
997 */
998 stream_iter_idx = 0;
999 while (stream_iter_idx < live_trace->stream_iterators->len) {
1000 bool stream_iter_is_ended = false;
1001 struct lttng_live_stream_iterator *stream_iter =
1002 g_ptr_array_index(live_trace->stream_iterators,
1003 stream_iter_idx);
1004
1005 /*
1006 * Find if there is are now current message for this stream
1007 * iterator get it.
1008 */
1009 while (!stream_iter->current_msg) {
1010 bt_message *msg = NULL;
1011 int64_t curr_msg_ts_ns = INT64_MAX;
1012 stream_iter_status = lttng_live_iterator_next_msg_on_stream(
1013 lttng_live_msg_iter, stream_iter, &msg);
1014
1015 BT_COMP_LOGD("live stream iterator returned status :%s",
1016 print_live_iterator_status(stream_iter_status));
1017 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1018 stream_iter_is_ended = true;
1019 break;
1020 }
1021
1022 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1023 goto end;
1024 }
1025
1026 BT_ASSERT_DBG(msg);
1027
1028 /*
1029 * Get the timestamp in nanoseconds from origin of this
1030 * messsage.
1031 */
1032 live_get_msg_ts_ns(stream_iter, lttng_live_msg_iter,
1033 msg, lttng_live_msg_iter->last_msg_ts_ns,
1034 &curr_msg_ts_ns);
1035
1036 /*
1037 * Check if the message of the current live stream
1038 * iterator occured at the exact same time or after the
1039 * last message returned by this component's message
1040 * iterator. If not, we return an error.
1041 */
1042 if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
1043 stream_iter->current_msg = msg;
1044 stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
1045 } else {
1046 /*
1047 * We received a message in the past. To ensure
1048 * monotonicity, we can't send it forward.
1049 */
1050 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1051 "Message's timestamp is less than "
1052 "lttng-live's message iterator's last "
1053 "returned timestamp: "
1054 "lttng-live-msg-iter-addr=%p, ts=%" PRId64 ", "
1055 "last-msg-ts=%" PRId64,
1056 lttng_live_msg_iter, curr_msg_ts_ns,
1057 lttng_live_msg_iter->last_msg_ts_ns);
1058 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1059 goto end;
1060 }
1061 }
1062
1063 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1064
1065 if (!stream_iter_is_ended) {
1066 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1067 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1068 /*
1069 * Update the current best candidate message
1070 * for the stream iterator of this live trace
1071 * to be forwarded downstream.
1072 */
1073 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1074 youngest_candidate_stream_iter = stream_iter;
1075 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1076 /*
1077 * Order the messages in an arbitrary but
1078 * deterministic way.
1079 */
1080 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1081 int ret = common_muxing_compare_messages(
1082 stream_iter->current_msg,
1083 youngest_candidate_stream_iter->current_msg);
1084 if (ret < 0) {
1085 /*
1086 * The `youngest_candidate_stream_iter->current_msg`
1087 * should go first. Update the next
1088 * iterator and the current timestamp.
1089 */
1090 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1091 youngest_candidate_stream_iter = stream_iter;
1092 } else if (ret == 0) {
1093 /*
1094 * Unable to pick which one should go
1095 * first.
1096 */
1097 BT_COMP_LOGW("Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1098 "stream-iter-addr=%p"
1099 "stream-iter-addr=%p",
1100 stream_iter,
1101 youngest_candidate_stream_iter);
1102 }
1103 }
1104
1105 stream_iter_idx++;
1106 } else {
1107 /*
1108 * The live stream iterator has ended. That
1109 * iterator is removed from the array, but
1110 * there is no need to increment
1111 * stream_iter_idx as
1112 * g_ptr_array_remove_index_fast replaces the
1113 * removed element with the array's last
1114 * element.
1115 */
1116 g_ptr_array_remove_index_fast(
1117 live_trace->stream_iterators,
1118 stream_iter_idx);
1119 }
1120 }
1121
1122 if (youngest_candidate_stream_iter) {
1123 *youngest_trace_stream_iter = youngest_candidate_stream_iter;
1124 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1125 } else {
1126 /*
1127 * The only case where we don't have a candidate for this trace
1128 * is if we reached the end of all the iterators.
1129 */
1130 BT_ASSERT(live_trace->stream_iterators->len == 0);
1131 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1132 }
1133
1134 end:
1135 return stream_iter_status;
1136 }
1137
1138 static
1139 enum lttng_live_iterator_status next_stream_iterator_for_session(
1140 struct lttng_live_msg_iter *lttng_live_msg_iter,
1141 struct lttng_live_session *session,
1142 struct lttng_live_stream_iterator **youngest_session_stream_iter)
1143 {
1144 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
1145 bt_logging_level log_level = lttng_live_msg_iter->log_level;
1146 enum lttng_live_iterator_status stream_iter_status;
1147 uint64_t trace_idx = 0;
1148 int64_t youngest_candidate_msg_ts = INT64_MAX;
1149 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
1150
1151 /*
1152 * Make sure we are attached to the session and look for new streams
1153 * and metadata.
1154 */
1155 stream_iter_status = lttng_live_get_session(lttng_live_msg_iter, session);
1156 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
1157 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE &&
1158 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) {
1159 goto end;
1160 }
1161
1162 BT_ASSERT_DBG(session->traces);
1163
1164 while (trace_idx < session->traces->len) {
1165 bool trace_is_ended = false;
1166 struct lttng_live_stream_iterator *stream_iter;
1167 struct lttng_live_trace *trace =
1168 g_ptr_array_index(session->traces, trace_idx);
1169
1170 stream_iter_status = next_stream_iterator_for_trace(
1171 lttng_live_msg_iter, trace, &stream_iter);
1172 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1173 /*
1174 * All the live stream iterators for this trace are
1175 * ENDed. Remove the trace from this session.
1176 */
1177 trace_is_ended = true;
1178 } else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1179 goto end;
1180 }
1181
1182 if (!trace_is_ended) {
1183 BT_ASSERT_DBG(stream_iter);
1184
1185 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1186 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1187 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1188 youngest_candidate_stream_iter = stream_iter;
1189 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1190 /*
1191 * Order the messages in an arbitrary but
1192 * deterministic way.
1193 */
1194 int ret = common_muxing_compare_messages(
1195 stream_iter->current_msg,
1196 youngest_candidate_stream_iter->current_msg);
1197 if (ret < 0) {
1198 /*
1199 * The `youngest_candidate_stream_iter->current_msg`
1200 * should go first. Update the next iterator
1201 * and the current timestamp.
1202 */
1203 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1204 youngest_candidate_stream_iter = stream_iter;
1205 } else if (ret == 0) {
1206 /* Unable to pick which one should go first. */
1207 BT_COMP_LOGW("Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1208 "stream-iter-addr=%p" "stream-iter-addr=%p",
1209 stream_iter, youngest_candidate_stream_iter);
1210 }
1211 }
1212 trace_idx++;
1213 } else {
1214 /*
1215 * trace_idx is not incremented since
1216 * g_ptr_array_remove_index_fast replaces the
1217 * element at trace_idx with the array's last element.
1218 */
1219 g_ptr_array_remove_index_fast(session->traces,
1220 trace_idx);
1221 }
1222 }
1223 if (youngest_candidate_stream_iter) {
1224 *youngest_session_stream_iter = youngest_candidate_stream_iter;
1225 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1226 } else {
1227 /*
1228 * The only cases where we don't have a candidate for this
1229 * trace is:
1230 * 1. if we reached the end of all the iterators of all the
1231 * traces of this session,
1232 * 2. if we never had live stream iterator in the first place.
1233 *
1234 * In either cases, we return END.
1235 */
1236 BT_ASSERT(session->traces->len == 0);
1237 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1238 }
1239 end:
1240 return stream_iter_status;
1241 }
1242
1243 static inline
1244 void put_messages(bt_message_array_const msgs, uint64_t count)
1245 {
1246 uint64_t i;
1247
1248 for (i = 0; i < count; i++) {
1249 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
1250 }
1251 }
1252
1253 BT_HIDDEN
1254 bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next(
1255 bt_self_message_iterator *self_msg_it,
1256 bt_message_array_const msgs, uint64_t capacity,
1257 uint64_t *count)
1258 {
1259 bt_component_class_message_iterator_next_method_status status;
1260 struct lttng_live_msg_iter *lttng_live_msg_iter =
1261 bt_self_message_iterator_get_data(self_msg_it);
1262 struct lttng_live_component *lttng_live =
1263 lttng_live_msg_iter->lttng_live_comp;
1264 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
1265 bt_logging_level log_level = lttng_live_msg_iter->log_level;
1266 enum lttng_live_iterator_status stream_iter_status;
1267 uint64_t session_idx;
1268
1269 *count = 0;
1270
1271 BT_ASSERT_DBG(lttng_live_msg_iter);
1272
1273 /*
1274 * Clear all the invalid message reference that might be left over in
1275 * the output array.
1276 */
1277 memset(msgs, 0, capacity * sizeof(*msgs));
1278
1279 /*
1280 * If no session are exposed on the relay found at the url provided by
1281 * the user, session count will be 0. In this case, we return status
1282 * end to return gracefully.
1283 */
1284 if (lttng_live_msg_iter->sessions->len == 0) {
1285 if (lttng_live->params.sess_not_found_act !=
1286 SESSION_NOT_FOUND_ACTION_CONTINUE) {
1287 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END;
1288 goto no_session;
1289 } else {
1290 /*
1291 * The are no more active session for this session
1292 * name. Retry to create a viewer session for the
1293 * requested session name.
1294 */
1295 if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
1296 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1297 goto no_session;
1298 }
1299 }
1300 }
1301
1302 if (lttng_live_msg_iter->active_stream_iter == 0) {
1303 lttng_live_force_new_streams_and_metadata(lttng_live_msg_iter);
1304 }
1305
1306 /*
1307 * Here the muxing of message is done.
1308 *
1309 * We need to iterate over all the streams of all the traces of all the
1310 * viewer sessions in order to get the message with the smallest
1311 * timestamp. In this case, a session is a viewer session and there is
1312 * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
1313 * kernel). Each viewer session can have multiple traces, for example,
1314 * 64bit UST viewer sessions could have multiple per-pid traces.
1315 *
1316 * We iterate over the streams of each traces to update and see what is
1317 * their next message's timestamp. From those timestamps, we select the
1318 * message with the smallest timestamp as the best candidate message
1319 * for that trace and do the same thing across all the sessions.
1320 *
1321 * We then compare the timestamp of best candidate message of all the
1322 * sessions to pick the message with the smallest timestamp and we
1323 * return it.
1324 */
1325 while (*count < capacity) {
1326 struct lttng_live_stream_iterator *youngest_stream_iter = NULL,
1327 *candidate_stream_iter = NULL;
1328 int64_t youngest_msg_ts_ns = INT64_MAX;
1329
1330 BT_ASSERT_DBG(lttng_live_msg_iter->sessions);
1331 session_idx = 0;
1332 while (session_idx < lttng_live_msg_iter->sessions->len) {
1333 struct lttng_live_session *session =
1334 g_ptr_array_index(lttng_live_msg_iter->sessions,
1335 session_idx);
1336
1337 /* Find the best candidate message to send downstream. */
1338 stream_iter_status = next_stream_iterator_for_session(
1339 lttng_live_msg_iter, session,
1340 &candidate_stream_iter);
1341
1342 /* If we receive an END status, it means that either:
1343 * - Those traces never had active streams (UST with no
1344 * data produced yet),
1345 * - All live stream iterators have ENDed.*/
1346 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1347 if (session->closed && session->traces->len == 0) {
1348 /*
1349 * Remove the session from the list.
1350 * session_idx is not modified since
1351 * g_ptr_array_remove_index_fast
1352 * replaces the the removed element with
1353 * the array's last element.
1354 */
1355 g_ptr_array_remove_index_fast(
1356 lttng_live_msg_iter->sessions,
1357 session_idx);
1358 } else {
1359 session_idx++;
1360 }
1361 continue;
1362 }
1363
1364 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1365 goto end;
1366 }
1367
1368 if (G_UNLIKELY(youngest_stream_iter == NULL) ||
1369 candidate_stream_iter->current_msg_ts_ns < youngest_msg_ts_ns) {
1370 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1371 youngest_stream_iter = candidate_stream_iter;
1372 } else if (candidate_stream_iter->current_msg_ts_ns == youngest_msg_ts_ns) {
1373 /*
1374 * The currently selected message to be sent
1375 * downstream next has the exact same timestamp
1376 * that of the current candidate message. We
1377 * must break the tie in a predictable manner.
1378 */
1379 BT_COMP_LOGD_STR("Two of the next message candidates have the same timestamps, pick one deterministically.");
1380 /*
1381 * Order the messages in an arbitrary but
1382 * deterministic way.
1383 */
1384 int ret = common_muxing_compare_messages(
1385 candidate_stream_iter->current_msg,
1386 youngest_stream_iter->current_msg);
1387 if (ret < 0) {
1388 /*
1389 * The `candidate_stream_iter->current_msg`
1390 * should go first. Update the next
1391 * iterator and the current timestamp.
1392 */
1393 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1394 youngest_stream_iter = candidate_stream_iter;
1395 } else if (ret == 0) {
1396 /* Unable to pick which one should go first. */
1397 BT_COMP_LOGW("Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1398 "next-stream-iter-addr=%p" "candidate-stream-iter-addr=%p",
1399 youngest_stream_iter, candidate_stream_iter);
1400 }
1401 }
1402
1403 session_idx++;
1404 }
1405
1406 if (!youngest_stream_iter) {
1407 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1408 goto end;
1409 }
1410
1411 BT_ASSERT_DBG(youngest_stream_iter->current_msg);
1412 /* Ensure monotonicity. */
1413 BT_ASSERT_DBG(lttng_live_msg_iter->last_msg_ts_ns <=
1414 youngest_stream_iter->current_msg_ts_ns);
1415
1416 /*
1417 * Insert the next message to the message batch. This will set
1418 * stream iterator current messsage to NULL so that next time
1419 * we fetch the next message of that stream iterator
1420 */
1421 BT_MESSAGE_MOVE_REF(msgs[*count], youngest_stream_iter->current_msg);
1422 (*count)++;
1423
1424 /* Update the last timestamp in nanoseconds sent downstream. */
1425 lttng_live_msg_iter->last_msg_ts_ns = youngest_msg_ts_ns;
1426 youngest_stream_iter->current_msg_ts_ns = INT64_MAX;
1427
1428 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1429 }
1430 end:
1431 switch (stream_iter_status) {
1432 case LTTNG_LIVE_ITERATOR_STATUS_OK:
1433 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
1434 if (*count > 0) {
1435 /*
1436 * We received a again status but we have some messages
1437 * to send downstream. We send them and return OK for
1438 * now. On the next call we return again if there are
1439 * still no new message to send.
1440 */
1441 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
1442 } else {
1443 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_AGAIN;
1444 }
1445 break;
1446 case LTTNG_LIVE_ITERATOR_STATUS_END:
1447 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END;
1448 break;
1449 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
1450 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_MEMORY_ERROR;
1451 break;
1452 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
1453 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
1454 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
1455 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1456 /* Put all existing messages on error. */
1457 put_messages(msgs, *count);
1458 break;
1459 default:
1460 abort();
1461 }
1462
1463 no_session:
1464 return status;
1465 }
1466
1467 BT_HIDDEN
1468 bt_component_class_message_iterator_initialize_method_status lttng_live_msg_iter_init(
1469 bt_self_message_iterator *self_msg_it,
1470 bt_self_message_iterator_configuration *config,
1471 bt_self_component_source *self_comp_src,
1472 bt_self_component_port_output *self_port)
1473 {
1474 bt_component_class_message_iterator_initialize_method_status ret =
1475 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_OK;
1476 bt_self_component *self_comp =
1477 bt_self_component_source_as_self_component(self_comp_src);
1478 struct lttng_live_component *lttng_live;
1479 struct lttng_live_msg_iter *lttng_live_msg_iter;
1480 bt_logging_level log_level;
1481
1482 BT_ASSERT(self_msg_it);
1483
1484 lttng_live = bt_self_component_get_data(self_comp);
1485 log_level = lttng_live->log_level;
1486 self_comp = lttng_live->self_comp;
1487
1488 /* There can be only one downstream iterator at the same time. */
1489 BT_ASSERT(!lttng_live->has_msg_iter);
1490 lttng_live->has_msg_iter = true;
1491
1492 lttng_live_msg_iter = g_new0(struct lttng_live_msg_iter, 1);
1493 if (!lttng_live_msg_iter) {
1494 ret = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_ERROR;
1495 goto end;
1496 }
1497
1498 lttng_live_msg_iter->log_level = lttng_live->log_level;
1499 lttng_live_msg_iter->self_comp = lttng_live->self_comp;
1500 lttng_live_msg_iter->lttng_live_comp = lttng_live;
1501 lttng_live_msg_iter->self_msg_iter = self_msg_it;
1502
1503 lttng_live_msg_iter->active_stream_iter = 0;
1504 lttng_live_msg_iter->last_msg_ts_ns = INT64_MIN;
1505 lttng_live_msg_iter->sessions = g_ptr_array_new_with_free_func(
1506 (GDestroyNotify) lttng_live_destroy_session);
1507 BT_ASSERT(lttng_live_msg_iter->sessions);
1508
1509 lttng_live_msg_iter->viewer_connection =
1510 live_viewer_connection_create(lttng_live->params.url->str, false,
1511 lttng_live_msg_iter, self_comp, NULL, log_level);
1512 if (!lttng_live_msg_iter->viewer_connection) {
1513 goto error;
1514 }
1515
1516 if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
1517 goto error;
1518 }
1519 if (lttng_live_msg_iter->sessions->len == 0) {
1520 switch (lttng_live->params.sess_not_found_act) {
1521 case SESSION_NOT_FOUND_ACTION_CONTINUE:
1522 BT_COMP_LOGI("Unable to connect to the requested live viewer "
1523 "session. Keep trying to connect because of "
1524 "%s=\"%s\" component parameter: url=\"%s\"",
1525 SESS_NOT_FOUND_ACTION_PARAM,
1526 SESS_NOT_FOUND_ACTION_CONTINUE_STR,
1527 lttng_live->params.url->str);
1528 break;
1529 case SESSION_NOT_FOUND_ACTION_FAIL:
1530 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Unable to connect to the requested live viewer "
1531 "session. Fail the message iterator"
1532 "initialization because of %s=\"%s\" "
1533 "component parameter: url =\"%s\"",
1534 SESS_NOT_FOUND_ACTION_PARAM,
1535 SESS_NOT_FOUND_ACTION_FAIL_STR,
1536 lttng_live->params.url->str);
1537 goto error;
1538 case SESSION_NOT_FOUND_ACTION_END:
1539 BT_COMP_LOGI("Unable to connect to the requested live viewer "
1540 "session. End gracefully at the first _next() "
1541 "call because of %s=\"%s\" component parameter: "
1542 "url=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM,
1543 SESS_NOT_FOUND_ACTION_END_STR,
1544 lttng_live->params.url->str);
1545 break;
1546 default:
1547 abort();
1548 }
1549 }
1550
1551 bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter);
1552
1553 goto end;
1554 error:
1555 ret = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_ERROR;
1556 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
1557 end:
1558 return ret;
1559 }
1560
1561 static struct bt_param_validation_map_value_entry_descr list_sessions_params[] = {
1562 { URL_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY, { .type = BT_VALUE_TYPE_STRING } },
1563 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END
1564 };
1565
1566 static
1567 bt_component_class_query_method_status lttng_live_query_list_sessions(
1568 const bt_value *params, const bt_value **result,
1569 bt_self_component_class *self_comp_class,
1570 bt_logging_level log_level)
1571 {
1572 bt_component_class_query_method_status status;
1573 const bt_value *url_value = NULL;
1574 const char *url;
1575 struct live_viewer_connection *viewer_connection = NULL;
1576 enum bt_param_validation_status validation_status;
1577 gchar *validate_error = NULL;
1578
1579 validation_status = bt_param_validation_validate(params,
1580 list_sessions_params, &validate_error);
1581 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
1582 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1583 goto error;
1584 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
1585 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1586 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "%s",
1587 validate_error);
1588 goto error;
1589 }
1590
1591 url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
1592 url = bt_value_string_get(url_value);
1593
1594 viewer_connection = live_viewer_connection_create(url, true, NULL,
1595 NULL, self_comp_class, log_level);
1596 if (!viewer_connection) {
1597 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
1598 "Failed to create viewer connection");
1599 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1600 goto error;
1601 }
1602
1603 status = live_viewer_connection_list_sessions(viewer_connection,
1604 result);
1605 if (status != BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK) {
1606 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
1607 "Failed to list viewer sessions");
1608 goto error;
1609 }
1610
1611 goto end;
1612
1613 error:
1614 BT_VALUE_PUT_REF_AND_RESET(*result);
1615
1616 if (status >= 0) {
1617 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1618 }
1619
1620 end:
1621 if (viewer_connection) {
1622 live_viewer_connection_destroy(viewer_connection);
1623 }
1624
1625 g_free(validate_error);
1626
1627 return status;
1628 }
1629
1630 static
1631 bt_component_class_query_method_status lttng_live_query_support_info(
1632 const bt_value *params, const bt_value **result,
1633 bt_self_component_class *self_comp_class,
1634 bt_logging_level log_level)
1635 {
1636 bt_component_class_query_method_status status =
1637 BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
1638 const bt_value *input_type_value;
1639 const bt_value *input_value;
1640 double weight = 0;
1641 struct bt_common_lttng_live_url_parts parts = { 0 };
1642
1643 /* Used by the logging macros */
1644 __attribute__((unused)) bt_self_component *self_comp = NULL;
1645
1646 *result = NULL;
1647 input_type_value = bt_value_map_borrow_entry_value_const(params,
1648 "type");
1649 if (!input_type_value) {
1650 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
1651 "Missing expected `type` parameter.");
1652 goto error;
1653 }
1654
1655 if (!bt_value_is_string(input_type_value)) {
1656 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
1657 "`type` parameter is not a string value.");
1658 goto error;
1659 }
1660
1661 if (strcmp(bt_value_string_get(input_type_value), "string") != 0) {
1662 /* We don't handle file system paths */
1663 goto create_result;
1664 }
1665
1666 input_value = bt_value_map_borrow_entry_value_const(params, "input");
1667 if (!input_value) {
1668 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
1669 "Missing expected `input` parameter.");
1670 goto error;
1671 }
1672
1673 if (!bt_value_is_string(input_value)) {
1674 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
1675 "`input` parameter is not a string value.");
1676 goto error;
1677 }
1678
1679 parts = bt_common_parse_lttng_live_url(bt_value_string_get(input_value),
1680 NULL, 0);
1681 if (parts.session_name) {
1682 /*
1683 * Looks pretty much like an LTTng live URL: we got the
1684 * session name part, which forms a complete URL.
1685 */
1686 weight = .75;
1687 }
1688
1689 create_result:
1690 *result = bt_value_real_create_init(weight);
1691 if (!*result) {
1692 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1693 goto error;
1694 }
1695
1696 goto end;
1697
1698 error:
1699 if (status >= 0) {
1700 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1701 }
1702
1703 BT_ASSERT(!*result);
1704
1705 end:
1706 bt_common_destroy_lttng_live_url_parts(&parts);
1707 return status;
1708 }
1709
1710 BT_HIDDEN
1711 bt_component_class_query_method_status lttng_live_query(
1712 bt_self_component_class_source *comp_class,
1713 bt_private_query_executor *priv_query_exec,
1714 const char *object, const bt_value *params,
1715 __attribute__((unused)) void *method_data,
1716 const bt_value **result)
1717 {
1718 bt_component_class_query_method_status status =
1719 BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
1720 bt_self_component *self_comp = NULL;
1721 bt_self_component_class *self_comp_class =
1722 bt_self_component_class_source_as_self_component_class(comp_class);
1723 bt_logging_level log_level = bt_query_executor_get_logging_level(
1724 bt_private_query_executor_as_query_executor_const(
1725 priv_query_exec));
1726
1727 if (strcmp(object, "sessions") == 0) {
1728 status = lttng_live_query_list_sessions(params, result,
1729 self_comp_class, log_level);
1730 } else if (strcmp(object, "babeltrace.support-info") == 0) {
1731 status = lttng_live_query_support_info(params, result,
1732 self_comp_class, log_level);
1733 } else {
1734 BT_COMP_LOGI("Unknown query object `%s`", object);
1735 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT;
1736 goto end;
1737 }
1738
1739 end:
1740 return status;
1741 }
1742
1743 static
1744 void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
1745 {
1746 if (!lttng_live) {
1747 return;
1748 }
1749 if (lttng_live->params.url) {
1750 g_string_free(lttng_live->params.url, TRUE);
1751 }
1752 g_free(lttng_live);
1753 }
1754
1755 BT_HIDDEN
1756 void lttng_live_component_finalize(bt_self_component_source *component)
1757 {
1758 void *data = bt_self_component_get_data(
1759 bt_self_component_source_as_self_component(component));
1760
1761 if (!data) {
1762 return;
1763 }
1764 lttng_live_component_destroy_data(data);
1765 }
1766
1767 static
1768 enum session_not_found_action parse_session_not_found_action_param(
1769 const bt_value *no_session_param)
1770 {
1771 enum session_not_found_action action;
1772 const char *no_session_act_str = bt_value_string_get(no_session_param);
1773
1774 if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_CONTINUE_STR) == 0) {
1775 action = SESSION_NOT_FOUND_ACTION_CONTINUE;
1776 } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_FAIL_STR) == 0) {
1777 action = SESSION_NOT_FOUND_ACTION_FAIL;
1778 } else {
1779 BT_ASSERT(strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0);
1780 action = SESSION_NOT_FOUND_ACTION_END;
1781 }
1782
1783 return action;
1784 }
1785
1786 static struct bt_param_validation_value_descr inputs_elem_descr = {
1787 .type = BT_VALUE_TYPE_STRING,
1788 };
1789
1790 static const char *sess_not_found_action_choices[] = {
1791 SESS_NOT_FOUND_ACTION_CONTINUE_STR,
1792 SESS_NOT_FOUND_ACTION_FAIL_STR,
1793 SESS_NOT_FOUND_ACTION_END_STR,
1794 };
1795
1796 static struct bt_param_validation_map_value_entry_descr params_descr[] = {
1797 { INPUTS_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY, { BT_VALUE_TYPE_ARRAY, .array = {
1798 .min_length = 1,
1799 .max_length = 1,
1800 .element_type = &inputs_elem_descr,
1801 } } },
1802 { SESS_NOT_FOUND_ACTION_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { BT_VALUE_TYPE_STRING, .string = {
1803 .choices = sess_not_found_action_choices,
1804 } } },
1805 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END
1806 };
1807
1808 static
1809 bt_component_class_initialize_method_status lttng_live_component_create(
1810 const bt_value *params,
1811 bt_logging_level log_level,
1812 bt_self_component *self_comp,
1813 struct lttng_live_component **component)
1814 {
1815 struct lttng_live_component *lttng_live = NULL;
1816 const bt_value *inputs_value;
1817 const bt_value *url_value;
1818 const bt_value *value;
1819 const char *url;
1820 enum bt_param_validation_status validation_status;
1821 gchar *validation_error = NULL;
1822 bt_component_class_initialize_method_status status;
1823
1824 validation_status = bt_param_validation_validate(params, params_descr,
1825 &validation_error);
1826 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
1827 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1828 goto error;
1829 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
1830 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "%s", validation_error);
1831 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1832 goto error;
1833 }
1834
1835 lttng_live = g_new0(struct lttng_live_component, 1);
1836 if (!lttng_live) {
1837 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1838 goto end;
1839 }
1840 lttng_live->log_level = log_level;
1841 lttng_live->self_comp = self_comp;
1842 lttng_live->max_query_size = MAX_QUERY_SIZE;
1843 lttng_live->has_msg_iter = false;
1844
1845 inputs_value =
1846 bt_value_map_borrow_entry_value_const(params, INPUTS_PARAM);
1847 url_value =
1848 bt_value_array_borrow_element_by_index_const(inputs_value, 0);
1849 url = bt_value_string_get(url_value);
1850
1851 lttng_live->params.url = g_string_new(url);
1852 if (!lttng_live->params.url) {
1853 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1854 goto error;
1855 }
1856
1857 value = bt_value_map_borrow_entry_value_const(params,
1858 SESS_NOT_FOUND_ACTION_PARAM);
1859 if (value) {
1860 lttng_live->params.sess_not_found_act =
1861 parse_session_not_found_action_param(value);
1862 } else {
1863 BT_COMP_LOGI("Optional `%s` parameter is missing: "
1864 "defaulting to `%s`.",
1865 SESS_NOT_FOUND_ACTION_PARAM,
1866 SESS_NOT_FOUND_ACTION_CONTINUE_STR);
1867 lttng_live->params.sess_not_found_act =
1868 SESSION_NOT_FOUND_ACTION_CONTINUE;
1869 }
1870
1871 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
1872 goto end;
1873
1874 error:
1875 lttng_live_component_destroy_data(lttng_live);
1876 lttng_live = NULL;
1877 end:
1878 g_free(validation_error);
1879
1880 *component = lttng_live;
1881 return status;
1882 }
1883
1884 BT_HIDDEN
1885 bt_component_class_initialize_method_status lttng_live_component_init(
1886 bt_self_component_source *self_comp_src,
1887 bt_self_component_source_configuration *config,
1888 const bt_value *params,
1889 __attribute__((unused)) void *init_method_data)
1890 {
1891 struct lttng_live_component *lttng_live;
1892 bt_component_class_initialize_method_status ret;
1893 bt_self_component *self_comp =
1894 bt_self_component_source_as_self_component(self_comp_src);
1895 bt_logging_level log_level = bt_component_get_logging_level(
1896 bt_self_component_as_component(self_comp));
1897 bt_self_component_add_port_status add_port_status;
1898
1899 ret = lttng_live_component_create(params, log_level, self_comp, &lttng_live);
1900 if (ret != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
1901 goto error;
1902 }
1903
1904 add_port_status = bt_self_component_source_add_output_port(
1905 self_comp_src, "out", NULL, NULL);
1906 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
1907 ret = (int) add_port_status;
1908 goto end;
1909 }
1910
1911 bt_self_component_set_data(self_comp, lttng_live);
1912 goto end;
1913
1914 error:
1915 lttng_live_component_destroy_data(lttng_live);
1916 lttng_live = NULL;
1917 end:
1918 return ret;
1919 }
This page took 0.132774 seconds and 4 git commands to generate.