4 * Babeltrace CTF LTTng-live Client Component
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
9 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
11 * Permission is hereby granted, free of charge, to any person obtaining a copy
12 * of this software and associated documentation files (the "Software"), to deal
13 * in the Software without restriction, including without limitation the rights
14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 * copies of the Software, and to permit persons to whom the Software is
16 * furnished to do so, subject to the following conditions:
18 * The above copyright notice and this permission notice shall be included in
19 * all copies or substantial portions of the Software.
21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 #define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC"
33 #include <babeltrace/ctf-ir/packet.h>
34 #include <babeltrace/graph/component-source.h>
35 #include <babeltrace/graph/private-port.h>
36 #include <babeltrace/graph/port.h>
37 #include <babeltrace/graph/private-component.h>
38 #include <babeltrace/graph/private-component-source.h>
39 #include <babeltrace/graph/private-notification-iterator.h>
40 #include <babeltrace/graph/notification-stream.h>
41 #include <babeltrace/graph/notification-packet.h>
42 #include <babeltrace/graph/notification-event.h>
43 #include <babeltrace/graph/notification-heap.h>
44 #include <babeltrace/graph/notification-iterator.h>
45 #include <babeltrace/graph/notification-inactivity.h>
46 #include <babeltrace/graph/graph.h>
47 #include <babeltrace/compiler-internal.h>
48 #include <babeltrace/types.h>
53 #include <plugins-common.h>
55 #include "data-stream.h"
57 #include "lttng-live-internal.h"
59 #define MAX_QUERY_SIZE (256*1024)
61 #define print_dbg(fmt, ...) BT_LOGD(fmt, ## __VA_ARGS__)
/*
 * Return a constant, human-readable name for the state of stream
 * iterator `s` (for example "ACTIVE_NO_DATA" or "QUIESCENT_NO_DATA").
 * NOTE(review): the switch header and the return statements of the last
 * three cases are not visible here -- confirm their strings.
 */
63 static const char *print_state(struct lttng_live_stream_iterator
*s
)
66 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
:
67 return "ACTIVE_NO_DATA";
68 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
:
69 return "QUIESCENT_NO_DATA";
70 case LTTNG_LIVE_STREAM_QUIESCENT
:
72 case LTTNG_LIVE_STREAM_ACTIVE_DATA
:
74 case LTTNG_LIVE_STREAM_EOF
:
/*
 * Emit one debug-level log line (print_dbg() expands to BT_LOGD())
 * describing `stream`: the name of its port, its state, and its last
 * returned and current inactivity timestamps.
 * NOTE(review): the declaration of `port`, the argument supplying the
 * state string, and any release of the `port` reference are not visible
 * here.
 */
82 void print_stream_state(struct lttng_live_stream_iterator
*stream
)
/* Get the public port view of the stream's private port to read its name. */
86 port
= bt_port_from_private_port(stream
->port
);
87 print_dbg("stream %s state %s last_inact_ts %" PRId64
" cur_inact_ts %" PRId64
,
88 bt_port_get_name(port
),
90 stream
->last_returned_inactivity_timestamp
,
91 stream
->current_inactivity_timestamp
);
/*
 * Query whether the graph containing this component has been canceled.
 * The graph is reached through lttng_live->private_component and the
 * answer is obtained from bt_graph_is_canceled().
 * NOTE(review): the declaration of `ret`, any NULL checks, reference
 * releases and the final return are not visible here.
 */
96 bt_bool
lttng_live_is_canceled(struct lttng_live_component
*lttng_live
)
98 struct bt_component
*component
;
99 struct bt_graph
*graph
;
/* Private component -> public component -> owning graph. */
106 component
= bt_component_from_private_component(lttng_live
->private_component
);
107 graph
= bt_component_get_graph(component
);
108 ret
= bt_graph_is_canceled(graph
);
/*
 * Create the output port that serves `stream_iter` and attach it to the
 * component.
 *
 * The port name is STREAM_NAME_PREFIX followed by the viewer stream ID;
 * the same name is copied into stream_iter->name. Once the port exists,
 * the component's no_stream_port (if any) is removed from the component
 * and the fields that pointed to it are cleared.
 *
 * NOTE(review): the return statements, the switch header/default case
 * and the check of `ret` after the port removal are not visible here.
 */
115 int lttng_live_add_port(struct lttng_live_component
*lttng_live
,
116 struct lttng_live_stream_iterator
*stream_iter
)
119 struct bt_private_port
*private_port
;
120 char name
[STREAM_NAME_MAX_LEN
];
121 enum bt_component_status status
;
/* Build the port name from the prefix and the viewer stream ID. */
123 ret
= sprintf(name
, STREAM_NAME_PREFIX
"%" PRIu64
, stream_iter
->viewer_stream_id
);
/* Keep a copy of the name inside the stream iterator. */
125 strcpy(stream_iter
->name
, name
);
/* Cancellation is checked before the component's ports are modified. */
126 if (lttng_live_is_canceled(lttng_live
)) {
/* stream_iter is handed over as the user data of the new port. */
129 status
= bt_private_component_source_add_output_private_port(
130 lttng_live
->private_component
, name
, stream_iter
,
133 case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED
:
135 case BT_COMPONENT_STATUS_OK
:
/* The reference is dropped immediately; the pointer is kept as a weak one. */
140 bt_put(private_port
); /* weak */
141 BT_LOGI("Added port %s", name
);
/* A stream port now exists: remove the no_stream_port if it is still there. */
143 if (lttng_live
->no_stream_port
) {
/* Hold a temporary reference across the removal call. */
144 bt_get(lttng_live
->no_stream_port
);
145 ret
= bt_private_port_remove_from_component(lttng_live
->no_stream_port
);
146 bt_put(lttng_live
->no_stream_port
);
150 lttng_live
->no_stream_port
= NULL
;
151 lttng_live
->no_stream_iter
->port
= NULL
;
/* Remember which port serves this stream iterator. */
153 stream_iter
->port
= private_port
;
/*
 * Remove `port` from the component.
 *
 * Before the removal, the visible code counts the component's output
 * ports and, inside a branch whose condition is not visible here, adds
 * a "no-stream" output port whose user data is
 * lttng_live->no_stream_iter.
 * NOTE(review): presumably that branch runs when `port` is the last
 * remaining output port -- confirm against the full source. The return
 * statements and switch header are not visible either.
 */
158 int lttng_live_remove_port(struct lttng_live_component
*lttng_live
,
159 struct bt_private_port
*port
)
161 struct bt_component
*component
;
165 component
= bt_component_from_private_component(lttng_live
->private_component
);
166 nr_ports
= bt_component_source_get_output_port_count(component
);
172 enum bt_component_status status
;
/* The "no-stream" port must not already exist at this point. */
174 assert(!lttng_live
->no_stream_port
);
176 if (lttng_live_is_canceled(lttng_live
)) {
179 status
= bt_private_component_source_add_output_private_port(lttng_live
->private_component
,
180 "no-stream", lttng_live
->no_stream_iter
,
181 <tng_live
->no_stream_port
);
183 case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED
:
185 case BT_COMPONENT_STATUS_OK
:
/* The reference is dropped immediately; the pointer is kept as a weak one. */
190 bt_put(lttng_live
->no_stream_port
); /* weak */
191 lttng_live
->no_stream_iter
->port
= lttng_live
->no_stream_port
;
/* Finally detach the requested port from the component. */
194 ret
= bt_private_port_remove_from_component(port
);
/*
 * Walk session->traces looking for the trace whose `id` equals
 * `trace_id`.
 * NOTE(review): the `trace_id` parameter declaration and the return
 * statements are not visible here; presumably the matching trace is
 * returned, and NULL when there is no match -- confirm.
 */
203 struct lttng_live_trace
*lttng_live_find_trace(struct lttng_live_session
*session
,
206 struct lttng_live_trace
*trace
;
208 bt_list_for_each_entry(trace
, &session
->traces
, node
) {
209 if (trace
->id
== trace_id
) {
/*
 * Destructor for a struct lttng_live_trace; it is the release function
 * passed to bt_object_init() in lttng_live_create_trace().
 *
 * `obj` is the bt_object embedded in the trace. The trace must have no
 * streams left (asserted). It is unlinked from its list, its CTF trace
 * is marked static and released, its metadata state is finalized and
 * its clock class priority map is released.
 * NOTE(review): the handling of `retval` and the freeing of the trace
 * structure itself are not visible here.
 */
217 void lttng_live_destroy_trace(struct bt_object
*obj
)
/* Recover the enclosing trace from its embedded bt_object member. */
219 struct lttng_live_trace
*trace
= container_of(obj
, struct lttng_live_trace
, obj
);
221 BT_LOGI("Destroy trace");
222 assert(bt_list_empty(&trace
->streams
));
223 bt_list_del(&trace
->node
);
228 retval
= bt_ctf_trace_set_is_static(trace
->trace
);
230 BT_PUT(trace
->trace
);
232 lttng_live_metadata_fini(trace
);
233 BT_PUT(trace
->cc_prio_map
);
/*
 * Allocate a zero-initialized trace object for `trace_id`, link it into
 * session->traces and initialize its reference-counted base with
 * lttng_live_destroy_trace() as the release function.
 *
 * A new trace starts with an empty stream list and with
 * new_metadata_needed set.
 * NOTE(review): the `trace_id` parameter declaration, the allocation
 * failure path and the return statement are not visible here.
 */
238 struct lttng_live_trace
*lttng_live_create_trace(struct lttng_live_session
*session
,
241 struct lttng_live_trace
*trace
= NULL
;
243 trace
= g_new0(struct lttng_live_trace
, 1);
247 trace
->session
= session
;
248 trace
->id
= trace_id
;
249 BT_INIT_LIST_HEAD(&trace
->streams
);
/* Metadata has not been fetched yet for a brand-new trace. */
250 trace
->new_metadata_needed
= true;
251 bt_list_add(&trace
->node
, &session
->traces
);
252 bt_object_init(&trace
->obj
, lttng_live_destroy_trace
);
253 BT_LOGI("Create trace");
/*
 * Look up the trace `trace_id` in `session` and fall back to
 * lttng_live_create_trace() for that ID.
 * NOTE(review): the branch taken when the lookup succeeds is not
 * visible here; presumably a reference is taken on the existing trace
 * and it is returned -- confirm against the full source.
 */
263 struct lttng_live_trace
*lttng_live_ref_trace(struct lttng_live_session
*session
,
266 struct lttng_live_trace
*trace
;
268 trace
= lttng_live_find_trace(session
, trace_id
);
273 return lttng_live_create_trace(session
, trace_id
);
/*
 * NOTE(review): only the signature is visible here. Presumably this
 * releases one reference on `trace` (the counterpart of
 * lttng_live_ref_trace()) -- confirm against the full source.
 */
277 void lttng_live_unref_trace(struct lttng_live_trace
*trace
)
/*
 * Destroy every stream iterator linked in trace->streams, then finalize
 * the trace's metadata state with lttng_live_metadata_fini().
 */
283 void lttng_live_close_trace_streams(struct lttng_live_trace
*trace
)
285 struct lttng_live_stream_iterator
*stream
, *s
;
/* The _safe iteration variant is used while entries are being destroyed. */
287 bt_list_for_each_entry_safe(stream
, s
, &trace
->streams
, node
) {
288 lttng_live_stream_iterator_destroy(stream
);
290 lttng_live_metadata_fini(trace
);
/*
 * Allocate a session object and link it into lttng_live->sessions.
 *
 * The new session starts with an empty trace list, a back-pointer to
 * the component, new_streams_needed set, and GString copies of
 * `hostname` and `session_name`.
 * NOTE(review): the assignment of s->id from `session_id`, the
 * allocation failure check, the error label and the return statements
 * are not visible here.
 */
294 int lttng_live_add_session(struct lttng_live_component
*lttng_live
,
295 uint64_t session_id
, const char *hostname
,
296 const char *session_name
)
299 struct lttng_live_session
*s
;
301 s
= g_new0(struct lttng_live_session
, 1);
307 BT_INIT_LIST_HEAD(&s
->traces
);
308 s
->lttng_live
= lttng_live
;
/* The stream list of a new session still has to be fetched. */
309 s
->new_streams_needed
= true;
310 s
->hostname
= g_string_new(hostname
);
311 s
->session_name
= g_string_new(session_name
);
313 BT_LOGI("Reading from session: %" PRIu64
" hostname: %s session_name: %s",
314 s
->id
, hostname
, session_name
);
315 bt_list_add(&s
->node
, <tng_live
->sessions
);
318 BT_LOGE("Error adding session");
/*
 * Tear down `session`: try to detach it from the relay daemon, close
 * the streams of all its traces, unlink it from its list and free its
 * hostname and session name strings.
 *
 * A failed detach is only logged at debug level, and only when the
 * component is not canceled.
 * NOTE(review): the freeing of the session structure itself is not
 * visible here.
 */
326 void lttng_live_destroy_session(struct lttng_live_session
*session
)
328 struct lttng_live_trace
*trace
, *t
;
330 BT_LOGI("Destroy session");
/* An ID of -1ULL skips the detach attempt. */
331 if (session
->id
!= -1ULL) {
332 if (lttng_live_detach_session(session
)) {
333 if (!lttng_live_is_canceled(session
->lttng_live
)) {
334 /* Old relayd cannot detach sessions. */
335 BT_LOGD("Unable to detach session %" PRIu64
,
341 bt_list_for_each_entry_safe(trace
, t
, &session
->traces
, node
) {
342 lttng_live_close_trace_streams(trace
);
344 bt_list_del(&session
->node
);
/* TRUE also frees the character data held by the GString. */
345 if (session
->hostname
) {
346 g_string_free(session
->hostname
, TRUE
);
348 if (session
->session_name
) {
349 g_string_free(session
->session_name
, TRUE
);
/*
 * Finalization method of a notification iterator.
 *
 * The iterator's user data is a generic stream iterator header. For a
 * real stream the enclosing lttng_live_stream_iterator is destroyed;
 * for the no-stream type nothing is destroyed (see the comment below).
 * NOTE(review): the switch header on the iterator type is not visible
 * here.
 */
355 void lttng_live_iterator_finalize(struct bt_private_notification_iterator
*it
)
357 struct lttng_live_stream_iterator_generic
*s
=
358 bt_private_notification_iterator_get_user_data(it
);
361 case LIVE_STREAM_TYPE_NO_STREAM
:
363 /* Leave no_stream_iter in place when port is removed. */
366 case LIVE_STREAM_TYPE_STREAM
:
/* `s` is the member `p` of the concrete stream iterator. */
368 struct lttng_live_stream_iterator
*stream_iter
=
369 container_of(s
, struct lttng_live_stream_iterator
, p
);
371 lttng_live_stream_iterator_destroy(stream_iter
);
/*
 * Sanity-check the state of `lttng_live_stream` after its next index
 * was fetched.
 *
 * The two *_NO_DATA states are logged as fatal ("Unexpected stream
 * state"); the visible tail of the function returns
 * BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK.
 * NOTE(review): the statements that follow each fatal log and each
 * other case label (break/abort/return) are not visible here.
 */
378 enum bt_ctf_lttng_live_iterator_status
lttng_live_iterator_next_check_stream_state(
379 struct lttng_live_component
*lttng_live
,
380 struct lttng_live_stream_iterator
*lttng_live_stream
)
382 switch (lttng_live_stream
->state
) {
383 case LTTNG_LIVE_STREAM_QUIESCENT
:
384 case LTTNG_LIVE_STREAM_ACTIVE_DATA
:
386 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
:
388 BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
390 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
:
392 BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
394 case LTTNG_LIVE_STREAM_EOF
:
397 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
401 * For active no data stream, fetch next data. It can be either:
402 * - quiescent: need to put it in the prio heap at quiescent end
404 * - have data: need to wire up first event into the prio heap,
405 * - have no data on this stream at this point: need to retry (AGAIN) or
/*
 * Advance one stream that currently has no data by fetching its next
 * packet index with lttng_live_get_next_index().
 *
 * Visible behavior:
 * - pending trace metadata or a pending stream-list refresh sets the
 *   status to CONTINUE;
 * - a stream that ends up QUIESCENT yields AGAIN when it was already
 *   QUIESCENT_NO_DATA and its inactivity timestamp did not change, and
 *   CONTINUE is assigned right after (presumably in an `else` branch);
 * - otherwise the packet position (base_offset, offset, len) is
 *   recorded from the index and the resulting state is validated.
 *
 * NOTE(review): the `goto`/`return` statements, the `else` keywords and
 * the `end` label are not visible here, so the exact control flow
 * between the steps above must be confirmed against the full source.
 */
409 enum bt_ctf_lttng_live_iterator_status
lttng_live_iterator_next_handle_one_no_data_stream(
410 struct lttng_live_component
*lttng_live
,
411 struct lttng_live_stream_iterator
*lttng_live_stream
)
413 enum bt_ctf_lttng_live_iterator_status ret
=
414 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
415 struct packet_index index
;
/* Remember the state on entry; the index fetch below may change it. */
416 enum lttng_live_stream_state orig_state
= lttng_live_stream
->state
;
418 if (lttng_live_stream
->trace
->new_metadata_needed
) {
419 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
422 if (lttng_live_stream
->trace
->session
->new_streams_needed
) {
423 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
/* Streams that are in neither of the two NO_DATA states take this branch. */
426 if (lttng_live_stream
->state
!= LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
427 && lttng_live_stream
->state
!= LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
) {
430 ret
= lttng_live_get_next_index(lttng_live
, lttng_live_stream
, &index
);
431 if (ret
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
434 assert(lttng_live_stream
->state
!= LTTNG_LIVE_STREAM_EOF
);
435 if (lttng_live_stream
->state
== LTTNG_LIVE_STREAM_QUIESCENT
) {
/* Same inactivity timestamp as the one already returned: nothing new. */
436 if (orig_state
== LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
437 && lttng_live_stream
->last_returned_inactivity_timestamp
==
438 lttng_live_stream
->current_inactivity_timestamp
) {
439 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN
;
440 print_stream_state(lttng_live_stream
);
442 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
/* Record where the packet described by the index starts and how long it is. */
446 lttng_live_stream
->base_offset
= index
.offset
;
447 lttng_live_stream
->offset
= index
.offset
;
/* NOTE(review): presumably packet_size is in bits and len in bytes -- confirm. */
448 lttng_live_stream
->len
= index
.packet_size
/ CHAR_BIT
;
450 if (ret
== BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
451 ret
= lttng_live_iterator_next_check_stream_state(
452 lttng_live
, lttng_live_stream
);
458 * Creation of the notification requires the ctf trace to be created
459 * beforehand, but the live protocol gives us all streams (including
460 * metadata) at once. So we split it in three steps: getting streams,
461 * getting metadata (which creates the ctf trace), and then creating the
462 * per-stream notifications.
/*
 * Bring `session` up to date in three steps: attach to it, fetch its
 * new streams, then update the metadata of each of its traces. The
 * function's visible result is that of lttng_live_lazy_notif_init().
 *
 * A failed attach yields AGAIN when the component is canceled and ERROR
 * otherwise. For the stream and metadata steps, both OK and END are
 * accepted as non-failure statuses.
 * NOTE(review): the bodies of the two status checks (presumably early
 * returns of `status`) are not visible here.
 */
465 enum bt_ctf_lttng_live_iterator_status
lttng_live_get_session(
466 struct lttng_live_component
*lttng_live
,
467 struct lttng_live_session
*session
)
469 enum bt_ctf_lttng_live_iterator_status status
;
470 struct lttng_live_trace
*trace
, *t
;
472 if (lttng_live_attach_session(session
)) {
473 if (lttng_live_is_canceled(lttng_live
)) {
474 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN
;
476 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
;
479 status
= lttng_live_get_new_streams(session
);
480 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
&&
481 status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
) {
484 bt_list_for_each_entry_safe(trace
, t
, &session
->traces
, node
) {
485 status
= lttng_live_metadata_update(trace
);
486 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
&&
487 status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
) {
491 return lttng_live_lazy_notif_init(session
);
/*
 * Set new_streams_needed on every session of the component, so that
 * each session's stream list is treated as needing a refresh.
 */
495 void lttng_live_need_new_streams(struct lttng_live_component
*lttng_live
)
497 struct lttng_live_session
*session
;
499 bt_list_for_each_entry(session
, <tng_live
->sessions
, node
) {
500 session
->new_streams_needed
= true;
/*
 * Set new_streams_needed on every session of the component and
 * new_metadata_needed on every trace of every session.
 */
505 void lttng_live_force_new_streams_and_metadata(struct lttng_live_component
*lttng_live
)
507 struct lttng_live_session
*session
;
509 bt_list_for_each_entry(session
, <tng_live
->sessions
, node
) {
510 struct lttng_live_trace
*trace
;
512 session
->new_streams_needed
= true;
513 bt_list_for_each_entry(trace
, &session
->traces
, node
) {
514 trace
->new_metadata_needed
= true;
/*
 * Refresh the streams and metadata of every session of the component.
 *
 * Steps visible here:
 * 1. destroy sessions that are closed and have no trace left;
 * 2. if no session remains, set the status to END;
 * 3. call lttng_live_get_session() on each remaining session, mapping
 *    its END status to OK, and count the sessions that are not closed;
 * 4. if everything went OK but no session is open, set the status to
 *    END.
 *
 * NOTE(review): the switch header, the default case, the `goto`/`break`
 * statements and the final return are not visible here.
 */
520 enum bt_ctf_lttng_live_iterator_status
lttng_live_iterator_next_handle_new_streams_and_metadata(
521 struct lttng_live_component
*lttng_live
)
523 enum bt_ctf_lttng_live_iterator_status ret
=
524 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
525 unsigned int nr_sessions_opened
= 0;
526 struct lttng_live_session
*session
, *s
;
/* The _safe iteration variant is used because sessions may be destroyed here. */
528 bt_list_for_each_entry_safe(session
, s
, <tng_live
->sessions
, node
) {
529 if (session
->closed
&& bt_list_empty(&session
->traces
)) {
530 lttng_live_destroy_session(session
);
534 * Currently, when there are no sessions, we quit immediately.
535 * We may want to add a component parameter to keep trying until
536 * we get data in the future.
537 * Also, in a remotely distant future, we could add a "new
538 * session" flag to the protocol, which would tell us that we
539 * need to query for new sessions even though we have sessions
542 if (bt_list_empty(<tng_live
->sessions
)) {
543 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
;
546 bt_list_for_each_entry(session
, <tng_live
->sessions
, node
) {
547 ret
= lttng_live_get_session(lttng_live
, session
);
549 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
:
/* END from a single session is mapped to OK for the component. */
551 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
:
552 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
557 if (!session
->closed
) {
558 nr_sessions_opened
++;
/* All sessions handled fine, but none is open: report END. */
562 if (ret
== BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
&& !nr_sessions_opened
) {
563 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
;
/*
 * Build an inactivity notification for the trace of
 * `lttng_live_stream` and store it in `*notification`.
 *
 * The clock value is created from `timestamp` using the clock class at
 * index 0 of the trace's clock class priority map, then attached to a
 * new inactivity notification created from the same map.
 * The visible code sets the status to ERROR on its error path.
 * NOTE(review): the `timestamp` parameter declaration, the failure
 * checks after each call, the release of the temporary references and
 * the return statement are not visible here.
 */
569 enum bt_ctf_lttng_live_iterator_status
emit_inactivity_notification(
570 struct lttng_live_component
*lttng_live
,
571 struct lttng_live_stream_iterator
*lttng_live_stream
,
572 struct bt_notification
**notification
,
575 enum bt_ctf_lttng_live_iterator_status ret
=
576 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
577 struct lttng_live_trace
*trace
;
578 struct bt_ctf_clock_class
*clock_class
= NULL
;
579 struct bt_ctf_clock_value
*clock_value
= NULL
;
580 struct bt_notification
*notif
= NULL
;
583 trace
= lttng_live_stream
->trace
;
/* Only the first clock class of the priority map is used. */
587 clock_class
= bt_clock_class_priority_map_get_clock_class_by_index(trace
->cc_prio_map
, 0);
591 clock_value
= bt_ctf_clock_value_create(clock_class
, timestamp
);
595 notif
= bt_notification_inactivity_create(trace
->cc_prio_map
);
599 retval
= bt_notification_inactivity_set_clock_value(notif
, clock_value
);
/* Hand the finished notification to the caller. */
603 *notification
= notif
;
610 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
;
/*
 * Handle a stream in the QUIESCENT state.
 *
 * Streams in any other state are left alone (OK is returned). When the
 * current inactivity timestamp equals the last one returned, the stream
 * moves to QUIESCENT_NO_DATA and the status is set to CONTINUE.
 * Otherwise an inactivity notification carrying the current inactivity
 * timestamp is emitted into `*notification` and that timestamp is
 * recorded as the last one returned.
 * NOTE(review): the `goto`/return statements and the release of the
 * local clock objects are not visible here.
 */
616 enum bt_ctf_lttng_live_iterator_status
lttng_live_iterator_next_handle_one_quiescent_stream(
617 struct lttng_live_component
*lttng_live
,
618 struct lttng_live_stream_iterator
*lttng_live_stream
,
619 struct bt_notification
**notification
)
621 enum bt_ctf_lttng_live_iterator_status ret
=
622 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
623 struct bt_ctf_clock_class
*clock_class
= NULL
;
624 struct bt_ctf_clock_value
*clock_value
= NULL
;
/* Nothing to do for streams that are not quiescent. */
626 if (lttng_live_stream
->state
!= LTTNG_LIVE_STREAM_QUIESCENT
) {
627 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
/* This inactivity timestamp was already reported. */
630 if (lttng_live_stream
->current_inactivity_timestamp
==
631 lttng_live_stream
->last_returned_inactivity_timestamp
) {
632 lttng_live_stream
->state
= LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
;
633 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
637 ret
= emit_inactivity_notification(lttng_live
, lttng_live_stream
, notification
,
638 (uint64_t) lttng_live_stream
->current_inactivity_timestamp
);
/* Remember what was reported so it is not reported twice. */
640 lttng_live_stream
->last_returned_inactivity_timestamp
=
641 lttng_live_stream
->current_inactivity_timestamp
;
/*
 * Produce the next notification of a stream in the ACTIVE_DATA state.
 *
 * Visible behavior:
 * - CONTINUE is returned while any session still needs new streams or
 *   any trace still needs new metadata;
 * - ERROR is returned when the stream is not in the ACTIVE_DATA state;
 * - a packet-end notification queued earlier is delivered first;
 * - otherwise the next notification is requested from the stream's CTF
 *   notification iterator; when it is a packet-end notification it is
 *   queued and an inactivity notification stamped with
 *   current_packet_end_timestamp is returned in its place;
 * - the notification iterator status is finally mapped: EOF -> END,
 *   OK -> OK, AGAIN -> CONTINUE, INVAL/ERROR -> ERROR.
 *
 * NOTE(review): the `else` keyword between the queued and the fetched
 * notification paths, any additional condition guarding the packet-end
 * substitution, the switch header, the `break` statements and the final
 * return are not visible here.
 */
649 enum bt_ctf_lttng_live_iterator_status
lttng_live_iterator_next_handle_one_active_data_stream(
650 struct lttng_live_component
*lttng_live
,
651 struct lttng_live_stream_iterator
*lttng_live_stream
,
652 struct bt_notification
**notification
)
654 enum bt_ctf_lttng_live_iterator_status ret
=
655 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
656 enum bt_ctf_notif_iter_status status
;
657 struct lttng_live_session
*session
;
/* Do not emit data while any stream list or metadata refresh is pending. */
659 bt_list_for_each_entry(session
, <tng_live
->sessions
, node
) {
660 struct lttng_live_trace
*trace
;
662 if (session
->new_streams_needed
) {
663 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
665 bt_list_for_each_entry(trace
, &session
->traces
, node
) {
666 if (trace
->new_metadata_needed
) {
667 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
672 if (lttng_live_stream
->state
!= LTTNG_LIVE_STREAM_ACTIVE_DATA
) {
673 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
;
/* Deliver a previously held-back packet-end notification first. */
675 if (lttng_live_stream
->packet_end_notif_queue
) {
676 *notification
= lttng_live_stream
->packet_end_notif_queue
;
677 lttng_live_stream
->packet_end_notif_queue
= NULL
;
678 status
= BT_CTF_NOTIF_ITER_STATUS_OK
;
680 status
= bt_ctf_notif_iter_get_next_notification(
681 lttng_live_stream
->notif_iter
,
682 lttng_live_stream
->trace
->cc_prio_map
,
684 if (status
== BT_CTF_NOTIF_ITER_STATUS_OK
) {
686 * Consider empty packets as inactivity.
/* Hold back the packet-end notification and return an inactivity one instead. */
688 if (bt_notification_get_type(*notification
) == BT_NOTIFICATION_TYPE_PACKET_END
) {
689 lttng_live_stream
->packet_end_notif_queue
= *notification
;
690 *notification
= NULL
;
691 return emit_inactivity_notification(lttng_live
,
692 lttng_live_stream
, notification
,
693 lttng_live_stream
->current_packet_end_timestamp
);
698 case BT_CTF_NOTIF_ITER_STATUS_EOF
:
699 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
;
701 case BT_CTF_NOTIF_ITER_STATUS_OK
:
702 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
704 case BT_CTF_NOTIF_ITER_STATUS_AGAIN
:
706 * Continue immediately (end of packet). The next
707 * get_index may return AGAIN to delay the following
710 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
712 case BT_CTF_NOTIF_ITER_STATUS_INVAL
:
713 /* No argument provided by the user, so don't return INVAL. */
714 case BT_CTF_NOTIF_ITER_STATUS_ERROR
:
716 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
;
724 * handle_no_data_streams()
726 * - for each ACTIVE_NO_DATA stream:
727 * - query relayd for stream data, or quiescence info.
728 * - if need metadata, get metadata, goto retry.
729 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
730 * - if quiescent, move to QUIESCENT streams
731 * - if fetched data, move to ACTIVE_DATA streams
732 * (at this point each stream either has data, or is quiescent)
736 * handle_new_streams_and_metadata()
737 * - query relayd for known streams, add them as ACTIVE_NO_DATA
738 * - query relayd for metadata
740 * call handle_active_no_data_streams()
742 * handle_quiescent_streams()
743 * - if at least one stream is ACTIVE_DATA:
744 * - peek stream event with lowest timestamp -> next_ts
745 * - for each quiescent stream
746 * - if next_ts >= quiescent end
747 * - set state to ACTIVE_NO_DATA
749 * - for each quiescent stream
750 * - set state to ACTIVE_NO_DATA
752 * call handle_active_no_data_streams()
754 * handle_active_data_streams()
755 * - if at least one stream is ACTIVE_DATA:
756 * - get stream event with lowest timestamp from heap
757 * - make that stream event the current notification.
758 * - move this stream heap position to its next event
759 * - if we need to fetch data from relayd, move
760 * stream to ACTIVE_NO_DATA.
764 * end criterion: ctrl-c on client. If relayd exits or the session
765 * closes on the relay daemon side, we keep on waiting for streams.
766 * Eventually handle --end timestamp (also an end criterion).
768 * When disconnected from relayd: try to re-connect endlessly.
/*
 * "Next" implementation for an iterator bound to a real stream port.
 *
 * The visible steps run in this order, each one being checked against
 * BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
 * 1. refresh streams and metadata for the whole component;
 * 2. fetch data/quiescence information for this stream;
 * 3. handle the quiescent case, which may fill
 *    next_return.notification;
 * 4. if no notification was produced, read from the active data
 *    stream.
 * The internal status is then translated to a
 * bt_notification_iterator_status stored in next_return.status.
 *
 * NOTE(review): the `goto` statements, labels, the switch header and
 * the final return are not visible here. In particular, what happens
 * after the "continue" debug log in the CONTINUE case (presumably a
 * retry from the top) must be confirmed against the full source.
 */
771 struct bt_notification_iterator_next_return
lttng_live_iterator_next_stream(
772 struct bt_private_notification_iterator
*iterator
,
773 struct lttng_live_stream_iterator
*stream_iter
)
775 enum bt_ctf_lttng_live_iterator_status status
;
776 struct bt_notification_iterator_next_return next_return
;
777 struct lttng_live_component
*lttng_live
;
/* Reach the component through stream -> trace -> session. */
779 lttng_live
= stream_iter
->trace
->session
->lttng_live
;
781 print_stream_state(stream_iter
);
782 next_return
.notification
= NULL
;
783 status
= lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live
);
784 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
787 status
= lttng_live_iterator_next_handle_one_no_data_stream(
788 lttng_live
, stream_iter
);
789 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
792 status
= lttng_live_iterator_next_handle_one_quiescent_stream(
793 lttng_live
, stream_iter
, &next_return
.notification
);
/* A failing step must not leave a notification behind. */
794 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
795 assert(next_return
.notification
== NULL
);
798 if (next_return
.notification
) {
801 status
= lttng_live_iterator_next_handle_one_active_data_stream(lttng_live
,
802 stream_iter
, &next_return
.notification
);
803 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
804 assert(next_return
.notification
== NULL
);
/* Translate the plugin-internal status into the graph API status. */
809 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
:
810 print_dbg("continue");
812 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN
:
813 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_AGAIN
;
816 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
:
817 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_END
;
820 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
:
821 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_OK
;
824 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL
:
825 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_INVALID
;
827 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM
:
828 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_NOMEM
;
830 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED
:
831 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED
;
833 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
:
834 default: /* fall-through */
835 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_ERROR
;
/*
 * "Next" implementation for the iterator bound to the "no-stream"
 * port.
 *
 * It flags every session and trace as needing a refresh, runs the
 * stream/metadata refresh, and then derives a status from whether
 * no_stream_iter->port is still set: AGAIN when it is, and END is
 * assigned right after (presumably in an `else` branch). No
 * notification is produced by the visible code
 * (next_return.notification is set to NULL).
 *
 * NOTE(review): the `else` keyword, `goto` statements, the switch
 * header, what the CONTINUE case does, and the final return are not
 * visible here.
 */
842 struct bt_notification_iterator_next_return
lttng_live_iterator_next_no_stream(
843 struct bt_private_notification_iterator
*iterator
,
844 struct lttng_live_no_stream_iterator
*no_stream_iter
)
846 enum bt_ctf_lttng_live_iterator_status status
;
847 struct bt_notification_iterator_next_return next_return
;
848 struct lttng_live_component
*lttng_live
;
850 lttng_live
= no_stream_iter
->lttng_live
;
852 lttng_live_force_new_streams_and_metadata(lttng_live
);
853 next_return
.notification
= NULL
;
854 status
= lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live
);
855 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
/* The port pointer is cleared by lttng_live_add_port() once a stream port exists. */
858 if (no_stream_iter
->port
) {
859 status
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN
;
861 status
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
;
/* Translate the plugin-internal status into the graph API status. */
865 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
:
867 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN
:
868 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_AGAIN
;
870 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
:
871 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_END
;
873 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
:
874 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_OK
;
876 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL
:
877 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_INVALID
;
879 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM
:
880 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_NOMEM
;
882 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED
:
883 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED
;
885 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
:
886 default: /* fall-through */
887 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_ERROR
;
/*
 * "Next" method of the notification iterator: dispatch to the no-stream
 * or the stream implementation according to the type of the iterator's
 * user data. The remaining visible path sets the status to ERROR.
 * NOTE(review): the switch header, `break` statements, the `default`
 * label and the final return are not visible here.
 */
894 struct bt_notification_iterator_next_return
lttng_live_iterator_next(
895 struct bt_private_notification_iterator
*iterator
)
897 struct lttng_live_stream_iterator_generic
*s
=
898 bt_private_notification_iterator_get_user_data(iterator
);
899 struct bt_notification_iterator_next_return next_return
;
/* `s` is the member `p` embedded in the concrete iterator structure. */
902 case LIVE_STREAM_TYPE_NO_STREAM
:
903 next_return
= lttng_live_iterator_next_no_stream(iterator
,
904 container_of(s
, struct lttng_live_no_stream_iterator
, p
));
906 case LIVE_STREAM_TYPE_STREAM
:
907 next_return
= lttng_live_iterator_next_stream(iterator
,
908 container_of(s
, struct lttng_live_stream_iterator
, p
));
911 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_ERROR
;
/*
 * Initialization method of a notification iterator created on `port`.
 *
 * The port's user data (a generic stream iterator header) identifies
 * which concrete iterator the port serves; that concrete object becomes
 * the user data of `it`. Another visible path sets the status to ERROR,
 * and the visible tail resets the iterator's user data to NULL, logging
 * an error if even that fails.
 * NOTE(review): the switch header, the checks of `ret`, the `goto`
 * statements/labels and the final return are not visible here.
 */
918 enum bt_notification_iterator_status
lttng_live_iterator_init(
919 struct bt_private_notification_iterator
*it
,
920 struct bt_private_port
*port
)
922 enum bt_notification_iterator_status ret
=
923 BT_NOTIFICATION_ITERATOR_STATUS_OK
;
924 struct lttng_live_stream_iterator_generic
*s
;
928 s
= bt_private_port_get_user_data(port
);
931 case LIVE_STREAM_TYPE_NO_STREAM
:
933 struct lttng_live_no_stream_iterator
*no_stream_iter
=
934 container_of(s
, struct lttng_live_no_stream_iterator
, p
);
935 ret
= bt_private_notification_iterator_set_user_data(it
, no_stream_iter
);
941 case LIVE_STREAM_TYPE_STREAM
:
943 struct lttng_live_stream_iterator
*stream_iter
=
944 container_of(s
, struct lttng_live_stream_iterator
, p
);
945 ret
= bt_private_notification_iterator_set_user_data(it
, stream_iter
);
952 ret
= BT_NOTIFICATION_ITERATOR_STATUS_ERROR
;
/* Make sure the iterator does not keep pointing at our data. */
959 if (bt_private_notification_iterator_set_user_data(it
, NULL
)
960 != BT_NOTIFICATION_ITERATOR_STATUS_OK
) {
961 BT_LOGE("Error setting private data to NULL");
/*
 * Implementation of the "sessions" query object: connect to the relay
 * daemon named by the "url" entry of `params` and ask the connection
 * for its session list.
 *
 * "url" must be present, non-null and a string; otherwise a warning is
 * logged. The temporary viewer connection is created with NULL as its
 * second argument and is destroyed before leaving.
 * NOTE(review): the declaration of `url`, the `goto` statements/labels,
 * the release of `url_value` and the final return (presumably
 * `results`) are not visible here.
 */
967 struct bt_value
*lttng_live_query_list_sessions(struct bt_component_class
*comp_class
,
968 struct bt_value
*params
)
970 struct bt_value
*url_value
= NULL
;
971 struct bt_value
*results
= NULL
;
973 struct bt_live_viewer_connection
*viewer_connection
= NULL
;
975 url_value
= bt_value_map_get(params
, "url");
976 if (!url_value
|| bt_value_is_null(url_value
) || !bt_value_is_string(url_value
)) {
977 BT_LOGW("Mandatory \"url\" parameter missing");
981 if (bt_value_string_get(url_value
, &url
) != BT_VALUE_STATUS_OK
) {
982 BT_LOGW("\"url\" parameter is required to be a string value");
986 viewer_connection
= bt_live_viewer_connection_create(url
, NULL
);
987 if (!viewer_connection
) {
991 results
= bt_live_viewer_connection_list_sessions(viewer_connection
);
/* The connection only lives for the duration of the query. */
996 if (viewer_connection
) {
997 bt_live_viewer_connection_destroy(viewer_connection
);
/*
 * Query method of the component class. The only object handled in the
 * visible code is "sessions", which is forwarded to
 * lttng_live_query_list_sessions(); any other object name is logged as
 * unknown.
 * NOTE(review): the second argument of the forwarded call and the
 * return value for unknown objects are not visible here.
 */
1004 struct bt_value
*lttng_live_query(struct bt_component_class
*comp_class
,
1005 const char *object
, struct bt_value
*params
)
1007 if (strcmp(object
, "sessions") == 0) {
1008 return lttng_live_query_list_sessions(comp_class
,
1011 BT_LOGW("Unknown query object `%s`", object
);
/*
 * Release everything owned by the component structure: all sessions,
 * the viewer connection, the URL string, the no_stream_port (removed
 * from the component) and the no-stream iterator.
 * NOTE(review): the declaration and handling of `ret`, and the freeing
 * of `lttng_live` itself, are not visible here.
 */
1016 void lttng_live_component_destroy_data(struct lttng_live_component
*lttng_live
)
1019 struct lttng_live_session
*session
, *s
;
/* The _safe iteration variant is used while sessions are being destroyed. */
1021 bt_list_for_each_entry_safe(session
, s
, <tng_live
->sessions
, node
) {
1022 lttng_live_destroy_session(session
);
1024 BT_PUT(lttng_live
->viewer_connection
);
1025 if (lttng_live
->url
) {
1026 g_string_free(lttng_live
->url
, TRUE
);
1028 if (lttng_live
->no_stream_port
) {
/* Hold a temporary reference across the removal call. */
1029 bt_get(lttng_live
->no_stream_port
);
1030 ret
= bt_private_port_remove_from_component(lttng_live
->no_stream_port
);
1031 bt_put(lttng_live
->no_stream_port
);
1034 if (lttng_live
->no_stream_iter
) {
1035 g_free(lttng_live
->no_stream_iter
);
/*
 * Finalization method of the component: fetch the component's user data
 * and release it with lttng_live_component_destroy_data().
 * NOTE(review): any NULL check on `data` is not visible here.
 */
1041 void lttng_live_component_finalize(struct bt_private_component
*component
)
1043 void *data
= bt_private_component_get_user_data(component
);
1048 lttng_live_component_destroy_data(data
);
/*
 * Allocate and set up the component structure from `params`.
 *
 * Visible steps: zero-allocate the structure, set max_query_size to
 * MAX_QUERY_SIZE, initialize the session list, read the "url" string
 * parameter (it must be present, non-null and a string), copy it into
 * lttng_live->url, open the viewer connection to that URL, create the
 * viewer session and store `private_component`.
 * The visible error path releases the structure with
 * lttng_live_component_destroy_data().
 * NOTE(review): the declaration of `url`, the `goto` statements/labels,
 * the release of `value` and the return statements are not visible
 * here.
 */
1052 struct lttng_live_component
*lttng_live_component_create(struct bt_value
*params
,
1053 struct bt_private_component
*private_component
)
1055 struct lttng_live_component
*lttng_live
;
1056 struct bt_value
*value
= NULL
;
1058 enum bt_value_status ret
;
1060 lttng_live
= g_new0(struct lttng_live_component
, 1);
1064 /* TODO: make this an overridable parameter. */
1065 lttng_live
->max_query_size
= MAX_QUERY_SIZE
;
1066 BT_INIT_LIST_HEAD(<tng_live
->sessions
);
1067 value
= bt_value_map_get(params
, "url");
1068 if (!value
|| bt_value_is_null(value
) || !bt_value_is_string(value
)) {
1069 BT_LOGW("Mandatory \"url\" parameter missing");
1072 ret
= bt_value_string_get(value
, &url
);
1073 if (ret
!= BT_VALUE_STATUS_OK
) {
1074 BT_LOGW("\"url\" parameter is required to be a string value");
/* Keep a private copy of the URL in the component. */
1077 lttng_live
->url
= g_string_new(url
);
1078 if (!lttng_live
->url
) {
1082 lttng_live
->viewer_connection
=
1083 bt_live_viewer_connection_create(lttng_live
->url
->str
, lttng_live
);
1084 if (!lttng_live
->viewer_connection
) {
1087 if (lttng_live_create_viewer_session(lttng_live
)) {
1090 lttng_live
->private_component
= private_component
;
1095 lttng_live_component_destroy_data(lttng_live
);
/*
 * Initialization method of the component.
 *
 * Visible steps: create the component structure from `params`, allocate
 * the no-stream iterator (typed LIVE_STREAM_TYPE_NO_STREAM, pointing
 * back to the component), add the "no-stream" output port with that
 * iterator as user data, and store the component structure as the
 * private component's user data.
 * The visible error path clears the user data and releases the
 * structure with lttng_live_component_destroy_data().
 * NOTE(review): the failure check after the create call (the NOMEM
 * assignment presumably belongs to it), the `goto` statements/labels
 * and the return statements are not visible here.
 */
1102 enum bt_component_status
lttng_live_component_init(
1103 struct bt_private_component
*private_component
,
1104 struct bt_value
*params
, void *init_method_data
)
1106 struct lttng_live_component
*lttng_live
;
1107 enum bt_component_status ret
= BT_COMPONENT_STATUS_OK
;
1109 /* Passes ownership of iter ref to lttng_live_component_create. */
1110 lttng_live
= lttng_live_component_create(params
, private_component
);
1112 //TODO : we need access to the application cancel state
1113 //because we are not part of a graph yet.
1114 ret
= BT_COMPONENT_STATUS_NOMEM
;
1118 lttng_live
->no_stream_iter
= g_new0(struct lttng_live_no_stream_iterator
, 1);
1119 lttng_live
->no_stream_iter
->p
.type
= LIVE_STREAM_TYPE_NO_STREAM
;
1120 lttng_live
->no_stream_iter
->lttng_live
= lttng_live
;
1121 if (lttng_live_is_canceled(lttng_live
)) {
1124 ret
= bt_private_component_source_add_output_private_port(
1125 lttng_live
->private_component
, "no-stream",
1126 lttng_live
->no_stream_iter
,
1127 <tng_live
->no_stream_port
);
1128 if (ret
!= BT_COMPONENT_STATUS_OK
) {
/* The reference is dropped immediately; the pointer is kept as a weak one. */
1131 bt_put(lttng_live
->no_stream_port
); /* weak */
1132 lttng_live
->no_stream_iter
->port
= lttng_live
->no_stream_port
;
1134 ret
= bt_private_component_set_user_data(private_component
, lttng_live
);
1135 if (ret
!= BT_COMPONENT_STATUS_OK
) {
/* Detach the structure from the component before releasing it. */
1142 (void) bt_private_component_set_user_data(private_component
, NULL
);
1143 lttng_live_component_destroy_data(lttng_live
);
/*
 * Port connection acceptance method.
 *
 * The first downstream component seen is remembered in
 * lttng_live->downstream_component (as a weak pointer: the reference
 * obtained from bt_port_get_component() is dropped right away). A
 * connection to any other downstream component is refused with
 * BT_COMPONENT_STATUS_REFUSE_PORT_CONNECTION and a warning.
 * NOTE(review): the end of this function (release of `self_port`,
 * final return) is not visible here, and it may continue past the end
 * of this view.
 */
1148 enum bt_component_status
lttng_live_accept_port_connection(
1149 struct bt_private_component
*private_component
,
1150 struct bt_private_port
*self_private_port
,
1151 struct bt_port
*other_port
)
1153 struct lttng_live_component
*lttng_live
=
1154 bt_private_component_get_user_data(private_component
);
1155 struct bt_component
*other_component
;
1156 enum bt_component_status status
= BT_COMPONENT_STATUS_OK
;
1157 struct bt_port
*self_port
= bt_port_from_private_port(self_private_port
);
1159 other_component
= bt_port_get_component(other_port
);
1160 bt_put(other_component
); /* weak */
1162 if (!lttng_live
->downstream_component
) {
1163 lttng_live
->downstream_component
= other_component
;
1168 * Compare prior component to ensure we are connected to the
1169 * same downstream component as prior ports.
1171 if (lttng_live
->downstream_component
!= other_component
) {
1172 BT_LOGW("Cannot connect ctf.lttng-live component port \"%s\" to component \"%s\": already connected to component \"%s\".",
1173 bt_port_get_name(self_port
),
1174 bt_component_get_name(other_component
),
1175 bt_component_get_name(lttng_live
->downstream_component
));
1176 status
= BT_COMPONENT_STATUS_REFUSE_PORT_CONNECTION
;