4 * Babeltrace CTF LTTng-live Client Component
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
9 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
11 * Permission is hereby granted, free of charge, to any person obtaining a copy
12 * of this software and associated documentation files (the "Software"), to deal
13 * in the Software without restriction, including without limitation the rights
14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 * copies of the Software, and to permit persons to whom the Software is
16 * furnished to do so, subject to the following conditions:
18 * The above copyright notice and this permission notice shall be included in
19 * all copies or substantial portions of the Software.
21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 #define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC"
33 #include <babeltrace/ctf-ir/packet.h>
34 #include <babeltrace/graph/component-source.h>
35 #include <babeltrace/graph/private-port.h>
36 #include <babeltrace/graph/port.h>
37 #include <babeltrace/graph/private-component.h>
38 #include <babeltrace/graph/private-component-source.h>
39 #include <babeltrace/graph/private-notification-iterator.h>
40 #include <babeltrace/graph/notification-stream.h>
41 #include <babeltrace/graph/notification-packet.h>
42 #include <babeltrace/graph/notification-event.h>
43 #include <babeltrace/graph/notification-heap.h>
44 #include <babeltrace/graph/notification-iterator.h>
45 #include <babeltrace/graph/notification-inactivity.h>
46 #include <babeltrace/graph/graph.h>
47 #include <babeltrace/compiler-internal.h>
52 #include <plugins-common.h>
54 #include "data-stream.h"
56 #include "lttng-live-internal.h"
/*
 * MAX_QUERY_SIZE is the value stored in a component's max_query_size field
 * by lttng_live_component_create() (256*1024; presumably a byte count --
 * TODO confirm the unit).
 * print_dbg() forwards its format string and arguments to BT_LOGD().
 */
58 #define MAX_QUERY_SIZE (256*1024)
60 #define print_dbg(fmt, ...) BT_LOGD(fmt, ## __VA_ARGS__)
/*
 * Return a printable name for a LTTNG_LIVE_STREAM_* state of stream
 * iterator s. Used by print_stream_state() for debug logging.
 *
 * Visible mappings: ACTIVE_NO_DATA -> "ACTIVE_NO_DATA" and
 * QUIESCENT_NO_DATA -> "QUIESCENT_NO_DATA". The QUIESCENT, ACTIVE_DATA and
 * EOF cases exist as well.
 *
 * NOTE(review): the switch expression (presumably s->state) and the return
 * statements of the last three cases are not visible in this listing.
 */
62 static const char *print_state(struct lttng_live_stream_iterator
*s
)
65 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
:
66 return "ACTIVE_NO_DATA";
67 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
:
68 return "QUIESCENT_NO_DATA";
69 case LTTNG_LIVE_STREAM_QUIESCENT
:
71 case LTTNG_LIVE_STREAM_ACTIVE_DATA
:
73 case LTTNG_LIVE_STREAM_EOF
:
/*
 * Debug-log one stream iterator: the name of its port, the printable name of
 * its state (print_state()), and its last returned and current inactivity
 * timestamps (both printed with PRId64).
 *
 * The `stream` argument is expanded four times, so pass an expression
 * without side effects. stream->port is handed to
 * bt_port_from_private_port() as is.
 */
80 #define print_stream_state(stream) \
81 print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64, \
82 bt_port_get_name(bt_port_from_private_port(stream->port)), \
83 print_state(stream), stream->last_returned_inactivity_timestamp, \
84 stream->current_inactivity_timestamp)
/*
 * Add an output port for stream_iter to the live component.
 *
 * The port name is STREAM_NAME_PREFIX followed by the decimal
 * stream_iter->viewer_stream_id; the same name is copied into
 * stream_iter->name. stream_iter is passed along when the port is added
 * (presumably as the port's user data, which lttng_live_iterator_init()
 * reads back -- TODO confirm).
 *
 * If the component still holds its "no-stream" placeholder port
 * (lttng_live->no_stream_port), that port is removed from the component, the
 * reference is released with BT_PUT(), and no_stream_iter->port is reset to
 * NULL. Finally stream_iter->port is set to the newly created private port.
 *
 * NOTE(review): the name is formatted with sprintf() into a
 * STREAM_NAME_MAX_LEN buffer and copied with strcpy(); this is only safe if
 * STREAM_NAME_MAX_LEN covers the prefix, 20 digits and the terminator, and
 * if stream_iter->name is at least as large. Confirm, or bound both with
 * snprintf().
 * NOTE(review): the checks on `ret` and the return statements are not
 * visible in this listing.
 */
87 int lttng_live_add_port(struct lttng_live_component
*lttng_live
,
88 struct lttng_live_stream_iterator
*stream_iter
)
91 struct bt_private_port
*private_port
;
92 char name
[STREAM_NAME_MAX_LEN
];
94 ret
= sprintf(name
, STREAM_NAME_PREFIX
"%" PRIu64
, stream_iter
->viewer_stream_id
);
96 strcpy(stream_iter
->name
, name
);
97 ret
= bt_private_component_source_add_output_private_port(
98 lttng_live
->private_component
, name
, stream_iter
,
103 BT_LOGI("Added port %s", name
);
/* A real stream port now exists: retire the "no-stream" placeholder. */
105 if (lttng_live
->no_stream_port
) {
106 ret
= bt_private_port_remove_from_component(lttng_live
->no_stream_port
);
110 BT_PUT(lttng_live
->no_stream_port
);
111 lttng_live
->no_stream_iter
->port
= NULL
;
113 stream_iter
->port
= private_port
;
/*
 * Remove `port` from the live component.
 *
 * The current number of output ports is queried first. The visible code then
 * re-creates the "no-stream" placeholder port (after asserting that none
 * exists), stores it in lttng_live->no_stream_port and in
 * lttng_live->no_stream_iter->port, and finally removes `port` from the
 * component.
 *
 * NOTE(review): the condition guarding the placeholder re-creation
 * (presumably "port is the last remaining output port") and the handling of
 * `ret` are not visible in this listing.
 */
118 int lttng_live_remove_port(struct lttng_live_component
*lttng_live
,
119 struct bt_private_port
*port
)
121 struct bt_component
*component
;
125 component
= bt_component_from_private_component(lttng_live
->private_component
);
126 nr_ports
= bt_component_source_get_output_port_count(component
);
132 assert(!lttng_live
->no_stream_port
);
133 ret
= bt_private_component_source_add_output_private_port(lttng_live
->private_component
,
134 "no-stream", lttng_live
->no_stream_iter
,
/* Out-parameter: receives the newly added placeholder port. */
135 &lttng_live
->no_stream_port
);
139 lttng_live
->no_stream_iter
->port
= lttng_live
->no_stream_port
;
141 ret
= bt_private_port_remove_from_component(port
);
/*
 * Search session->traces for the trace whose id equals trace_id.
 *
 * NOTE(review): the trace_id parameter declaration and both return
 * statements are not visible in this listing; presumably the matching trace
 * is returned, or NULL when none matches -- TODO confirm.
 */
149 struct lttng_live_trace
*lttng_live_find_trace(struct lttng_live_session
*session
,
152 struct lttng_live_trace
*trace
;
154 bt_list_for_each_entry(trace
, &session
->traces
, node
) {
155 if (trace
->id
== trace_id
) {
/*
 * Release function of struct lttng_live_trace; it is registered with
 * bt_object_init() by lttng_live_create_trace().
 *
 * Recovers the trace from its embedded `obj` member, logs the destruction,
 * asserts that the trace has no stream left, unlinks the trace from its
 * session's list, marks the CTF trace as static
 * (bt_ctf_trace_set_is_static()), finalizes the trace's metadata and puts
 * the clock class priority map.
 *
 * NOTE(review): the handling of `retval` and the release of the trace
 * structure itself are not visible in this listing.
 */
163 void lttng_live_destroy_trace(struct bt_object
*obj
)
165 struct lttng_live_trace
*trace
= container_of(obj
, struct lttng_live_trace
, obj
);
168 BT_LOGI("Destroy trace");
/* All streams must have been destroyed before the trace goes away. */
169 assert(bt_list_empty(&trace
->streams
));
170 bt_list_del(&trace
->node
);
172 retval
= bt_ctf_trace_set_is_static(trace
->trace
);
175 lttng_live_metadata_fini(trace
);
176 BT_PUT(trace
->cc_prio_map
);
/*
 * Allocate a zero-initialized trace record for trace_id in `session`.
 *
 * The new trace remembers its session and id, starts with an empty stream
 * list and with new_metadata_needed set, is linked into session->traces, and
 * has its embedded object initialized with lttng_live_destroy_trace() as the
 * release function.
 *
 * NOTE(review): the trace_id parameter declaration, the allocation-failure
 * path and the return statement are not visible in this listing.
 */
181 struct lttng_live_trace
*lttng_live_create_trace(struct lttng_live_session
*session
,
184 struct lttng_live_trace
*trace
= NULL
;
186 trace
= g_new0(struct lttng_live_trace
, 1);
190 trace
->session
= session
;
191 trace
->id
= trace_id
;
192 BT_INIT_LIST_HEAD(&trace
->streams
);
/* Metadata has never been fetched for a brand new trace. */
193 trace
->new_metadata_needed
= true;
194 bt_list_add(&trace
->node
, &session
->traces
);
195 bt_object_init(&trace
->obj
, lttng_live_destroy_trace
);
196 BT_LOGI("Create trace");
/*
 * Obtain the trace identified by trace_id in `session`, creating it when it
 * does not exist yet.
 *
 * The session is searched with lttng_live_find_trace(); the final statement
 * falls back to lttng_live_create_trace().
 *
 * NOTE(review): the branch taken when the trace is found (presumably taking
 * a reference and returning it) is not visible in this listing.
 */
206 struct lttng_live_trace
*lttng_live_ref_trace(struct lttng_live_session
*session
,
209 struct lttng_live_trace
*trace
;
211 trace
= lttng_live_find_trace(session
, trace_id
);
216 return lttng_live_create_trace(session
, trace_id
);
/*
 * Counterpart of lttng_live_ref_trace(), judging by its name.
 *
 * NOTE(review): only the signature survives in this listing. What happens to
 * `trace` (presumably a reference drop that can end in
 * lttng_live_destroy_trace()) must be confirmed against the full file.
 */
220 void lttng_live_unref_trace(struct lttng_live_trace
*trace
)
/*
 * Destroy every stream iterator listed in trace->streams, then finalize the
 * trace's metadata.
 *
 * The list is walked with the _safe iterator, which keeps a look-ahead
 * cursor (`s`) so the current entry may be removed while iterating.
 */
226 void lttng_live_close_trace_streams(struct lttng_live_trace
*trace
)
228 struct lttng_live_stream_iterator
*stream
, *s
;
230 bt_list_for_each_entry_safe(stream
, s
, &trace
->streams
, node
) {
231 lttng_live_stream_iterator_destroy(stream
);
233 lttng_live_metadata_fini(trace
);
/*
 * Create a session record and link it into the component's session list.
 *
 * The record is zero-allocated, gets an empty trace list, a back pointer to
 * the component, new_streams_needed set to true, and GString copies of
 * `hostname` and `session_name`. The session is announced at info level and
 * added to lttng_live->sessions.
 *
 * "Error adding session" is logged on the failure path.
 *
 * NOTE(review): the assignment of s->id (logged here), the failure
 * conditions and the return values are not visible in this listing.
 */
237 int lttng_live_add_session(struct lttng_live_component
*lttng_live
,
238 uint64_t session_id
, const char *hostname
,
239 const char *session_name
)
242 struct lttng_live_session
*s
;
244 s
= g_new0(struct lttng_live_session
, 1);
250 BT_INIT_LIST_HEAD(&s
->traces
);
251 s
->lttng_live
= lttng_live
;
/* Streams of a new session have never been queried. */
252 s
->new_streams_needed
= true;
253 s
->hostname
= g_string_new(hostname
);
254 s
->session_name
= g_string_new(session_name
);
256 BT_LOGI("Reading from session: %" PRIu64
" hostname: %s session_name: %s",
257 s
->id
, hostname
, session_name
);
258 bt_list_add(&s
->node
, &lttng_live
->sessions
);
261 BT_LOGE("Error adding session");
/*
 * Tear down one session.
 *
 * When session->id is not -1ULL, lttng_live_detach_session() is attempted; a
 * failure is only logged (debug level), and only if the graph has not been
 * canceled. Afterwards the streams of every trace of the session are closed,
 * the session is unlinked from its list, and the hostname and session_name
 * GStrings are freed together with their character data.
 *
 * NOTE(review): the release of the session structure itself is not visible
 * in this listing.
 */
269 void lttng_live_destroy_session(struct lttng_live_session
*session
)
271 struct lttng_live_trace
*trace
, *t
;
273 BT_LOGI("Destroy session");
274 if (session
->id
!= -1ULL) {
275 if (lttng_live_detach_session(session
)) {
276 if (!bt_graph_is_canceled(session
->lttng_live
->graph
)) {
277 /* Old relayd cannot detach sessions. */
278 BT_LOGD("Unable to detach session %" PRIu64
,
284 bt_list_for_each_entry_safe(trace
, t
, &session
->traces
, node
) {
285 lttng_live_close_trace_streams(trace
);
287 bt_list_del(&session
->node
);
288 if (session
->hostname
) {
289 g_string_free(session
->hostname
, TRUE
);
291 if (session
->session_name
) {
292 g_string_free(session
->session_name
, TRUE
);
/*
 * Finalize method of the notification iterator.
 *
 * The iterator's user data is a generic stream iterator header. For
 * LIVE_STREAM_TYPE_NO_STREAM nothing is destroyed (the no_stream_iter stays
 * in place when its port is removed). For LIVE_STREAM_TYPE_STREAM the
 * enclosing lttng_live_stream_iterator is recovered through its `p` member
 * and destroyed.
 *
 * NOTE(review): the switch expression (presumably s->type) is not visible in
 * this listing.
 */
298 void lttng_live_iterator_finalize(struct bt_private_notification_iterator
*it
)
300 struct lttng_live_stream_iterator_generic
*s
=
301 bt_private_notification_iterator_get_user_data(it
);
304 case LIVE_STREAM_TYPE_NO_STREAM
:
306 /* Leave no_stream_iter in place when port is removed. */
309 case LIVE_STREAM_TYPE_STREAM
:
311 struct lttng_live_stream_iterator
*stream_iter
=
312 container_of(s
, struct lttng_live_stream_iterator
, p
);
314 lttng_live_stream_iterator_destroy(stream_iter
);
/*
 * Sanity-check the state of lttng_live_stream once its "no data" phase has
 * been handled.
 *
 * QUIESCENT and ACTIVE_DATA share one case. ACTIVE_NO_DATA and
 * QUIESCENT_NO_DATA are not expected here and are reported with BT_LOGF().
 * EOF has its own case. The function ends by returning
 * BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK.
 *
 * The lttng_live parameter is not referenced by the visible lines.
 *
 * NOTE(review): what follows each fatal log and what the EOF case does are
 * not visible in this listing.
 */
321 enum bt_ctf_lttng_live_iterator_status
lttng_live_iterator_next_check_stream_state(
322 struct lttng_live_component
*lttng_live
,
323 struct lttng_live_stream_iterator
*lttng_live_stream
)
325 switch (lttng_live_stream
->state
) {
326 case LTTNG_LIVE_STREAM_QUIESCENT
:
327 case LTTNG_LIVE_STREAM_ACTIVE_DATA
:
329 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
:
331 BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
333 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
:
335 BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
337 case LTTNG_LIVE_STREAM_EOF
:
340 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
344 * For active no data stream, fetch next data. It can be either:
345 * - quiescent: need to put it in the prio heap at quiescent end
347 * - have data: need to wire up first event into the prio heap,
348 * - have no data on this stream at this point: need to retry (AGAIN) or
/*
 * Fetch the next packet index for a stream that currently has no data.
 *
 * Steps visible in this listing:
 *  - if the stream's trace needs new metadata, or its session needs new
 *    streams, the status becomes CONTINUE;
 *  - the stream state is compared against ACTIVE_NO_DATA and
 *    QUIESCENT_NO_DATA (presumably other states skip the index fetch --
 *    TODO confirm);
 *  - lttng_live_get_next_index() fills `index`; a non-OK status is handled
 *    right after the call;
 *  - the stream must not be at EOF afterwards (assert);
 *  - if the stream is now QUIESCENT: when it started as QUIESCENT_NO_DATA
 *    and the inactivity timestamp has not moved since the last one returned,
 *    the status is AGAIN (and the stream state is debug-logged); a CONTINUE
 *    status is also assigned in this branch (presumably the else arm);
 *  - base_offset and offset are set to index.offset, and len to
 *    index.packet_size / CHAR_BIT (so packet_size is presumably expressed in
 *    bits);
 *  - if the status is still OK, the resulting state is validated with
 *    lttng_live_iterator_next_check_stream_state().
 *
 * NOTE(review): jumps, else arms and the return statement are not visible in
 * this listing.
 */
352 enum bt_ctf_lttng_live_iterator_status
lttng_live_iterator_next_handle_one_no_data_stream(
353 struct lttng_live_component
*lttng_live
,
354 struct lttng_live_stream_iterator
*lttng_live_stream
)
356 enum bt_ctf_lttng_live_iterator_status ret
=
357 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
358 struct packet_index index
;
/* Remember the state on entry: get_next_index may change it. */
359 enum lttng_live_stream_state orig_state
= lttng_live_stream
->state
;
361 if (lttng_live_stream
->trace
->new_metadata_needed
) {
362 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
365 if (lttng_live_stream
->trace
->session
->new_streams_needed
) {
366 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
369 if (lttng_live_stream
->state
!= LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
370 && lttng_live_stream
->state
!= LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
) {
373 ret
= lttng_live_get_next_index(lttng_live
, lttng_live_stream
, &index
);
374 if (ret
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
377 assert(lttng_live_stream
->state
!= LTTNG_LIVE_STREAM_EOF
);
378 if (lttng_live_stream
->state
== LTTNG_LIVE_STREAM_QUIESCENT
) {
/* Same inactivity timestamp as last time: nothing new to report. */
379 if (orig_state
== LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
380 && lttng_live_stream
->last_returned_inactivity_timestamp
==
381 lttng_live_stream
->current_inactivity_timestamp
) {
382 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN
;
383 print_stream_state(lttng_live_stream
);
385 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
389 lttng_live_stream
->base_offset
= index
.offset
;
390 lttng_live_stream
->offset
= index
.offset
;
391 lttng_live_stream
->len
= index
.packet_size
/ CHAR_BIT
;
393 if (ret
== BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
394 ret
= lttng_live_iterator_next_check_stream_state(
395 lttng_live
, lttng_live_stream
);
401 * Creation of the notification requires the ctf trace to be created
402 * beforehand, but the live protocol gives us all streams (including
403 * metadata) at once. So we split it in three steps: getting streams,
404 * getting metadata (which creates the ctf trace), and then creating the
405 * per-stream notifications.
/*
 * Bring one session up to date: attach, fetch streams, fetch metadata, then
 * initialize the per-stream notification machinery.
 *
 *  - lttng_live_attach_session() failure yields AGAIN when the graph has
 *    been canceled, ERROR otherwise;
 *  - lttng_live_get_new_streams() is called next; any status other than OK
 *    or END is singled out (presumably returned to the caller);
 *  - lttng_live_metadata_update() runs for every trace of the session, with
 *    the same OK/END filtering;
 *  - the result of lttng_live_lazy_notif_init() is returned.
 *
 * NOTE(review): the bodies of the two status filters are not visible in this
 * listing.
 */
408 enum bt_ctf_lttng_live_iterator_status
lttng_live_get_session(
409 struct lttng_live_component
*lttng_live
,
410 struct lttng_live_session
*session
)
412 enum bt_ctf_lttng_live_iterator_status status
;
413 struct lttng_live_trace
*trace
, *t
;
415 if (lttng_live_attach_session(session
)) {
416 if (bt_graph_is_canceled(lttng_live
->graph
)) {
417 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN
;
419 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
;
422 status
= lttng_live_get_new_streams(session
);
423 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
&&
424 status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
) {
427 bt_list_for_each_entry_safe(trace
, t
, &session
->traces
, node
) {
428 status
= lttng_live_metadata_update(trace
);
429 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
&&
430 status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
) {
434 return lttng_live_lazy_notif_init(session
);
/*
 * Flag every session of the component as needing a new stream query by
 * setting its new_streams_needed field to true.
 */
438 void lttng_live_need_new_streams(struct lttng_live_component
*lttng_live
)
440 struct lttng_live_session
*session
;
442 bt_list_for_each_entry(session
, &lttng_live
->sessions
, node
) {
443 session
->new_streams_needed
= true;
/*
 * Flag every session of the component as needing new streams, and every
 * trace of every session as needing new metadata, so that the next
 * lttng_live_iterator_next_handle_new_streams_and_metadata() pass queries
 * both again.
 */
448 void lttng_live_force_new_streams_and_metadata(struct lttng_live_component
*lttng_live
)
450 struct lttng_live_session
*session
;
452 bt_list_for_each_entry(session
, &lttng_live
->sessions
, node
) {
453 struct lttng_live_trace
*trace
;
455 session
->new_streams_needed
= true;
456 bt_list_for_each_entry(trace
, &session
->traces
, node
) {
457 trace
->new_metadata_needed
= true;
/*
 * Refresh the streams and metadata of all sessions of the component.
 *
 *  1. Sessions that are closed and have no trace left are destroyed (safe
 *     list walk, since entries are removed).
 *  2. If no session remains, the status is END.
 *  3. lttng_live_get_session() is called for each remaining session; an END
 *     result for one session is turned back into OK, and sessions that are
 *     not closed are counted.
 *  4. If the status is OK but no open session was counted, the status
 *     becomes END.
 *
 * NOTE(review): the switch on the per-session status is only partly visible
 * in this listing (the OK and END cases), as is the final return.
 */
463 enum bt_ctf_lttng_live_iterator_status
lttng_live_iterator_next_handle_new_streams_and_metadata(
464 struct lttng_live_component
*lttng_live
)
466 enum bt_ctf_lttng_live_iterator_status ret
=
467 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
468 unsigned int nr_sessions_opened
= 0;
469 struct lttng_live_session
*session
, *s
;
471 bt_list_for_each_entry_safe(session
, s
, &lttng_live
->sessions
, node
) {
472 if (session
->closed
&& bt_list_empty(&session
->traces
)) {
473 lttng_live_destroy_session(session
);
477 * Currently, when there are no sessions, we quit immediately.
478 * We may want to add a component parameter to keep trying until
479 * we get data in the future.
480 * Also, in a remotely distant future, we could add a "new
481 * session" flag to the protocol, which would tell us that we
482 * need to query for new sessions even though we have sessions
485 if (bt_list_empty(&lttng_live
->sessions
)) {
486 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
;
489 bt_list_for_each_entry(session
, &lttng_live
->sessions
, node
) {
490 ret
= lttng_live_get_session(lttng_live
, session
);
492 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
:
494 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
:
495 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
500 if (!session
->closed
) {
501 nr_sessions_opened
++;
505 if (ret
== BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
&& !nr_sessions_opened
) {
506 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
;
/*
 * Build an inactivity notification for lttng_live_stream and hand it out
 * through *notification.
 *
 * The clock class at index 0 of the trace's clock class priority map is
 * used to create a clock value at `timestamp`; an inactivity notification is
 * created for the same priority map and the clock value is attached to it.
 * On the failure path the status is set to
 * BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR.
 *
 * NOTE(review): the declaration of the `timestamp` parameter, the individual
 * failure checks and the release of the temporary references are not visible
 * in this listing.
 */
512 enum bt_ctf_lttng_live_iterator_status
emit_inactivity_notification(
513 struct lttng_live_component
*lttng_live
,
514 struct lttng_live_stream_iterator
*lttng_live_stream
,
515 struct bt_notification
**notification
,
518 enum bt_ctf_lttng_live_iterator_status ret
=
519 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
520 struct lttng_live_trace
*trace
;
521 struct bt_ctf_clock_class
*clock_class
= NULL
;
522 struct bt_ctf_clock_value
*clock_value
= NULL
;
523 struct bt_notification
*notif
= NULL
;
526 trace
= lttng_live_stream
->trace
;
530 clock_class
= bt_clock_class_priority_map_get_clock_class_by_index(trace
->cc_prio_map
, 0);
534 clock_value
= bt_ctf_clock_value_create(clock_class
, timestamp
);
538 notif
= bt_notification_inactivity_create(trace
->cc_prio_map
);
542 retval
= bt_notification_inactivity_set_clock_value(notif
, clock_value
);
546 *notification
= notif
;
553 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
;
/*
 * Handle a stream in the QUIESCENT state; any other state returns OK right
 * away without touching *notification.
 *
 * If the current inactivity timestamp equals the last one already returned,
 * the stream moves to QUIESCENT_NO_DATA and the status becomes CONTINUE.
 * Otherwise an inactivity notification is emitted at the current inactivity
 * timestamp (cast to uint64_t) and that timestamp is recorded as the last
 * one returned.
 *
 * The clock_class and clock_value locals are not used by the visible lines.
 *
 * NOTE(review): the jump after the CONTINUE assignment and the final return
 * are not visible in this listing.
 */
559 enum bt_ctf_lttng_live_iterator_status
lttng_live_iterator_next_handle_one_quiescent_stream(
560 struct lttng_live_component
*lttng_live
,
561 struct lttng_live_stream_iterator
*lttng_live_stream
,
562 struct bt_notification
**notification
)
564 enum bt_ctf_lttng_live_iterator_status ret
=
565 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
566 struct bt_ctf_clock_class
*clock_class
= NULL
;
567 struct bt_ctf_clock_value
*clock_value
= NULL
;
569 if (lttng_live_stream
->state
!= LTTNG_LIVE_STREAM_QUIESCENT
) {
570 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
/* This inactivity period was already reported downstream. */
573 if (lttng_live_stream
->current_inactivity_timestamp
==
574 lttng_live_stream
->last_returned_inactivity_timestamp
) {
575 lttng_live_stream
->state
= LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
;
576 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
580 ret
= emit_inactivity_notification(lttng_live
, lttng_live_stream
, notification
,
581 (uint64_t) lttng_live_stream
->current_inactivity_timestamp
);
583 lttng_live_stream
->last_returned_inactivity_timestamp
=
584 lttng_live_stream
->current_inactivity_timestamp
;
/*
 * Produce the next notification of a stream that has data.
 *
 *  - CONTINUE is returned as soon as any session needs new streams or any
 *    trace needs new metadata.
 *  - ERROR is returned if the stream is not in the ACTIVE_DATA state.
 *  - A packet end notification parked in packet_end_notif_queue is handed
 *    out first (and the queue slot cleared); otherwise the next notification
 *    is requested from the stream's CTF notification iterator.
 *  - When that iterator yields a packet end notification, it is parked in
 *    packet_end_notif_queue and an inactivity notification stamped with
 *    current_packet_end_timestamp is returned instead.
 *  - The CTF iterator status is then translated: EOF -> END, OK -> OK,
 *    AGAIN -> CONTINUE, INVAL and ERROR -> ERROR.
 *
 * NOTE(review): the else keyword between the two notification sources, the
 * last argument of bt_ctf_notif_iter_get_next_notification(), the switch
 * head and the final return are not visible in this listing.
 */
592 enum bt_ctf_lttng_live_iterator_status
lttng_live_iterator_next_handle_one_active_data_stream(
593 struct lttng_live_component
*lttng_live
,
594 struct lttng_live_stream_iterator
*lttng_live_stream
,
595 struct bt_notification
**notification
)
597 enum bt_ctf_lttng_live_iterator_status ret
=
598 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
599 enum bt_ctf_notif_iter_status status
;
600 struct lttng_live_session
*session
;
602 bt_list_for_each_entry(session
, &lttng_live
->sessions
, node
) {
603 struct lttng_live_trace
*trace
;
605 if (session
->new_streams_needed
) {
606 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
608 bt_list_for_each_entry(trace
, &session
->traces
, node
) {
609 if (trace
->new_metadata_needed
) {
610 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
615 if (lttng_live_stream
->state
!= LTTNG_LIVE_STREAM_ACTIVE_DATA
) {
616 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
;
/* Deliver a previously parked packet end notification first. */
618 if (lttng_live_stream
->packet_end_notif_queue
) {
619 *notification
= lttng_live_stream
->packet_end_notif_queue
;
620 lttng_live_stream
->packet_end_notif_queue
= NULL
;
621 status
= BT_CTF_NOTIF_ITER_STATUS_OK
;
623 status
= bt_ctf_notif_iter_get_next_notification(
624 lttng_live_stream
->notif_iter
,
625 lttng_live_stream
->trace
->cc_prio_map
,
627 if (status
== BT_CTF_NOTIF_ITER_STATUS_OK
) {
629 * Consider empty packets as inactivity.
631 if (bt_notification_get_type(*notification
) == BT_NOTIFICATION_TYPE_PACKET_END
) {
632 lttng_live_stream
->packet_end_notif_queue
= *notification
;
633 *notification
= NULL
;
634 return emit_inactivity_notification(lttng_live
,
635 lttng_live_stream
, notification
,
636 lttng_live_stream
->current_packet_end_timestamp
);
641 case BT_CTF_NOTIF_ITER_STATUS_EOF
:
642 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
;
644 case BT_CTF_NOTIF_ITER_STATUS_OK
:
645 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
647 case BT_CTF_NOTIF_ITER_STATUS_AGAIN
:
649 * Continue immediately (end of packet). The next
650 * get_index may return AGAIN to delay the following
653 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
655 case BT_CTF_NOTIF_ITER_STATUS_INVAL
:
656 /* No argument provided by the user, so don't return INVAL. */
657 case BT_CTF_NOTIF_ITER_STATUS_ERROR
:
659 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
;
667 * handle_no_data_streams()
669 * - for each ACTIVE_NO_DATA stream:
670 * - query relayd for stream data, or quiescence info.
671 * - if need metadata, get metadata, goto retry.
672 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
673 * - if quiescent, move to QUIESCENT streams
674 * - if fetched data, move to ACTIVE_DATA streams
675 * (at this point each stream either has data, or is quiescent)
679 * handle_new_streams_and_metadata()
680 * - query relayd for known streams, add them as ACTIVE_NO_DATA
681 * - query relayd for metadata
683 * call handle_active_no_data_streams()
685 * handle_quiescent_streams()
686 * - if at least one stream is ACTIVE_DATA:
687 * - peek stream event with lowest timestamp -> next_ts
688 * - for each quiescent stream
689 * - if next_ts >= quiescent end
690 * - set state to ACTIVE_NO_DATA
692 * - for each quiescent stream
693 * - set state to ACTIVE_NO_DATA
695 * call handle_active_no_data_streams()
697 * handle_active_data_streams()
698 * - if at least one stream is ACTIVE_DATA:
699 * - get stream event with lowest timestamp from heap
700 * - make that stream event the current notification.
701 * - move this stream heap position to its next event
702 * - if we need to fetch data from relayd, move
703 * stream to ACTIVE_NO_DATA.
707 * end criterion: ctrl-c on client. If relayd exits or the session
708 * closes on the relay daemon side, we keep on waiting for streams.
709 * Eventually handle --end timestamp (also an end criterion).
711 * When disconnected from relayd: try to re-connect endlessly.
/*
 * "Next" implementation for an iterator bound to a real stream port.
 *
 * The component is reached through stream_iter->trace->session->lttng_live.
 * After debug-logging the stream state and clearing the returned
 * notification, the following stages run in order, each one being checked
 * for a non-OK status:
 *   1. lttng_live_iterator_next_handle_new_streams_and_metadata()
 *   2. lttng_live_iterator_next_handle_one_no_data_stream()
 *   3. lttng_live_iterator_next_handle_one_quiescent_stream()
 *      (a non-OK status must not come with a notification: assert)
 *   4. lttng_live_iterator_next_handle_one_active_data_stream(), with the
 *      same assertion. A test on the notification sits between stages 3
 *      and 4 (presumably skipping stage 4 when stage 3 already produced
 *      one -- TODO confirm).
 *
 * The live status is then mapped to the notification iterator status:
 * AGAIN -> AGAIN, END -> END, OK -> OK, INVAL -> INVALID, NOMEM -> NOMEM,
 * UNSUPPORTED -> UNSUPPORTED, ERROR and anything else -> ERROR. CONTINUE
 * logs "continue".
 *
 * NOTE(review): the jumps taken on non-OK statuses, what CONTINUE does
 * after logging (presumably a retry) and the return statement are not
 * visible in this listing.
 */
714 struct bt_notification_iterator_next_return
lttng_live_iterator_next_stream(
715 struct bt_private_notification_iterator
*iterator
,
716 struct lttng_live_stream_iterator
*stream_iter
)
718 enum bt_ctf_lttng_live_iterator_status status
;
719 struct bt_notification_iterator_next_return next_return
;
720 struct lttng_live_component
*lttng_live
;
722 lttng_live
= stream_iter
->trace
->session
->lttng_live
;
724 print_stream_state(stream_iter
);
725 next_return
.notification
= NULL
;
726 status
= lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live
);
727 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
730 status
= lttng_live_iterator_next_handle_one_no_data_stream(
731 lttng_live
, stream_iter
);
732 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
735 status
= lttng_live_iterator_next_handle_one_quiescent_stream(
736 lttng_live
, stream_iter
, &next_return
.notification
);
737 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
738 assert(next_return
.notification
== NULL
);
741 if (next_return
.notification
) {
744 status
= lttng_live_iterator_next_handle_one_active_data_stream(lttng_live
,
745 stream_iter
, &next_return
.notification
);
746 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
747 assert(next_return
.notification
== NULL
);
752 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
:
753 print_dbg("continue");
755 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN
:
756 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_AGAIN
;
759 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
:
760 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_END
;
763 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
:
764 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_OK
;
767 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL
:
768 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_INVALID
;
770 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM
:
771 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_NOMEM
;
773 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED
:
774 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED
;
776 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
:
777 default: /* fall-through */
778 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_ERROR
;
/*
 * "Next" implementation for the iterator bound to the "no-stream"
 * placeholder port.
 *
 * Every session and trace is first flagged for a refresh
 * (lttng_live_force_new_streams_and_metadata()), then
 * lttng_live_iterator_next_handle_new_streams_and_metadata() runs. Depending
 * on whether no_stream_iter->port is still set, the status becomes AGAIN
 * (placeholder port still present) or END.
 *
 * The live status is then mapped to the notification iterator status:
 * AGAIN -> AGAIN, END -> END, OK -> OK, INVAL -> INVALID, NOMEM -> NOMEM,
 * UNSUPPORTED -> UNSUPPORTED, ERROR and anything else -> ERROR.
 *
 * NOTE(review): no assignment to next_return.notification is visible in
 * this listing; confirm the field is initialized before the struct is
 * returned. The handling of CONTINUE (presumably a retry) is not visible
 * either.
 */
785 struct bt_notification_iterator_next_return
lttng_live_iterator_next_no_stream(
786 struct bt_private_notification_iterator
*iterator
,
787 struct lttng_live_no_stream_iterator
*no_stream_iter
)
789 enum bt_ctf_lttng_live_iterator_status status
;
790 struct bt_notification_iterator_next_return next_return
;
791 struct lttng_live_component
*lttng_live
;
793 lttng_live
= no_stream_iter
->lttng_live
;
795 lttng_live_force_new_streams_and_metadata(lttng_live
);
796 status
= lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live
);
797 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
800 if (no_stream_iter
->port
) {
801 status
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN
;
803 status
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
;
807 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
:
809 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN
:
810 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_AGAIN
;
812 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
:
813 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_END
;
815 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
:
816 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_OK
;
818 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL
:
819 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_INVALID
;
821 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM
:
822 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_NOMEM
;
824 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED
:
825 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED
;
827 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
:
828 default: /* fall-through */
829 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_ERROR
;
/*
 * "Next" method of the notification iterator.
 *
 * The iterator's user data is a generic stream iterator header; the call is
 * dispatched to lttng_live_iterator_next_no_stream() or
 * lttng_live_iterator_next_stream() after recovering the enclosing structure
 * through its `p` member. Another assignment sets the status to
 * BT_NOTIFICATION_ITERATOR_STATUS_ERROR (presumably the default case for an
 * unknown type).
 *
 * NOTE(review): the switch expression (presumably s->type) and the return
 * statement are not visible in this listing.
 */
836 struct bt_notification_iterator_next_return
lttng_live_iterator_next(
837 struct bt_private_notification_iterator
*iterator
)
839 struct lttng_live_stream_iterator_generic
*s
=
840 bt_private_notification_iterator_get_user_data(iterator
);
841 struct bt_notification_iterator_next_return next_return
;
844 case LIVE_STREAM_TYPE_NO_STREAM
:
845 next_return
= lttng_live_iterator_next_no_stream(iterator
,
846 container_of(s
, struct lttng_live_no_stream_iterator
, p
));
848 case LIVE_STREAM_TYPE_STREAM
:
849 next_return
= lttng_live_iterator_next_stream(iterator
,
850 container_of(s
, struct lttng_live_stream_iterator
, p
));
853 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_ERROR
;
/*
 * Init method of the notification iterator created on `port`.
 *
 * The port's user data is a generic stream iterator header. Depending on
 * its type, the enclosing lttng_live_no_stream_iterator or
 * lttng_live_stream_iterator is recovered through its `p` member and
 * installed as the user data of `it`. Another assignment sets the status to
 * BT_NOTIFICATION_ITERATOR_STATUS_ERROR (presumably for an unknown type).
 *
 * On the error path the iterator's user data is reset to NULL; a failure of
 * that reset is only logged.
 *
 * NOTE(review): the switch expression (presumably s->type), the status
 * checks after each set_user_data call and the return statement are not
 * visible in this listing.
 */
860 enum bt_notification_iterator_status
lttng_live_iterator_init(
861 struct bt_private_notification_iterator
*it
,
862 struct bt_private_port
*port
)
864 enum bt_notification_iterator_status ret
=
865 BT_NOTIFICATION_ITERATOR_STATUS_OK
;
866 struct lttng_live_stream_iterator_generic
*s
;
870 s
= bt_private_port_get_user_data(port
);
873 case LIVE_STREAM_TYPE_NO_STREAM
:
875 struct lttng_live_no_stream_iterator
*no_stream_iter
=
876 container_of(s
, struct lttng_live_no_stream_iterator
, p
);
877 ret
= bt_private_notification_iterator_set_user_data(it
, no_stream_iter
);
883 case LIVE_STREAM_TYPE_STREAM
:
885 struct lttng_live_stream_iterator
*stream_iter
=
886 container_of(s
, struct lttng_live_stream_iterator
, p
);
887 ret
= bt_private_notification_iterator_set_user_data(it
, stream_iter
);
894 ret
= BT_NOTIFICATION_ITERATOR_STATUS_ERROR
;
901 if (bt_private_notification_iterator_set_user_data(it
, NULL
)
902 != BT_NOTIFICATION_ITERATOR_STATUS_OK
) {
903 BT_LOGE("Error setting private data to NULL");
/*
 * Answer the "sessions" query: list the sessions offered at the "url" given
 * in `params`.
 *
 * The "url" entry must exist, be non-null and be a string; otherwise a
 * warning is logged. A viewer connection is created for that URL (with NULL
 * as second argument), bt_live_viewer_connection_list_sessions() provides
 * the result value, and the connection is destroyed before leaving.
 *
 * comp_class is not referenced by the visible lines.
 *
 * NOTE(review): the declaration of `url`, the jumps taken after each
 * warning, the release of url_value and the return statement are not
 * visible in this listing.
 */
909 struct bt_value
*lttng_live_query_list_sessions(struct bt_component_class
*comp_class
,
910 struct bt_value
*params
)
912 struct bt_value
*url_value
= NULL
;
913 struct bt_value
*results
= NULL
;
915 struct bt_live_viewer_connection
*viewer_connection
= NULL
;
917 url_value
= bt_value_map_get(params
, "url");
918 if (!url_value
|| bt_value_is_null(url_value
) || !bt_value_is_string(url_value
)) {
919 BT_LOGW("Mandatory \"url\" parameter missing");
923 if (bt_value_string_get(url_value
, &url
) != BT_VALUE_STATUS_OK
) {
924 BT_LOGW("\"url\" parameter is required to be a string value");
928 viewer_connection
= bt_live_viewer_connection_create(url
, NULL
);
929 if (!viewer_connection
) {
933 results
= bt_live_viewer_connection_list_sessions(viewer_connection
);
938 if (viewer_connection
) {
939 bt_live_viewer_connection_destroy(viewer_connection
);
/*
 * Query method of the component class.
 *
 * The only object recognized by the visible code is "sessions", which is
 * delegated to lttng_live_query_list_sessions(). Any other object name is
 * reported with a warning.
 *
 * NOTE(review): the second argument of the delegated call and the value
 * returned for an unknown object are not visible in this listing.
 */
946 struct bt_value
*lttng_live_query(struct bt_component_class
*comp_class
,
947 const char *object
, struct bt_value
*params
)
949 if (strcmp(object
, "sessions") == 0) {
950 return lttng_live_query_list_sessions(comp_class
,
953 BT_LOGW("Unknown query object `%s`", object
);
/*
 * Release everything owned by a live component structure.
 *
 * Every session is destroyed (safe list walk, since
 * lttng_live_destroy_session() unlinks the session), the viewer connection
 * reference is put, the URL GString is freed with its character data, the
 * "no-stream" placeholder port (if any) is removed from the component and
 * its reference put, and the no_stream_iter allocation is freed.
 *
 * NOTE(review): the handling of `ret` and the release of the lttng_live
 * structure itself are not visible in this listing.
 */
958 void lttng_live_component_destroy_data(struct lttng_live_component
*lttng_live
)
961 struct lttng_live_session
*session
, *s
;
963 bt_list_for_each_entry_safe(session
, s
, &lttng_live
->sessions
, node
) {
964 lttng_live_destroy_session(session
);
966 BT_PUT(lttng_live
->viewer_connection
);
967 if (lttng_live
->url
) {
968 g_string_free(lttng_live
->url
, TRUE
);
970 if (lttng_live
->no_stream_port
) {
971 ret
= bt_private_port_remove_from_component(lttng_live
->no_stream_port
);
973 BT_PUT(lttng_live
->no_stream_port
);
975 if (lttng_live
->no_stream_iter
) {
976 g_free(lttng_live
->no_stream_iter
);
/*
 * Finalize method of the component: fetch the component's user data and
 * release it with lttng_live_component_destroy_data().
 *
 * NOTE(review): lines are missing between the fetch and the release
 * (presumably a NULL check on `data`) -- TODO confirm.
 */
982 void lttng_live_component_finalize(struct bt_private_component
*component
)
984 void *data
= bt_private_component_get_user_data(component
);
989 lttng_live_component_destroy_data(data
);
/*
 * Allocate and set up the live component state from `params`.
 *
 * The structure is zero-allocated, max_query_size is set to MAX_QUERY_SIZE
 * and the session list is initialized. The mandatory "url" string parameter
 * is copied into lttng_live->url, a viewer connection is created for it, and
 * a viewer session is created on that connection. On success the private
 * component and the graph are recorded in the structure.
 *
 * On the failure path everything built so far is released with
 * lttng_live_component_destroy_data().
 *
 * NOTE(review): the declaration of `url`, the jumps taken on each failure,
 * the release of `value` and the return statements are not visible in this
 * listing.
 */
993 struct lttng_live_component
*lttng_live_component_create(struct bt_value
*params
,
994 struct bt_private_component
*private_component
,
995 struct bt_graph
*graph
)
997 struct lttng_live_component
*lttng_live
;
998 struct bt_value
*value
= NULL
;
1000 enum bt_value_status ret
;
1002 lttng_live
= g_new0(struct lttng_live_component
, 1);
1006 /* TODO: make this an overridable parameter. */
1007 lttng_live
->max_query_size
= MAX_QUERY_SIZE
;
1008 BT_INIT_LIST_HEAD(&lttng_live
->sessions
);
1009 value
= bt_value_map_get(params
, "url");
1010 if (!value
|| bt_value_is_null(value
) || !bt_value_is_string(value
)) {
1011 BT_LOGW("Mandatory \"url\" parameter missing");
1014 ret
= bt_value_string_get(value
, &url
);
1015 if (ret
!= BT_VALUE_STATUS_OK
) {
1016 BT_LOGW("\"url\" parameter is required to be a string value");
1019 lttng_live
->url
= g_string_new(url
);
1020 if (!lttng_live
->url
) {
1023 lttng_live
->viewer_connection
=
1024 bt_live_viewer_connection_create(lttng_live
->url
->str
, lttng_live
);
1025 if (!lttng_live
->viewer_connection
) {
1028 if (lttng_live_create_viewer_session(lttng_live
)) {
1031 lttng_live
->private_component
= private_component
;
1032 lttng_live
->graph
= graph
;
1037 lttng_live_component_destroy_data(lttng_live
);
/*
 * Init method of the component.
 *
 * The graph owning the component is looked up and its reference is put
 * right away, so `graph` is kept as a weak pointer. The component state is
 * built by lttng_live_component_create(); on the failure path the status is
 * AGAIN when the graph has been canceled, NOMEM otherwise.
 *
 * A "no-stream" placeholder iterator is then allocated (type
 * LIVE_STREAM_TYPE_NO_STREAM, back pointer to the component) and a
 * "no-stream" output port is added with that iterator passed along; the new
 * port is stored in lttng_live->no_stream_port and in no_stream_iter->port.
 * Finally the state is installed as the component's user data.
 *
 * On the error path the user data is reset to NULL and the state is
 * released with lttng_live_component_destroy_data().
 *
 * init_method_data is not referenced by the visible lines.
 *
 * NOTE(review): the release of `component`, the last argument of
 * lttng_live_component_create(), the jumps and the return statement are not
 * visible in this listing.
 */
1044 enum bt_component_status
lttng_live_component_init(
1045 struct bt_private_component
*private_component
,
1046 struct bt_value
*params
, void *init_method_data
)
1048 struct lttng_live_component
*lttng_live
;
1049 enum bt_component_status ret
= BT_COMPONENT_STATUS_OK
;
1050 struct bt_component
*component
;
1051 struct bt_graph
*graph
;
1053 component
= bt_component_from_private_component(private_component
);
1054 graph
= bt_component_get_graph(component
);
1055 bt_put(graph
); /* weak */
1058 /* Passes ownership of iter ref to lttng_live_component_create. */
1059 lttng_live
= lttng_live_component_create(params
, private_component
,
1062 if (bt_graph_is_canceled(graph
)) {
1063 ret
= BT_COMPONENT_STATUS_AGAIN
;
1065 ret
= BT_COMPONENT_STATUS_NOMEM
;
1070 lttng_live
->no_stream_iter
= g_new0(struct lttng_live_no_stream_iterator
, 1);
1071 lttng_live
->no_stream_iter
->p
.type
= LIVE_STREAM_TYPE_NO_STREAM
;
1072 lttng_live
->no_stream_iter
->lttng_live
= lttng_live
;
1073 ret
= bt_private_component_source_add_output_private_port(
1074 lttng_live
->private_component
, "no-stream",
1075 lttng_live
->no_stream_iter
,
/* Out-parameter: receives the newly added placeholder port. */
1076 &lttng_live
->no_stream_port
);
1077 if (ret
!= BT_COMPONENT_STATUS_OK
) {
1081 lttng_live
->no_stream_iter
->port
= lttng_live
->no_stream_port
;
1083 ret
= bt_private_component_set_user_data(private_component
, lttng_live
);
1084 if (ret
!= BT_COMPONENT_STATUS_OK
) {
1091 (void) bt_private_component_set_user_data(private_component
, NULL
);
1092 lttng_live_component_destroy_data(lttng_live
);
1097 enum bt_component_status
lttng_live_accept_port_connection(
1098 struct bt_private_component
*private_component
,
1099 struct bt_private_port
*self_private_port
,
1100 struct bt_port
*other_port
)
1102 struct lttng_live_component
*lttng_live
=
1103 bt_private_component_get_user_data(private_component
);
1104 struct bt_component
*other_component
;
1105 enum bt_component_status status
= BT_COMPONENT_STATUS_OK
;
1106 struct bt_port
*self_port
= bt_port_from_private_port(self_private_port
);
1108 other_component
= bt_port_get_component(other_port
);
1109 bt_put(other_component
); /* weak */
1111 if (!lttng_live
->downstream_component
) {
1112 lttng_live
->downstream_component
= other_component
;
1117 * Compare prior component to ensure we are connected to the
1118 * same downstream component as prior ports.
1120 if (lttng_live
->downstream_component
!= other_component
) {
1121 BT_LOGW("Cannot connect ctf.lttng-live component port \"%s\" to component \"%s\": already connected to component \"%s\".",
1122 bt_port_get_name(self_port
),
1123 bt_component_get_name(other_component
),
1124 bt_component_get_name(lttng_live
->downstream_component
));
1125 status
= BT_COMPONENT_STATUS_REFUSE_PORT_CONNECTION
;