/*
 * Babeltrace CTF LTTng-live Client Component
 *
 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
#include <babeltrace/ctf-ir/packet.h>
#include <babeltrace/graph/component-source.h>
#include <babeltrace/graph/private-port.h>
#include <babeltrace/graph/port.h>
#include <babeltrace/graph/private-component.h>
#include <babeltrace/graph/private-component-source.h>
#include <babeltrace/graph/private-notification-iterator.h>
#include <babeltrace/graph/notification-stream.h>
#include <babeltrace/graph/notification-packet.h>
#include <babeltrace/graph/notification-event.h>
#include <babeltrace/graph/notification-heap.h>
#include <babeltrace/graph/notification-iterator.h>
#include <babeltrace/graph/notification-inactivity.h>
#include <babeltrace/compiler-internal.h>
#include <assert.h>
#include <inttypes.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <plugins-common.h>

#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE"
#define BT_LOGLEVEL_NAME "BABELTRACE_PLUGIN_CTF_LTTNG_LIVE_LOG_LEVEL"

#include "data-stream.h"
#include "lttng-live-internal.h"
/* Value stored in lttng_live_component::max_query_size at creation time. */
#define MAX_QUERY_SIZE		(256*1024)

/* Debug-level logging shorthand used throughout this file. */
#define print_dbg(fmt, ...)	BT_LOGD(fmt, ## __VA_ARGS__)
61 static const char *print_state(struct lttng_live_stream_iterator
*s
)
64 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
:
65 return "ACTIVE_NO_DATA";
66 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
:
67 return "QUIESCENT_NO_DATA";
68 case LTTNG_LIVE_STREAM_QUIESCENT
:
70 case LTTNG_LIVE_STREAM_ACTIVE_DATA
:
72 case LTTNG_LIVE_STREAM_EOF
:
/*
 * Log, at debug level, the port name, state and inactivity timestamps of a
 * stream iterator. `stream` is evaluated several times: pass a plain
 * pointer expression without side effects.
 *
 * NOTE(review): bt_port_from_private_port() appears to return a reference
 * elsewhere in this file; it is not released here — confirm whether this
 * leaks a port reference per call.
 */
#define print_stream_state(stream) \
	print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64, \
		bt_port_get_name(bt_port_from_private_port((stream)->port)), \
		print_state(stream), (stream)->last_returned_inactivity_timestamp, \
		(stream)->current_inactivity_timestamp)
86 int bt_lttng_live_log_level
= BT_LOG_NONE
;
89 int lttng_live_add_port(struct lttng_live_component
*lttng_live
,
90 struct lttng_live_stream_iterator
*stream_iter
)
93 struct bt_private_port
*private_port
;
94 char name
[STREAM_NAME_MAX_LEN
];
96 ret
= sprintf(name
, STREAM_NAME_PREFIX
"%" PRIu64
, stream_iter
->viewer_stream_id
);
98 strcpy(stream_iter
->name
, name
);
99 private_port
= bt_private_component_source_add_output_private_port(
100 lttng_live
->private_component
, name
, stream_iter
);
104 BT_LOGI("Added port %s", name
);
106 if (lttng_live
->no_stream_port
) {
107 ret
= bt_private_port_remove_from_component(lttng_live
->no_stream_port
);
111 BT_PUT(lttng_live
->no_stream_port
);
112 lttng_live
->no_stream_iter
->port
= NULL
;
114 stream_iter
->port
= private_port
;
119 int lttng_live_remove_port(struct lttng_live_component
*lttng_live
,
120 struct bt_private_port
*port
)
122 struct bt_component
*component
;
126 component
= bt_component_from_private_component(lttng_live
->private_component
);
127 nr_ports
= bt_component_source_get_output_port_count(component
);
133 assert(!lttng_live
->no_stream_port
);
134 lttng_live
->no_stream_port
=
135 bt_private_component_source_add_output_private_port(lttng_live
->private_component
,
136 "no-stream", lttng_live
->no_stream_iter
);
137 if (!lttng_live
->no_stream_port
) {
140 lttng_live
->no_stream_iter
->port
= lttng_live
->no_stream_port
;
142 ret
= bt_private_port_remove_from_component(port
);
150 struct lttng_live_trace
*lttng_live_find_trace(struct lttng_live_session
*session
,
153 struct lttng_live_trace
*trace
;
155 bt_list_for_each_entry(trace
, &session
->traces
, node
) {
156 if (trace
->id
== trace_id
) {
164 void lttng_live_destroy_trace(struct bt_object
*obj
)
166 struct lttng_live_trace
*trace
= container_of(obj
, struct lttng_live_trace
, obj
);
169 BT_LOGI("Destroy trace");
170 assert(bt_list_empty(&trace
->streams
));
171 bt_list_del(&trace
->node
);
173 retval
= bt_ctf_trace_set_is_static(trace
->trace
);
176 lttng_live_metadata_fini(trace
);
177 BT_PUT(trace
->cc_prio_map
);
182 struct lttng_live_trace
*lttng_live_create_trace(struct lttng_live_session
*session
,
185 struct lttng_live_trace
*trace
= NULL
;
187 trace
= g_new0(struct lttng_live_trace
, 1);
191 trace
->session
= session
;
192 trace
->id
= trace_id
;
193 BT_INIT_LIST_HEAD(&trace
->streams
);
194 trace
->new_metadata_needed
= true;
195 bt_list_add(&trace
->node
, &session
->traces
);
196 bt_object_init(&trace
->obj
, lttng_live_destroy_trace
);
197 BT_LOGI("Create trace");
/*
 * Get the trace with id `trace_id` from `session`, creating it when it
 * does not exist yet.
 *
 * NOTE(review): taking an extra reference on an already existing trace is
 * reconstructed (the lines were missing from the garbled source) — confirm.
 *
 * Returns the trace, or NULL if creation failed.
 */
struct lttng_live_trace *lttng_live_ref_trace(struct lttng_live_session *session,
		uint64_t trace_id)
{
	struct lttng_live_trace *trace;

	trace = lttng_live_find_trace(session, trace_id);
	if (trace) {
		bt_get(trace);
		return trace;
	}
	return lttng_live_create_trace(session, trace_id);
}
/*
 * Drop one reference on `trace`.
 *
 * NOTE(review): the body was missing from the garbled source; bt_put() is
 * reconstructed as the counterpart of lttng_live_ref_trace() — confirm.
 */
void lttng_live_unref_trace(struct lttng_live_trace *trace)
{
	bt_put(trace);
}
227 void lttng_live_close_trace_streams(struct lttng_live_trace
*trace
)
229 struct lttng_live_stream_iterator
*stream
, *s
;
231 bt_list_for_each_entry_safe(stream
, s
, &trace
->streams
, node
) {
232 lttng_live_stream_iterator_destroy(stream
);
234 lttng_live_metadata_fini(trace
);
238 int lttng_live_add_session(struct lttng_live_component
*lttng_live
, uint64_t session_id
)
241 struct lttng_live_session
*s
;
243 s
= g_new0(struct lttng_live_session
, 1);
249 BT_INIT_LIST_HEAD(&s
->traces
);
250 s
->lttng_live
= lttng_live
;
251 s
->new_streams_needed
= true;
253 BT_LOGI("Reading from session %" PRIu64
, s
->id
);
254 bt_list_add(&s
->node
, <tng_live
->sessions
);
257 BT_LOGE("Error adding session");
265 void lttng_live_destroy_session(struct lttng_live_session
*session
)
267 struct lttng_live_trace
*trace
, *t
;
269 BT_LOGI("Destroy session");
270 if (session
->id
!= -1ULL) {
271 if (lttng_live_detach_session(session
)) {
272 /* Old relayd cannot detach sessions. */
273 BT_LOGD("Unable to detach session %" PRIu64
,
278 bt_list_for_each_entry_safe(trace
, t
, &session
->traces
, node
) {
279 lttng_live_close_trace_streams(trace
);
281 bt_list_del(&session
->node
);
286 void lttng_live_iterator_finalize(struct bt_private_notification_iterator
*it
)
288 struct lttng_live_stream_iterator_generic
*s
=
289 bt_private_notification_iterator_get_user_data(it
);
292 case LIVE_STREAM_TYPE_NO_STREAM
:
294 /* Leave no_stream_iter in place when port is removed. */
297 case LIVE_STREAM_TYPE_STREAM
:
299 struct lttng_live_stream_iterator
*stream_iter
=
300 container_of(s
, struct lttng_live_stream_iterator
, p
);
302 lttng_live_stream_iterator_destroy(stream_iter
);
309 enum bt_ctf_lttng_live_iterator_status
lttng_live_iterator_next_check_stream_state(
310 struct lttng_live_component
*lttng_live
,
311 struct lttng_live_stream_iterator
*lttng_live_stream
)
313 switch (lttng_live_stream
->state
) {
314 case LTTNG_LIVE_STREAM_QUIESCENT
:
315 case LTTNG_LIVE_STREAM_ACTIVE_DATA
:
317 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
:
319 BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
321 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
:
323 BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
325 case LTTNG_LIVE_STREAM_EOF
:
328 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
/*
 * For active no data stream, fetch next data. It can be either:
 * - quiescent: need to put it in the prio heap at quiescent end
 *   timestamp,
 * - have data: need to wire up first event into the prio heap,
 * - have no data on this stream at this point: need to retry (AGAIN) or
 *   return EOF.
 */
340 enum bt_ctf_lttng_live_iterator_status
lttng_live_iterator_next_handle_one_no_data_stream(
341 struct lttng_live_component
*lttng_live
,
342 struct lttng_live_stream_iterator
*lttng_live_stream
)
344 enum bt_ctf_lttng_live_iterator_status ret
=
345 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
346 struct packet_index index
;
347 enum lttng_live_stream_state orig_state
= lttng_live_stream
->state
;
349 if (lttng_live_stream
->trace
->new_metadata_needed
) {
350 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
353 if (lttng_live_stream
->trace
->session
->new_streams_needed
) {
354 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
357 if (lttng_live_stream
->state
!= LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
358 && lttng_live_stream
->state
!= LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
) {
361 ret
= lttng_live_get_next_index(lttng_live
, lttng_live_stream
, &index
);
362 if (ret
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
365 assert(lttng_live_stream
->state
!= LTTNG_LIVE_STREAM_EOF
);
366 if (lttng_live_stream
->state
== LTTNG_LIVE_STREAM_QUIESCENT
) {
367 if (orig_state
== LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
368 && lttng_live_stream
->last_returned_inactivity_timestamp
==
369 lttng_live_stream
->current_inactivity_timestamp
) {
370 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN
;
371 print_stream_state(lttng_live_stream
);
373 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
377 lttng_live_stream
->base_offset
= index
.offset
;
378 lttng_live_stream
->offset
= index
.offset
;
379 lttng_live_stream
->len
= index
.packet_size
/ CHAR_BIT
;
381 if (ret
== BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
382 ret
= lttng_live_iterator_next_check_stream_state(
383 lttng_live
, lttng_live_stream
);
/*
 * Creation of the notification requires the ctf trace to be created
 * beforehand, but the live protocol gives us all streams (including
 * metadata) at once. So we split it in three steps: getting streams,
 * getting metadata (which creates the ctf trace), and then creating the
 * per-stream notifications.
 */
396 enum bt_ctf_lttng_live_iterator_status
lttng_live_get_session(
397 struct lttng_live_component
*lttng_live
,
398 struct lttng_live_session
*session
)
400 enum bt_ctf_lttng_live_iterator_status status
;
401 struct lttng_live_trace
*trace
, *t
;
403 if (lttng_live_attach_session(session
)) {
404 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
;
406 status
= lttng_live_get_new_streams(session
);
407 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
&&
408 status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
) {
411 bt_list_for_each_entry_safe(trace
, t
, &session
->traces
, node
) {
412 status
= lttng_live_metadata_update(trace
);
413 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
&&
414 status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
) {
418 return lttng_live_lazy_notif_init(session
);
422 void lttng_live_need_new_streams(struct lttng_live_component
*lttng_live
)
424 struct lttng_live_session
*session
;
426 bt_list_for_each_entry(session
, <tng_live
->sessions
, node
) {
427 session
->new_streams_needed
= true;
432 void lttng_live_force_new_streams_and_metadata(struct lttng_live_component
*lttng_live
)
434 struct lttng_live_session
*session
;
436 bt_list_for_each_entry(session
, <tng_live
->sessions
, node
) {
437 struct lttng_live_trace
*trace
;
439 session
->new_streams_needed
= true;
440 bt_list_for_each_entry(trace
, &session
->traces
, node
) {
441 trace
->new_metadata_needed
= true;
447 enum bt_notification_iterator_status
lttng_live_iterator_next_handle_new_streams_and_metadata(
448 struct lttng_live_component
*lttng_live
)
450 enum bt_ctf_lttng_live_iterator_status ret
=
451 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
452 unsigned int nr_sessions_opened
= 0;
453 struct lttng_live_session
*session
, *s
;
455 bt_list_for_each_entry_safe(session
, s
, <tng_live
->sessions
, node
) {
456 if (session
->closed
&& bt_list_empty(&session
->traces
)) {
457 lttng_live_destroy_session(session
);
461 * Currently, when there are no sessions, we quit immediately.
462 * We may want to add a component parameter to keep trying until
463 * we get data in the future.
464 * Also, in a remotely distant future, we could add a "new
465 * session" flag to the protocol, which would tell us that we
466 * need to query for new sessions even though we have sessions
469 if (bt_list_empty(<tng_live
->sessions
)) {
470 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
;
473 bt_list_for_each_entry(session
, <tng_live
->sessions
, node
) {
474 ret
= lttng_live_get_session(lttng_live
, session
);
476 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
:
478 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
:
479 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
484 if (!session
->closed
) {
485 nr_sessions_opened
++;
489 if (ret
== BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
&& !nr_sessions_opened
) {
490 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
;
496 enum bt_ctf_lttng_live_iterator_status
emit_inactivity_notification(
497 struct lttng_live_component
*lttng_live
,
498 struct lttng_live_stream_iterator
*lttng_live_stream
,
499 struct bt_notification
**notification
,
502 enum bt_ctf_lttng_live_iterator_status ret
=
503 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
504 struct lttng_live_trace
*trace
;
505 struct bt_ctf_clock_class
*clock_class
= NULL
;
506 struct bt_ctf_clock_value
*clock_value
= NULL
;
507 struct bt_notification
*notif
= NULL
;
510 trace
= lttng_live_stream
->trace
;
514 clock_class
= bt_clock_class_priority_map_get_clock_class_by_index(trace
->cc_prio_map
, 0);
518 clock_value
= bt_ctf_clock_value_create(clock_class
, timestamp
);
522 notif
= bt_notification_inactivity_create(trace
->cc_prio_map
);
526 retval
= bt_notification_inactivity_set_clock_value(notif
, clock_value
);
530 *notification
= notif
;
537 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
;
543 enum bt_ctf_lttng_live_iterator_status
lttng_live_iterator_next_handle_one_quiescent_stream(
544 struct lttng_live_component
*lttng_live
,
545 struct lttng_live_stream_iterator
*lttng_live_stream
,
546 struct bt_notification
**notification
)
548 enum bt_ctf_lttng_live_iterator_status ret
=
549 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
550 struct bt_ctf_clock_class
*clock_class
= NULL
;
551 struct bt_ctf_clock_value
*clock_value
= NULL
;
553 if (lttng_live_stream
->state
!= LTTNG_LIVE_STREAM_QUIESCENT
) {
554 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
557 if (lttng_live_stream
->current_inactivity_timestamp
==
558 lttng_live_stream
->last_returned_inactivity_timestamp
) {
559 lttng_live_stream
->state
= LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
;
560 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
564 ret
= emit_inactivity_notification(lttng_live
, lttng_live_stream
, notification
,
565 (uint64_t) lttng_live_stream
->current_inactivity_timestamp
);
567 lttng_live_stream
->last_returned_inactivity_timestamp
=
568 lttng_live_stream
->current_inactivity_timestamp
;
576 enum bt_ctf_lttng_live_iterator_status
lttng_live_iterator_next_handle_one_active_data_stream(
577 struct lttng_live_component
*lttng_live
,
578 struct lttng_live_stream_iterator
*lttng_live_stream
,
579 struct bt_notification
**notification
)
581 enum bt_ctf_lttng_live_iterator_status ret
=
582 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
583 enum bt_ctf_notif_iter_status status
;
584 struct lttng_live_session
*session
;
586 bt_list_for_each_entry(session
, <tng_live
->sessions
, node
) {
587 struct lttng_live_trace
*trace
;
589 if (session
->new_streams_needed
) {
590 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
592 bt_list_for_each_entry(trace
, &session
->traces
, node
) {
593 if (trace
->new_metadata_needed
) {
594 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
599 if (lttng_live_stream
->state
!= LTTNG_LIVE_STREAM_ACTIVE_DATA
) {
600 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
;
602 if (lttng_live_stream
->packet_end_notif_queue
) {
603 *notification
= lttng_live_stream
->packet_end_notif_queue
;
604 lttng_live_stream
->packet_end_notif_queue
= NULL
;
605 status
= BT_CTF_NOTIF_ITER_STATUS_OK
;
607 status
= bt_ctf_notif_iter_get_next_notification(
608 lttng_live_stream
->notif_iter
,
609 lttng_live_stream
->trace
->cc_prio_map
,
611 if (status
== BT_CTF_NOTIF_ITER_STATUS_OK
) {
613 * Consider empty packets as inactivity.
615 if (bt_notification_get_type(*notification
) == BT_NOTIFICATION_TYPE_PACKET_END
) {
616 lttng_live_stream
->packet_end_notif_queue
= *notification
;
617 *notification
= NULL
;
618 return emit_inactivity_notification(lttng_live
,
619 lttng_live_stream
, notification
,
620 lttng_live_stream
->current_packet_end_timestamp
);
625 case BT_CTF_NOTIF_ITER_STATUS_EOF
:
626 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
;
628 case BT_CTF_NOTIF_ITER_STATUS_OK
:
629 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
631 case BT_CTF_NOTIF_ITER_STATUS_AGAIN
:
633 * Continue immediately (end of packet). The next
634 * get_index may return AGAIN to delay the following
637 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
639 case BT_CTF_NOTIF_ITER_STATUS_INVAL
:
640 /* No argument provided by the user, so don't return INVAL. */
641 case BT_CTF_NOTIF_ITER_STATUS_ERROR
:
643 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
;
/*
 * handle_no_data_streams()
 *   retry:
 *     - for each ACTIVE_NO_DATA stream:
 *       - query relayd for stream data, or quiescence info.
 *         - if need metadata, get metadata, goto retry.
 *         - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
 *       - if quiescent, move to QUIESCENT streams
 *       - if fetched data, move to ACTIVE_DATA streams
 *   (at this point each stream either has data, or is quiescent)
 *
 * handle_new_streams_and_metadata()
 *   - query relayd for known streams, add them as ACTIVE_NO_DATA
 *   - query relayd for metadata
 *
 * call handle_active_no_data_streams()
 *
 * handle_quiescent_streams()
 *   - if at least one stream is ACTIVE_DATA:
 *     - peek stream event with lowest timestamp -> next_ts
 *     - for each quiescent stream
 *       - if next_ts >= quiescent end
 *         - set state to ACTIVE_NO_DATA
 *   - else
 *     - for each quiescent stream
 *       - set state to ACTIVE_NO_DATA
 *
 * call handle_active_no_data_streams()
 *
 * handle_active_data_streams()
 *   - if at least one stream is ACTIVE_DATA:
 *     - get stream event with lowest timestamp from heap
 *     - make that stream event the current notification.
 *     - move this stream heap position to its next event
 *       - if we need to fetch data from relayd, move
 *         stream to ACTIVE_NO_DATA.
 *
 * end criterion: ctrl-c on client. If relayd exits or the session
 * closes on the relay daemon side, we keep on waiting for streams.
 * Eventually handle --end timestamp (also an end criterion).
 *
 * When disconnected from relayd: try to re-connect endlessly.
 */
698 struct bt_notification_iterator_next_return
lttng_live_iterator_next_stream(
699 struct bt_private_notification_iterator
*iterator
,
700 struct lttng_live_stream_iterator
*stream_iter
)
702 enum bt_ctf_lttng_live_iterator_status status
;
703 struct bt_notification_iterator_next_return next_return
;
704 struct lttng_live_component
*lttng_live
;
706 lttng_live
= stream_iter
->trace
->session
->lttng_live
;
708 print_stream_state(stream_iter
);
709 next_return
.notification
= NULL
;
710 status
= lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live
);
711 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
714 status
= lttng_live_iterator_next_handle_one_no_data_stream(
715 lttng_live
, stream_iter
);
716 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
719 status
= lttng_live_iterator_next_handle_one_quiescent_stream(
720 lttng_live
, stream_iter
, &next_return
.notification
);
721 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
722 assert(next_return
.notification
== NULL
);
725 if (next_return
.notification
) {
728 status
= lttng_live_iterator_next_handle_one_active_data_stream(lttng_live
,
729 stream_iter
, &next_return
.notification
);
730 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
731 assert(next_return
.notification
== NULL
);
736 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
:
737 print_dbg("continue");
739 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN
:
740 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_AGAIN
;
743 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
:
744 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_END
;
747 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
:
748 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_OK
;
751 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL
:
752 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_INVALID
;
754 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM
:
755 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_NOMEM
;
757 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED
:
758 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED
;
760 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
:
761 default: /* fall-through */
762 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_ERROR
;
769 struct bt_notification_iterator_next_return
lttng_live_iterator_next_no_stream(
770 struct bt_private_notification_iterator
*iterator
,
771 struct lttng_live_no_stream_iterator
*no_stream_iter
)
773 enum bt_ctf_lttng_live_iterator_status status
;
774 struct bt_notification_iterator_next_return next_return
;
775 struct lttng_live_component
*lttng_live
;
777 lttng_live
= no_stream_iter
->lttng_live
;
779 lttng_live_force_new_streams_and_metadata(lttng_live
);
780 status
= lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live
);
781 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
784 if (no_stream_iter
->port
) {
785 status
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN
;
787 status
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
;
791 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
:
793 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN
:
794 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_AGAIN
;
796 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
:
797 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_END
;
799 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
:
800 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_OK
;
802 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL
:
803 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_INVALID
;
805 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM
:
806 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_NOMEM
;
808 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED
:
809 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED
;
811 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
:
812 default: /* fall-through */
813 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_ERROR
;
820 struct bt_notification_iterator_next_return
lttng_live_iterator_next(
821 struct bt_private_notification_iterator
*iterator
)
823 struct lttng_live_stream_iterator_generic
*s
=
824 bt_private_notification_iterator_get_user_data(iterator
);
825 struct bt_notification_iterator_next_return next_return
;
828 case LIVE_STREAM_TYPE_NO_STREAM
:
829 next_return
= lttng_live_iterator_next_no_stream(iterator
,
830 container_of(s
, struct lttng_live_no_stream_iterator
, p
));
832 case LIVE_STREAM_TYPE_STREAM
:
833 next_return
= lttng_live_iterator_next_stream(iterator
,
834 container_of(s
, struct lttng_live_stream_iterator
, p
));
837 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_ERROR
;
844 enum bt_notification_iterator_status
lttng_live_iterator_init(
845 struct bt_private_notification_iterator
*it
,
846 struct bt_private_port
*port
)
848 enum bt_notification_iterator_status ret
=
849 BT_NOTIFICATION_ITERATOR_STATUS_OK
;
850 struct lttng_live_stream_iterator_generic
*s
;
854 s
= bt_private_port_get_user_data(port
);
857 case LIVE_STREAM_TYPE_NO_STREAM
:
859 struct lttng_live_no_stream_iterator
*no_stream_iter
=
860 container_of(s
, struct lttng_live_no_stream_iterator
, p
);
861 ret
= bt_private_notification_iterator_set_user_data(it
, no_stream_iter
);
867 case LIVE_STREAM_TYPE_STREAM
:
869 struct lttng_live_stream_iterator
*stream_iter
=
870 container_of(s
, struct lttng_live_stream_iterator
, p
);
871 ret
= bt_private_notification_iterator_set_user_data(it
, stream_iter
);
878 ret
= BT_NOTIFICATION_ITERATOR_STATUS_ERROR
;
885 if (bt_private_notification_iterator_set_user_data(it
, NULL
)
886 != BT_NOTIFICATION_ITERATOR_STATUS_OK
) {
887 BT_LOGE("Error setting private data to NULL");
893 struct bt_value
*lttng_live_query_list_sessions(struct bt_component_class
*comp_class
,
894 struct bt_value
*params
)
896 struct bt_value
*url_value
= NULL
;
897 struct bt_value
*results
= NULL
;
899 struct bt_live_viewer_connection
*viewer_connection
= NULL
;
900 enum bt_value_status ret
;
902 url_value
= bt_value_map_get(params
, "url");
903 if (!url_value
|| bt_value_is_null(url_value
) || !bt_value_is_string(url_value
)) {
904 BT_LOGW("Mandatory \"url\" parameter missing");
908 ret
= bt_value_string_get(url_value
, &url
);
909 if (ret
!= BT_VALUE_STATUS_OK
) {
910 BT_LOGW("\"url\" parameter is required to be a string value");
914 viewer_connection
= bt_live_viewer_connection_create(url
, stderr
);
915 if (!viewer_connection
) {
916 ret
= BT_COMPONENT_STATUS_NOMEM
;
920 results
= bt_live_viewer_connection_list_sessions(viewer_connection
);
925 if (viewer_connection
) {
926 bt_live_viewer_connection_destroy(viewer_connection
);
/*
 * Component class query method. Only the "sessions" object is supported;
 * any other object is logged as a warning and yields NULL.
 */
struct bt_value *lttng_live_query(struct bt_component_class *comp_class,
		const char *object, struct bt_value *params)
{
	if (strcmp(object, "sessions") == 0) {
		return lttng_live_query_list_sessions(comp_class,
			params);
	}
	BT_LOGW("Unknown query object `%s`", object);
	return NULL;
}
945 void lttng_live_component_destroy_data(struct lttng_live_component
*lttng_live
)
948 struct lttng_live_session
*session
, *s
;
950 bt_list_for_each_entry_safe(session
, s
, <tng_live
->sessions
, node
) {
951 lttng_live_destroy_session(session
);
953 BT_PUT(lttng_live
->viewer_connection
);
954 if (lttng_live
->url
) {
955 g_string_free(lttng_live
->url
, TRUE
);
957 if (lttng_live
->no_stream_port
) {
958 ret
= bt_private_port_remove_from_component(lttng_live
->no_stream_port
);
960 BT_PUT(lttng_live
->no_stream_port
);
962 if (lttng_live
->no_stream_iter
) {
963 g_free(lttng_live
->no_stream_iter
);
/*
 * Component finalization method: destroy the component's private data,
 * if any was attached.
 */
void lttng_live_component_finalize(struct bt_private_component *component)
{
	void *data = bt_private_component_get_user_data(component);

	if (!data) {
		return;
	}
	lttng_live_component_destroy_data(data);
}
980 struct lttng_live_component
*lttng_live_component_create(struct bt_value
*params
,
981 struct bt_private_component
*private_component
)
983 struct lttng_live_component
*lttng_live
;
984 struct bt_value
*value
= NULL
;
986 enum bt_value_status ret
;
988 lttng_live
= g_new0(struct lttng_live_component
, 1);
992 /* TODO: make this an overridable parameter. */
993 lttng_live
->max_query_size
= MAX_QUERY_SIZE
;
994 BT_INIT_LIST_HEAD(<tng_live
->sessions
);
995 value
= bt_value_map_get(params
, "url");
996 if (!value
|| bt_value_is_null(value
) || !bt_value_is_string(value
)) {
997 BT_LOGW("Mandatory \"url\" parameter missing");
1000 ret
= bt_value_string_get(value
, &url
);
1001 if (ret
!= BT_VALUE_STATUS_OK
) {
1002 BT_LOGW("\"url\" parameter is required to be a string value");
1005 lttng_live
->url
= g_string_new(url
);
1006 if (!lttng_live
->url
) {
1009 lttng_live
->viewer_connection
=
1010 bt_live_viewer_connection_create(lttng_live
->url
->str
,
1012 if (!lttng_live
->viewer_connection
) {
1013 ret
= BT_COMPONENT_STATUS_NOMEM
;
1016 if (lttng_live_create_viewer_session(lttng_live
)) {
1017 ret
= BT_COMPONENT_STATUS_ERROR
;
1020 lttng_live
->private_component
= private_component
;
1025 lttng_live_component_destroy_data(lttng_live
);
1032 enum bt_component_status
lttng_live_component_init(struct bt_private_component
*component
,
1033 struct bt_value
*params
, void *init_method_data
)
1035 struct lttng_live_component
*lttng_live
;
1036 enum bt_component_status ret
= BT_COMPONENT_STATUS_OK
;
1038 /* Passes ownership of iter ref to lttng_live_component_create. */
1039 lttng_live
= lttng_live_component_create(params
, component
);
1041 ret
= BT_COMPONENT_STATUS_NOMEM
;
1045 lttng_live
->no_stream_iter
= g_new0(struct lttng_live_no_stream_iterator
, 1);
1046 lttng_live
->no_stream_iter
->p
.type
= LIVE_STREAM_TYPE_NO_STREAM
;
1047 lttng_live
->no_stream_iter
->lttng_live
= lttng_live
;
1049 lttng_live
->no_stream_port
=
1050 bt_private_component_source_add_output_private_port(
1051 lttng_live
->private_component
, "no-stream",
1052 lttng_live
->no_stream_iter
);
1053 lttng_live
->no_stream_iter
->port
= lttng_live
->no_stream_port
;
1055 ret
= bt_private_component_set_user_data(component
, lttng_live
);
1056 if (ret
!= BT_COMPONENT_STATUS_OK
) {
1063 (void) bt_private_component_set_user_data(component
, NULL
);
1064 lttng_live_component_destroy_data(lttng_live
);
1069 enum bt_component_status
lttng_live_accept_port_connection(
1070 struct bt_private_component
*private_component
,
1071 struct bt_private_port
*self_private_port
,
1072 struct bt_port
*other_port
)
1074 struct lttng_live_component
*lttng_live
=
1075 bt_private_component_get_user_data(private_component
);
1076 struct bt_component
*other_component
;
1077 enum bt_component_status status
= BT_COMPONENT_STATUS_OK
;
1078 struct bt_port
*self_port
= bt_port_from_private_port(self_private_port
);
1080 other_component
= bt_port_get_component(other_port
);
1081 bt_put(other_component
); /* weak */
1083 if (!lttng_live
->downstream_component
) {
1084 lttng_live
->downstream_component
= other_component
;
1089 * Compare prior component to ensure we are connected to the
1090 * same downstream component as prior ports.
1092 if (lttng_live
->downstream_component
!= other_component
) {
1093 BT_LOGW("Cannot connect ctf.lttng-live component port \"%s\" to component \"%s\": already connected to component \"%s\".",
1094 bt_port_get_name(self_port
),
1095 bt_component_get_name(other_component
),
1096 bt_component_get_name(lttng_live
->downstream_component
));
1097 status
= BT_COMPONENT_STATUS_REFUSE_PORT_CONNECTION
;
1106 void __attribute__((constructor
)) bt_lttng_live_logging_ctor(void)
1108 enum bt_logging_level log_level
= BT_LOG_NONE
;
1109 const char *log_level_env
= getenv(BT_LOGLEVEL_NAME
);
1111 if (!log_level_env
) {
1115 if (strcmp(log_level_env
, "VERBOSE") == 0) {
1116 log_level
= BT_LOGGING_LEVEL_VERBOSE
;
1117 } else if (strcmp(log_level_env
, "DEBUG") == 0) {
1118 log_level
= BT_LOGGING_LEVEL_DEBUG
;
1119 } else if (strcmp(log_level_env
, "INFO") == 0) {
1120 log_level
= BT_LOGGING_LEVEL_INFO
;
1121 } else if (strcmp(log_level_env
, "WARN") == 0) {
1122 log_level
= BT_LOGGING_LEVEL_WARN
;
1123 } else if (strcmp(log_level_env
, "ERROR") == 0) {
1124 log_level
= BT_LOGGING_LEVEL_ERROR
;
1125 } else if (strcmp(log_level_env
, "FATAL") == 0) {
1126 log_level
= BT_LOGGING_LEVEL_FATAL
;
1128 bt_lttng_live_log_level
= BT_LOGGING_LEVEL_FATAL
;
1129 BT_LOGF("Incorrect log level specified in %s",
1134 bt_lttng_live_log_level
= log_level
;