4 * Babeltrace CTF LTTng-live Client Component
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
9 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
11 * Permission is hereby granted, free of charge, to any person obtaining a copy
12 * of this software and associated documentation files (the "Software"), to deal
13 * in the Software without restriction, including without limitation the rights
14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 * copies of the Software, and to permit persons to whom the Software is
16 * furnished to do so, subject to the following conditions:
18 * The above copyright notice and this permission notice shall be included in
19 * all copies or substantial portions of the Software.
21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 #include <babeltrace/ctf-ir/packet.h>
31 #include <babeltrace/graph/component-source.h>
32 #include <babeltrace/graph/private-port.h>
33 #include <babeltrace/graph/port.h>
34 #include <babeltrace/graph/private-component.h>
35 #include <babeltrace/graph/private-component-source.h>
36 #include <babeltrace/graph/private-notification-iterator.h>
37 #include <babeltrace/graph/notification-stream.h>
38 #include <babeltrace/graph/notification-packet.h>
39 #include <babeltrace/graph/notification-event.h>
40 #include <babeltrace/graph/notification-heap.h>
41 #include <babeltrace/graph/notification-iterator.h>
42 #include <babeltrace/graph/notification-inactivity.h>
43 #include <babeltrace/compiler-internal.h>
48 #include <plugins-common.h>
50 #include "lttng-live-internal.h"
51 #include "data-stream.h"
54 #define PRINT_ERR_STREAM (lttng_live->error_fp)
55 #define PRINT_PREFIX "lttng-live"
56 #define PRINT_DBG_CHECK lttng_live_debug
57 #define MAX_QUERY_SIZE (256*1024)
61 #define print_dbg(fmt, args...) \
62 fprintf(stderr, "%s() at " __FILE__ ":%d " fmt "\n", \
63 __func__, __LINE__, ## args)
65 static const char *print_state(struct lttng_live_stream_iterator
*s
)
68 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
:
69 return "ACTIVE_NO_DATA";
70 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
:
71 return "QUIESCENT_NO_DATA";
72 case LTTNG_LIVE_STREAM_QUIESCENT
:
74 case LTTNG_LIVE_STREAM_ACTIVE_DATA
:
76 case LTTNG_LIVE_STREAM_EOF
:
83 #define print_dbg(fmt, args...)
86 #define print_stream_state(stream) \
87 print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64, \
88 bt_port_get_name(bt_port_from_private_port(stream->port)), \
89 print_state(stream), stream->last_returned_inactivity_timestamp, \
90 stream->current_inactivity_timestamp)
93 bool lttng_live_debug
;
96 int lttng_live_add_port(struct lttng_live_component
*lttng_live
,
97 struct lttng_live_stream_iterator
*stream_iter
)
100 struct bt_private_port
*private_port
;
101 char name
[STREAM_NAME_MAX_LEN
];
103 ret
= sprintf(name
, STREAM_NAME_PREFIX
"%" PRIu64
, stream_iter
->viewer_stream_id
);
105 strcpy(stream_iter
->name
, name
);
106 private_port
= bt_private_component_source_add_output_private_port(
107 lttng_live
->private_component
, name
, stream_iter
);
111 PDBG("Added port %s\n", name
);
113 if (lttng_live
->no_stream_port
) {
114 ret
= bt_private_port_remove_from_component(lttng_live
->no_stream_port
);
118 BT_PUT(lttng_live
->no_stream_port
);
119 lttng_live
->no_stream_iter
->port
= NULL
;
121 stream_iter
->port
= private_port
;
126 int lttng_live_remove_port(struct lttng_live_component
*lttng_live
,
127 struct bt_private_port
*port
)
129 struct bt_component
*component
;
133 component
= bt_component_from_private_component(lttng_live
->private_component
);
134 nr_ports
= bt_component_source_get_output_port_count(component
);
140 assert(!lttng_live
->no_stream_port
);
141 lttng_live
->no_stream_port
=
142 bt_private_component_source_add_output_private_port(lttng_live
->private_component
,
143 "no-stream", lttng_live
->no_stream_iter
);
144 if (!lttng_live
->no_stream_port
) {
147 lttng_live
->no_stream_iter
->port
= lttng_live
->no_stream_port
;
149 ret
= bt_private_port_remove_from_component(port
);
157 struct lttng_live_trace
*lttng_live_find_trace(struct lttng_live_session
*session
,
160 struct lttng_live_trace
*trace
;
162 bt_list_for_each_entry(trace
, &session
->traces
, node
) {
163 if (trace
->id
== trace_id
) {
171 void lttng_live_destroy_trace(struct bt_object
*obj
)
173 struct lttng_live_trace
*trace
= container_of(obj
, struct lttng_live_trace
, obj
);
175 PDBG("Destroy trace\n");
176 assert(bt_list_empty(&trace
->streams
));
177 bt_list_del(&trace
->node
);
178 lttng_live_metadata_fini(trace
);
179 BT_PUT(trace
->cc_prio_map
);
184 struct lttng_live_trace
*lttng_live_create_trace(struct lttng_live_session
*session
,
187 struct lttng_live_trace
*trace
= NULL
;
189 trace
= g_new0(struct lttng_live_trace
, 1);
193 trace
->session
= session
;
194 trace
->id
= trace_id
;
195 BT_INIT_LIST_HEAD(&trace
->streams
);
196 trace
->new_metadata_needed
= true;
197 bt_list_add(&trace
->node
, &session
->traces
);
198 bt_object_init(&trace
->obj
, lttng_live_destroy_trace
);
208 struct lttng_live_trace
*lttng_live_ref_trace(struct lttng_live_session
*session
,
211 struct lttng_live_trace
*trace
;
213 trace
= lttng_live_find_trace(session
, trace_id
);
218 return lttng_live_create_trace(session
, trace_id
);
222 void lttng_live_unref_trace(struct lttng_live_trace
*trace
)
228 void lttng_live_close_trace_streams(struct lttng_live_trace
*trace
)
230 struct lttng_live_stream_iterator
*stream
, *s
;
232 bt_list_for_each_entry_safe(stream
, s
, &trace
->streams
, node
) {
233 lttng_live_stream_iterator_destroy(stream
);
235 lttng_live_metadata_fini(trace
);
239 int lttng_live_add_session(struct lttng_live_component
*lttng_live
, uint64_t session_id
)
242 struct lttng_live_session
*s
;
244 s
= g_new0(struct lttng_live_session
, 1);
250 BT_INIT_LIST_HEAD(&s
->traces
);
251 s
->lttng_live
= lttng_live
;
252 s
->new_streams_needed
= true;
254 PDBG("Reading from session %" PRIu64
"\n", s
->id
);
255 bt_list_add(&s
->node
, <tng_live
->sessions
);
258 PERR("Error adding session\n");
266 void lttng_live_destroy_session(struct lttng_live_session
*session
)
268 struct lttng_live_trace
*trace
, *t
;
270 PDBG("Destroy session\n");
271 if (session
->id
!= -1ULL) {
272 if (lttng_live_detach_session(session
)) {
273 /* Old relayd cannot detach sessions. */
274 PDBG("Unable to detach session %" PRIu64
"\n",
279 bt_list_for_each_entry_safe(trace
, t
, &session
->traces
, node
) {
280 lttng_live_close_trace_streams(trace
);
282 bt_list_del(&session
->node
);
287 void lttng_live_iterator_finalize(struct bt_private_notification_iterator
*it
)
289 struct lttng_live_stream_iterator_generic
*s
=
290 bt_private_notification_iterator_get_user_data(it
);
293 case LIVE_STREAM_TYPE_NO_STREAM
:
295 /* Leave no_stream_iter in place when port is removed. */
298 case LIVE_STREAM_TYPE_STREAM
:
300 struct lttng_live_stream_iterator
*stream_iter
=
301 container_of(s
, struct lttng_live_stream_iterator
, p
);
303 lttng_live_stream_iterator_destroy(stream_iter
);
310 enum bt_ctf_lttng_live_iterator_status
lttng_live_iterator_next_check_stream_state(
311 struct lttng_live_component
*lttng_live
,
312 struct lttng_live_stream_iterator
*lttng_live_stream
)
314 switch (lttng_live_stream
->state
) {
315 case LTTNG_LIVE_STREAM_QUIESCENT
:
316 case LTTNG_LIVE_STREAM_ACTIVE_DATA
:
318 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
:
320 PERR("Unexpected stream state \"ACTIVE_NO_DATA\"\n");
321 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
;
322 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
:
324 PERR("Unexpected stream state \"QUIESCENT_NO_DATA\"\n");
325 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
;
326 case LTTNG_LIVE_STREAM_EOF
:
329 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
333 * For active no data stream, fetch next data. It can be either:
334 * - quiescent: need to put it in the prio heap at quiescent end
336 * - have data: need to wire up first event into the prio heap,
337 * - have no data on this stream at this point: need to retry (AGAIN) or
341 enum bt_ctf_lttng_live_iterator_status
lttng_live_iterator_next_handle_one_no_data_stream(
342 struct lttng_live_component
*lttng_live
,
343 struct lttng_live_stream_iterator
*lttng_live_stream
)
345 enum bt_ctf_lttng_live_iterator_status ret
=
346 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
347 struct packet_index index
;
348 enum lttng_live_stream_state orig_state
= lttng_live_stream
->state
;
350 if (lttng_live_stream
->trace
->new_metadata_needed
) {
351 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
354 if (lttng_live_stream
->trace
->session
->new_streams_needed
) {
355 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
358 if (lttng_live_stream
->state
!= LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
359 && lttng_live_stream
->state
!= LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
) {
362 ret
= lttng_live_get_next_index(lttng_live
, lttng_live_stream
, &index
);
363 if (ret
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
366 assert(lttng_live_stream
->state
!= LTTNG_LIVE_STREAM_EOF
);
367 if (lttng_live_stream
->state
== LTTNG_LIVE_STREAM_QUIESCENT
) {
368 if (orig_state
== LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
369 && lttng_live_stream
->last_returned_inactivity_timestamp
==
370 lttng_live_stream
->current_inactivity_timestamp
) {
371 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN
;
372 print_stream_state(lttng_live_stream
);
374 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
378 lttng_live_stream
->base_offset
= index
.offset
;
379 lttng_live_stream
->offset
= index
.offset
;
380 lttng_live_stream
->len
= index
.packet_size
/ CHAR_BIT
;
382 if (ret
== BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
383 ret
= lttng_live_iterator_next_check_stream_state(
384 lttng_live
, lttng_live_stream
);
390 * Creation of the notification requires the ctf trace to be created
391 * beforehand, but the live protocol gives us all streams (including
392 * metadata) at once. So we split it in three steps: getting streams,
393 * getting metadata (which creates the ctf trace), and then creating the
394 * per-stream notifications.
397 enum bt_ctf_lttng_live_iterator_status
lttng_live_get_session(
398 struct lttng_live_component
*lttng_live
,
399 struct lttng_live_session
*session
)
401 enum bt_ctf_lttng_live_iterator_status status
;
402 struct lttng_live_trace
*trace
, *t
;
404 if (lttng_live_attach_session(session
)) {
405 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
;
407 status
= lttng_live_get_new_streams(session
);
408 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
&&
409 status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
) {
412 bt_list_for_each_entry_safe(trace
, t
, &session
->traces
, node
) {
413 status
= lttng_live_metadata_update(trace
);
414 if (status
== BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
) {
417 retval
= bt_ctf_trace_set_is_static(trace
->trace
);
419 } else if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
423 return lttng_live_lazy_notif_init(session
);
427 void lttng_live_need_new_streams(struct lttng_live_component
*lttng_live
)
429 struct lttng_live_session
*session
;
431 bt_list_for_each_entry(session
, <tng_live
->sessions
, node
) {
432 session
->new_streams_needed
= true;
437 void lttng_live_force_new_streams_and_metadata(struct lttng_live_component
*lttng_live
)
439 struct lttng_live_session
*session
;
441 bt_list_for_each_entry(session
, <tng_live
->sessions
, node
) {
442 struct lttng_live_trace
*trace
;
444 session
->new_streams_needed
= true;
445 bt_list_for_each_entry(trace
, &session
->traces
, node
) {
446 trace
->new_metadata_needed
= true;
452 enum bt_notification_iterator_status
lttng_live_iterator_next_handle_new_streams_and_metadata(
453 struct lttng_live_component
*lttng_live
)
455 enum bt_ctf_lttng_live_iterator_status ret
=
456 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
457 unsigned int nr_sessions_opened
= 0;
458 struct lttng_live_session
*session
, *s
;
460 bt_list_for_each_entry_safe(session
, s
, <tng_live
->sessions
, node
) {
461 if (session
->closed
&& bt_list_empty(&session
->traces
)) {
462 lttng_live_destroy_session(session
);
466 * Currently, when there are no sessions, we quit immediately.
467 * We may want to add a component parameter to keep trying until
468 * we get data in the future.
469 * Also, in a remotely distant future, we could add a "new
470 * session" flag to the protocol, which would tell us that we
471 * need to query for new sessions even though we have sessions
474 if (bt_list_empty(<tng_live
->sessions
)) {
475 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
;
478 bt_list_for_each_entry(session
, <tng_live
->sessions
, node
) {
479 ret
= lttng_live_get_session(lttng_live
, session
);
481 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
:
483 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
:
484 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
489 if (!session
->closed
) {
490 nr_sessions_opened
++;
494 if (ret
== BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
&& !nr_sessions_opened
) {
495 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
;
501 enum bt_ctf_lttng_live_iterator_status
emit_inactivity_notification(
502 struct lttng_live_component
*lttng_live
,
503 struct lttng_live_stream_iterator
*lttng_live_stream
,
504 struct bt_notification
**notification
,
507 enum bt_ctf_lttng_live_iterator_status ret
=
508 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
509 struct lttng_live_trace
*trace
;
510 struct bt_ctf_clock_class
*clock_class
= NULL
;
511 struct bt_ctf_clock_value
*clock_value
= NULL
;
512 struct bt_notification
*notif
= NULL
;
515 trace
= lttng_live_stream
->trace
;
519 clock_class
= bt_clock_class_priority_map_get_clock_class_by_index(trace
->cc_prio_map
, 0);
523 clock_value
= bt_ctf_clock_value_create(clock_class
, timestamp
);
527 notif
= bt_notification_inactivity_create(trace
->cc_prio_map
);
531 retval
= bt_notification_inactivity_set_clock_value(notif
, clock_value
);
535 *notification
= notif
;
542 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
;
548 enum bt_ctf_lttng_live_iterator_status
lttng_live_iterator_next_handle_one_quiescent_stream(
549 struct lttng_live_component
*lttng_live
,
550 struct lttng_live_stream_iterator
*lttng_live_stream
,
551 struct bt_notification
**notification
)
553 enum bt_ctf_lttng_live_iterator_status ret
=
554 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
555 struct bt_ctf_clock_class
*clock_class
= NULL
;
556 struct bt_ctf_clock_value
*clock_value
= NULL
;
558 if (lttng_live_stream
->state
!= LTTNG_LIVE_STREAM_QUIESCENT
) {
559 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
562 if (lttng_live_stream
->current_inactivity_timestamp
==
563 lttng_live_stream
->last_returned_inactivity_timestamp
) {
564 lttng_live_stream
->state
= LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
;
565 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
569 ret
= emit_inactivity_notification(lttng_live
, lttng_live_stream
, notification
,
570 (uint64_t) lttng_live_stream
->current_inactivity_timestamp
);
572 lttng_live_stream
->last_returned_inactivity_timestamp
=
573 lttng_live_stream
->current_inactivity_timestamp
;
581 enum bt_ctf_lttng_live_iterator_status
lttng_live_iterator_next_handle_one_active_data_stream(
582 struct lttng_live_component
*lttng_live
,
583 struct lttng_live_stream_iterator
*lttng_live_stream
,
584 struct bt_notification
**notification
)
586 enum bt_ctf_lttng_live_iterator_status ret
=
587 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
588 enum bt_ctf_notif_iter_status status
;
589 struct lttng_live_session
*session
;
591 bt_list_for_each_entry(session
, <tng_live
->sessions
, node
) {
592 struct lttng_live_trace
*trace
;
594 if (session
->new_streams_needed
) {
595 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
597 bt_list_for_each_entry(trace
, &session
->traces
, node
) {
598 if (trace
->new_metadata_needed
) {
599 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
604 if (lttng_live_stream
->state
!= LTTNG_LIVE_STREAM_ACTIVE_DATA
) {
605 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
;
607 if (lttng_live_stream
->packet_end_notif_queue
) {
608 *notification
= lttng_live_stream
->packet_end_notif_queue
;
609 lttng_live_stream
->packet_end_notif_queue
= NULL
;
610 status
= BT_CTF_NOTIF_ITER_STATUS_OK
;
612 status
= bt_ctf_notif_iter_get_next_notification(
613 lttng_live_stream
->notif_iter
,
614 lttng_live_stream
->trace
->cc_prio_map
,
616 if (status
== BT_CTF_NOTIF_ITER_STATUS_OK
) {
618 * Consider empty packets as inactivity.
620 if (bt_notification_get_type(*notification
) == BT_NOTIFICATION_TYPE_PACKET_END
) {
621 lttng_live_stream
->packet_end_notif_queue
= *notification
;
622 *notification
= NULL
;
623 return emit_inactivity_notification(lttng_live
,
624 lttng_live_stream
, notification
,
625 lttng_live_stream
->current_packet_end_timestamp
);
630 case BT_CTF_NOTIF_ITER_STATUS_EOF
:
631 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
;
633 case BT_CTF_NOTIF_ITER_STATUS_OK
:
634 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
636 case BT_CTF_NOTIF_ITER_STATUS_AGAIN
:
638 * Continue immediately (end of packet). The next
639 * get_index may return AGAIN to delay the following
642 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
644 case BT_CTF_NOTIF_ITER_STATUS_INVAL
:
645 /* No argument provided by the user, so don't return INVAL. */
646 case BT_CTF_NOTIF_ITER_STATUS_ERROR
:
648 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
;
656 * handle_no_data_streams()
658 * - for each ACTIVE_NO_DATA stream:
659 * - query relayd for stream data, or quiescence info.
660 * - if need metadata, get metadata, goto retry.
661 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
662 * - if quiescent, move to QUIESCENT streams
663 * - if fetched data, move to ACTIVE_DATA streams
664 * (at this point each stream either has data, or is quiescent)
668 * handle_new_streams_and_metadata()
669 * - query relayd for known streams, add them as ACTIVE_NO_DATA
670 * - query relayd for metadata
672 * call handle_active_no_data_streams()
674 * handle_quiescent_streams()
675 * - if at least one stream is ACTIVE_DATA:
676 * - peek stream event with lowest timestamp -> next_ts
677 * - for each quiescent stream
678 * - if next_ts >= quiescent end
679 * - set state to ACTIVE_NO_DATA
681 * - for each quiescent stream
682 * - set state to ACTIVE_NO_DATA
684 * call handle_active_no_data_streams()
686 * handle_active_data_streams()
687 * - if at least one stream is ACTIVE_DATA:
688 * - get stream event with lowest timestamp from heap
689 * - make that stream event the current notification.
690 * - move this stream heap position to its next event
691 * - if we need to fetch data from relayd, move
692 * stream to ACTIVE_NO_DATA.
696 * end criterion: ctrl-c on client. If relayd exits or the session
697 * closes on the relay daemon side, we keep on waiting for streams.
698 * Eventually handle --end timestamp (also an end criterion).
700 * When disconnected from relayd: try to re-connect endlessly.
703 struct bt_notification_iterator_next_return
lttng_live_iterator_next_stream(
704 struct bt_private_notification_iterator
*iterator
,
705 struct lttng_live_stream_iterator
*stream_iter
)
707 enum bt_ctf_lttng_live_iterator_status status
;
708 struct bt_notification_iterator_next_return next_return
;
709 struct lttng_live_component
*lttng_live
;
711 lttng_live
= stream_iter
->trace
->session
->lttng_live
;
713 print_stream_state(stream_iter
);
714 next_return
.notification
= NULL
;
715 status
= lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live
);
716 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
719 status
= lttng_live_iterator_next_handle_one_no_data_stream(
720 lttng_live
, stream_iter
);
721 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
724 status
= lttng_live_iterator_next_handle_one_quiescent_stream(
725 lttng_live
, stream_iter
, &next_return
.notification
);
726 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
727 assert(next_return
.notification
== NULL
);
730 if (next_return
.notification
) {
733 status
= lttng_live_iterator_next_handle_one_active_data_stream(lttng_live
,
734 stream_iter
, &next_return
.notification
);
735 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
736 assert(next_return
.notification
== NULL
);
741 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
:
742 print_dbg("continue");
744 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN
:
745 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_AGAIN
;
748 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
:
749 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_END
;
752 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
:
753 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_OK
;
756 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL
:
757 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_INVALID
;
759 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM
:
760 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_NOMEM
;
762 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED
:
763 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED
;
765 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
:
766 default: /* fall-through */
767 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_ERROR
;
774 struct bt_notification_iterator_next_return
lttng_live_iterator_next_no_stream(
775 struct bt_private_notification_iterator
*iterator
,
776 struct lttng_live_no_stream_iterator
*no_stream_iter
)
778 enum bt_ctf_lttng_live_iterator_status status
;
779 struct bt_notification_iterator_next_return next_return
;
780 struct lttng_live_component
*lttng_live
;
782 lttng_live
= no_stream_iter
->lttng_live
;
784 lttng_live_force_new_streams_and_metadata(lttng_live
);
785 status
= lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live
);
786 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
789 if (no_stream_iter
->port
) {
790 status
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN
;
792 status
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
;
796 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
:
798 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN
:
799 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_AGAIN
;
801 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
:
802 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_END
;
804 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
:
805 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_OK
;
807 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL
:
808 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_INVALID
;
810 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM
:
811 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_NOMEM
;
813 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED
:
814 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED
;
816 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
:
817 default: /* fall-through */
818 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_ERROR
;
825 struct bt_notification_iterator_next_return
lttng_live_iterator_next(
826 struct bt_private_notification_iterator
*iterator
)
828 struct lttng_live_stream_iterator_generic
*s
=
829 bt_private_notification_iterator_get_user_data(iterator
);
830 struct bt_notification_iterator_next_return next_return
;
833 case LIVE_STREAM_TYPE_NO_STREAM
:
834 next_return
= lttng_live_iterator_next_no_stream(iterator
,
835 container_of(s
, struct lttng_live_no_stream_iterator
, p
));
837 case LIVE_STREAM_TYPE_STREAM
:
838 next_return
= lttng_live_iterator_next_stream(iterator
,
839 container_of(s
, struct lttng_live_stream_iterator
, p
));
842 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_ERROR
;
849 enum bt_notification_iterator_status
lttng_live_iterator_init(
850 struct bt_private_notification_iterator
*it
,
851 struct bt_private_port
*port
)
853 enum bt_notification_iterator_status ret
=
854 BT_NOTIFICATION_ITERATOR_STATUS_OK
;
855 struct lttng_live_stream_iterator_generic
*s
;
856 struct lttng_live_component
*lttng_live
;
860 s
= bt_private_port_get_user_data(port
);
863 case LIVE_STREAM_TYPE_NO_STREAM
:
865 struct lttng_live_no_stream_iterator
*no_stream_iter
=
866 container_of(s
, struct lttng_live_no_stream_iterator
, p
);
867 lttng_live
= no_stream_iter
->lttng_live
;
868 ret
= bt_private_notification_iterator_set_user_data(it
, no_stream_iter
);
874 case LIVE_STREAM_TYPE_STREAM
:
876 struct lttng_live_stream_iterator
*stream_iter
=
877 container_of(s
, struct lttng_live_stream_iterator
, p
);
878 lttng_live
= stream_iter
->trace
->session
->lttng_live
;
879 ret
= bt_private_notification_iterator_set_user_data(it
, stream_iter
);
886 ret
= BT_NOTIFICATION_ITERATOR_STATUS_ERROR
;
893 if (bt_private_notification_iterator_set_user_data(it
, NULL
)
894 != BT_NOTIFICATION_ITERATOR_STATUS_OK
) {
895 PERR("Error setting private data to NULL\n");
901 struct bt_value
*lttng_live_query_list_sessions(struct bt_component_class
*comp_class
,
902 struct bt_value
*params
)
904 struct bt_value
*url_value
= NULL
;
905 struct bt_value
*results
= NULL
;
907 struct bt_live_viewer_connection
*viewer_connection
= NULL
;
908 enum bt_value_status ret
;
910 url_value
= bt_value_map_get(params
, "url");
911 if (!url_value
|| bt_value_is_null(url_value
) || !bt_value_is_string(url_value
)) {
912 fprintf(stderr
, "Mandatory \"url\" parameter missing\n");
916 ret
= bt_value_string_get(url_value
, &url
);
917 if (ret
!= BT_VALUE_STATUS_OK
) {
918 fprintf(stderr
, "\"url\" parameter is required to be a string value\n");
922 viewer_connection
= bt_live_viewer_connection_create(url
, stderr
);
923 if (!viewer_connection
) {
924 ret
= BT_COMPONENT_STATUS_NOMEM
;
928 results
= bt_live_viewer_connection_list_sessions(viewer_connection
);
933 if (viewer_connection
) {
934 bt_live_viewer_connection_destroy(viewer_connection
);
941 struct bt_value
*lttng_live_query(struct bt_component_class
*comp_class
,
942 const char *object
, struct bt_value
*params
)
944 if (strcmp(object
, "sessions") == 0) {
945 return lttng_live_query_list_sessions(comp_class
,
948 fprintf(stderr
, "Unknown query object `%s`\n", object
);
953 void lttng_live_component_destroy_data(struct lttng_live_component
*lttng_live
)
956 struct lttng_live_session
*session
, *s
;
958 bt_list_for_each_entry_safe(session
, s
, <tng_live
->sessions
, node
) {
959 lttng_live_destroy_session(session
);
961 BT_PUT(lttng_live
->viewer_connection
);
962 if (lttng_live
->url
) {
963 g_string_free(lttng_live
->url
, TRUE
);
965 if (lttng_live
->no_stream_port
) {
966 ret
= bt_private_port_remove_from_component(lttng_live
->no_stream_port
);
968 BT_PUT(lttng_live
->no_stream_port
);
970 if (lttng_live
->no_stream_iter
) {
971 g_free(lttng_live
->no_stream_iter
);
977 void lttng_live_component_finalize(struct bt_private_component
*component
)
979 void *data
= bt_private_component_get_user_data(component
);
984 lttng_live_component_destroy_data(data
);
988 struct lttng_live_component
*lttng_live_component_create(struct bt_value
*params
,
989 struct bt_private_component
*private_component
)
991 struct lttng_live_component
*lttng_live
;
992 struct bt_value
*value
= NULL
;
994 enum bt_value_status ret
;
996 lttng_live
= g_new0(struct lttng_live_component
, 1);
1000 lttng_live
->error_fp
= stderr
;
1001 /* TODO: make this an overridable parameter. */
1002 lttng_live
->max_query_size
= MAX_QUERY_SIZE
;
1003 BT_INIT_LIST_HEAD(<tng_live
->sessions
);
1004 value
= bt_value_map_get(params
, "url");
1005 if (!value
|| bt_value_is_null(value
) || !bt_value_is_string(value
)) {
1006 fprintf(stderr
, "Mandatory \"url\" parameter missing\n");
1009 ret
= bt_value_string_get(value
, &url
);
1010 if (ret
!= BT_VALUE_STATUS_OK
) {
1011 fprintf(stderr
, "\"url\" parameter is required to be a string value\n");
1014 lttng_live
->url
= g_string_new(url
);
1015 if (!lttng_live
->url
) {
1018 lttng_live
->viewer_connection
=
1019 bt_live_viewer_connection_create(lttng_live
->url
->str
,
1021 if (!lttng_live
->viewer_connection
) {
1022 ret
= BT_COMPONENT_STATUS_NOMEM
;
1025 if (lttng_live_create_viewer_session(lttng_live
)) {
1026 ret
= BT_COMPONENT_STATUS_ERROR
;
1029 lttng_live
->private_component
= private_component
;
1034 lttng_live_component_destroy_data(lttng_live
);
1041 enum bt_component_status
lttng_live_component_init(struct bt_private_component
*component
,
1042 struct bt_value
*params
, void *init_method_data
)
1044 struct lttng_live_component
*lttng_live
;
1045 enum bt_component_status ret
= BT_COMPONENT_STATUS_OK
;
1047 lttng_live_debug
= g_strcmp0(getenv("LTTNG_LIVE_DEBUG"), "1") == 0;
1049 /* Passes ownership of iter ref to lttng_live_component_create. */
1050 lttng_live
= lttng_live_component_create(params
, component
);
1052 ret
= BT_COMPONENT_STATUS_NOMEM
;
1056 lttng_live
->no_stream_iter
= g_new0(struct lttng_live_no_stream_iterator
, 1);
1057 lttng_live
->no_stream_iter
->p
.type
= LIVE_STREAM_TYPE_NO_STREAM
;
1058 lttng_live
->no_stream_iter
->lttng_live
= lttng_live
;
1060 lttng_live
->no_stream_port
=
1061 bt_private_component_source_add_output_private_port(
1062 lttng_live
->private_component
, "no-stream",
1063 lttng_live
->no_stream_iter
);
1064 lttng_live
->no_stream_iter
->port
= lttng_live
->no_stream_port
;
1066 ret
= bt_private_component_set_user_data(component
, lttng_live
);
1067 if (ret
!= BT_COMPONENT_STATUS_OK
) {
1074 (void) bt_private_component_set_user_data(component
, NULL
);
1075 lttng_live_component_destroy_data(lttng_live
);