* Babeltrace CTF LTTng-live Client Component
*
* Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
* Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
*
* SOFTWARE.
*/
+#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC"
+#include "logging.h"
+
+#include <babeltrace/ctf-ir/packet.h>
+#include <babeltrace/graph/component-source.h>
+#include <babeltrace/graph/private-port.h>
+#include <babeltrace/graph/port.h>
+#include <babeltrace/graph/private-component.h>
+#include <babeltrace/graph/private-component-source.h>
+#include <babeltrace/graph/private-notification-iterator.h>
+#include <babeltrace/graph/notification-stream.h>
+#include <babeltrace/graph/notification-packet.h>
+#include <babeltrace/graph/notification-event.h>
+#include <babeltrace/graph/notification-heap.h>
+#include <babeltrace/graph/notification-iterator.h>
+#include <babeltrace/graph/notification-inactivity.h>
+#include <babeltrace/graph/graph.h>
+#include <babeltrace/compiler-internal.h>
+#include <babeltrace/types.h>
+#include <inttypes.h>
+#include <glib.h>
+#include <assert.h>
+#include <unistd.h>
+#include <plugins-common.h>
+
+#include "data-stream.h"
+#include "metadata.h"
#include "lttng-live-internal.h"
-#include <babeltrace/component/source.h>
+
+#define MAX_QUERY_SIZE (256*1024)
+
+#define print_dbg(fmt, ...) BT_LOGD(fmt, ## __VA_ARGS__)
+
+static const char *print_state(struct lttng_live_stream_iterator *s)
+{
+ switch (s->state) {
+ case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
+ return "ACTIVE_NO_DATA";
+ case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
+ return "QUIESCENT_NO_DATA";
+ case LTTNG_LIVE_STREAM_QUIESCENT:
+ return "QUIESCENT";
+ case LTTNG_LIVE_STREAM_ACTIVE_DATA:
+ return "ACTIVE_DATA";
+ case LTTNG_LIVE_STREAM_EOF:
+ return "EOF";
+ default:
+ return "ERROR";
+ }
+}
+
+static
+void print_stream_state(struct lttng_live_stream_iterator *stream)
+{
+ struct bt_port *port;
+
+ port = bt_port_from_private_port(stream->port);
+ print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64,
+ bt_port_get_name(port),
+ print_state(stream),
+ stream->last_returned_inactivity_timestamp,
+ stream->current_inactivity_timestamp);
+ bt_put(port);
+}
+
+BT_HIDDEN
+bt_bool lttng_live_is_canceled(struct lttng_live_component *lttng_live)
+{
+ struct bt_component *component;
+ struct bt_graph *graph;
+ bt_bool ret;
+
+ if (!lttng_live) {
+ return BT_FALSE;
+ }
+
+ component = bt_component_from_private_component(lttng_live->private_component);
+ graph = bt_component_get_graph(component);
+ ret = bt_graph_is_canceled(graph);
+ bt_put(graph);
+ bt_put(component);
+ return ret;
+}
+
+BT_HIDDEN
+int lttng_live_add_port(struct lttng_live_component *lttng_live,
+ struct lttng_live_stream_iterator *stream_iter)
+{
+ int ret;
+ struct bt_private_port *private_port;
+ char name[STREAM_NAME_MAX_LEN];
+ enum bt_component_status status;
+
+ ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id);
+ assert(ret > 0);
+ strcpy(stream_iter->name, name);
+ if (lttng_live_is_canceled(lttng_live)) {
+ return 0;
+ }
+ status = bt_private_component_source_add_output_private_port(
+ lttng_live->private_component, name, stream_iter,
+ &private_port);
+ switch (status) {
+ case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
+ return 0;
+ case BT_COMPONENT_STATUS_OK:
+ break;
+ default:
+ return -1;
+ }
+ bt_put(private_port); /* weak */
+ BT_LOGI("Added port %s", name);
+
+ if (lttng_live->no_stream_port) {
+ bt_get(lttng_live->no_stream_port);
+ ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
+ bt_put(lttng_live->no_stream_port);
+ if (ret) {
+ return -1;
+ }
+ lttng_live->no_stream_port = NULL;
+ lttng_live->no_stream_iter->port = NULL;
+ }
+ stream_iter->port = private_port;
+ return 0;
+}
+
+BT_HIDDEN
+int lttng_live_remove_port(struct lttng_live_component *lttng_live,
+ struct bt_private_port *port)
+{
+ struct bt_component *component;
+ int64_t nr_ports;
+ int ret;
+
+ component = bt_component_from_private_component(lttng_live->private_component);
+ nr_ports = bt_component_source_get_output_port_count(component);
+ if (nr_ports < 0) {
+ return -1;
+ }
+ BT_PUT(component);
+ if (nr_ports == 1) {
+ enum bt_component_status status;
+
+ assert(!lttng_live->no_stream_port);
+
+ if (lttng_live_is_canceled(lttng_live)) {
+ return 0;
+ }
+ status = bt_private_component_source_add_output_private_port(lttng_live->private_component,
+ "no-stream", lttng_live->no_stream_iter,
+ <tng_live->no_stream_port);
+ switch (status) {
+ case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
+ return 0;
+ case BT_COMPONENT_STATUS_OK:
+ break;
+ default:
+ return -1;
+ }
+ bt_put(lttng_live->no_stream_port); /* weak */
+ lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
+ }
+ bt_get(port);
+ ret = bt_private_port_remove_from_component(port);
+ bt_put(port);
+ if (ret) {
+ return -1;
+ }
+ return 0;
+}
+
+static
+struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session,
+ uint64_t trace_id)
+{
+ struct lttng_live_trace *trace;
+
+ bt_list_for_each_entry(trace, &session->traces, node) {
+ if (trace->id == trace_id) {
+ return trace;
+ }
+ }
+ return NULL;
+}
+
+static
+void lttng_live_destroy_trace(struct bt_object *obj)
+{
+ struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj);
+
+ BT_LOGI("Destroy trace");
+ assert(bt_list_empty(&trace->streams));
+ bt_list_del(&trace->node);
+
+ if (trace->trace) {
+ int retval;
+
+ retval = bt_ctf_trace_set_is_static(trace->trace);
+ assert(!retval);
+ BT_PUT(trace->trace);
+ }
+ lttng_live_metadata_fini(trace);
+ BT_PUT(trace->cc_prio_map);
+ g_free(trace);
+}
+
+static
+struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
+ uint64_t trace_id)
+{
+ struct lttng_live_trace *trace = NULL;
+
+ trace = g_new0(struct lttng_live_trace, 1);
+ if (!trace) {
+ goto error;
+ }
+ trace->session = session;
+ trace->id = trace_id;
+ BT_INIT_LIST_HEAD(&trace->streams);
+ trace->new_metadata_needed = true;
+ bt_list_add(&trace->node, &session->traces);
+ bt_object_init(&trace->obj, lttng_live_destroy_trace);
+ BT_LOGI("Create trace");
+ goto end;
+error:
+ g_free(trace);
+ trace = NULL;
+end:
+ return trace;
+}
BT_HIDDEN
-enum bt_component_status lttng_live_init(struct bt_component *component,
+struct lttng_live_trace *lttng_live_ref_trace(struct lttng_live_session *session,
+ uint64_t trace_id)
+{
+ struct lttng_live_trace *trace;
+
+ trace = lttng_live_find_trace(session, trace_id);
+ if (trace) {
+ bt_get(trace);
+ return trace;
+ }
+ return lttng_live_create_trace(session, trace_id);
+}
+
+BT_HIDDEN
+void lttng_live_unref_trace(struct lttng_live_trace *trace)
+{
+ bt_put(trace);
+}
+
+static
+void lttng_live_close_trace_streams(struct lttng_live_trace *trace)
+{
+ struct lttng_live_stream_iterator *stream, *s;
+
+ bt_list_for_each_entry_safe(stream, s, &trace->streams, node) {
+ lttng_live_stream_iterator_destroy(stream);
+ }
+ lttng_live_metadata_fini(trace);
+}
+
+BT_HIDDEN
+int lttng_live_add_session(struct lttng_live_component *lttng_live,
+ uint64_t session_id, const char *hostname,
+ const char *session_name)
+{
+ int ret = 0;
+ struct lttng_live_session *s;
+
+ s = g_new0(struct lttng_live_session, 1);
+ if (!s) {
+ goto error;
+ }
+
+ s->id = session_id;
+ BT_INIT_LIST_HEAD(&s->traces);
+ s->lttng_live = lttng_live;
+ s->new_streams_needed = true;
+ s->hostname = g_string_new(hostname);
+ s->session_name = g_string_new(session_name);
+
+ BT_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s",
+ s->id, hostname, session_name);
+ bt_list_add(&s->node, <tng_live->sessions);
+ goto end;
+error:
+ BT_LOGE("Error adding session");
+ g_free(s);
+ ret = -1;
+end:
+ return ret;
+}
+
+static
+void lttng_live_destroy_session(struct lttng_live_session *session)
+{
+ struct lttng_live_trace *trace, *t;
+
+ BT_LOGI("Destroy session");
+ if (session->id != -1ULL) {
+ if (lttng_live_detach_session(session)) {
+ if (!lttng_live_is_canceled(session->lttng_live)) {
+ /* Old relayd cannot detach sessions. */
+ BT_LOGD("Unable to detach session %" PRIu64,
+ session->id);
+ }
+ }
+ session->id = -1ULL;
+ }
+ bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
+ lttng_live_close_trace_streams(trace);
+ }
+ bt_list_del(&session->node);
+ if (session->hostname) {
+ g_string_free(session->hostname, TRUE);
+ }
+ if (session->session_name) {
+ g_string_free(session->session_name, TRUE);
+ }
+ g_free(session);
+}
+
+BT_HIDDEN
+void lttng_live_iterator_finalize(struct bt_private_notification_iterator *it)
+{
+ struct lttng_live_stream_iterator_generic *s =
+ bt_private_notification_iterator_get_user_data(it);
+
+ switch (s->type) {
+ case LIVE_STREAM_TYPE_NO_STREAM:
+ {
+ /* Leave no_stream_iter in place when port is removed. */
+ break;
+ }
+ case LIVE_STREAM_TYPE_STREAM:
+ {
+ struct lttng_live_stream_iterator *stream_iter =
+ container_of(s, struct lttng_live_stream_iterator, p);
+
+ lttng_live_stream_iterator_destroy(stream_iter);
+ break;
+ }
+ }
+}
+
+static
+enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
+ struct lttng_live_component *lttng_live,
+ struct lttng_live_stream_iterator *lttng_live_stream)
+{
+ switch (lttng_live_stream->state) {
+ case LTTNG_LIVE_STREAM_QUIESCENT:
+ case LTTNG_LIVE_STREAM_ACTIVE_DATA:
+ break;
+ case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
+ /* Invalid state. */
+ BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
+ abort();
+ case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
+ /* Invalid state. */
+ BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
+ abort();
+ case LTTNG_LIVE_STREAM_EOF:
+ break;
+ }
+ return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+}
+
+/*
+ * For active no data stream, fetch next data. It can be either:
+ * - quiescent: need to put it in the prio heap at quiescent end
+ * timestamp,
+ * - have data: need to wire up first event into the prio heap,
+ * - have no data on this stream at this point: need to retry (AGAIN) or
+ * return EOF.
+ */
+static
+enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
+ struct lttng_live_component *lttng_live,
+ struct lttng_live_stream_iterator *lttng_live_stream)
+{
+ enum bt_ctf_lttng_live_iterator_status ret =
+ BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ struct packet_index index;
+ enum lttng_live_stream_state orig_state = lttng_live_stream->state;
+
+ if (lttng_live_stream->trace->new_metadata_needed) {
+ ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ goto end;
+ }
+ if (lttng_live_stream->trace->session->new_streams_needed) {
+ ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ goto end;
+ }
+ if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
+ && lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
+ goto end;
+ }
+ ret = lttng_live_get_next_index(lttng_live, lttng_live_stream, &index);
+ if (ret != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+ assert(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
+ if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
+ if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
+ && lttng_live_stream->last_returned_inactivity_timestamp ==
+ lttng_live_stream->current_inactivity_timestamp) {
+ ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ print_stream_state(lttng_live_stream);
+ } else {
+ ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ }
+ goto end;
+ }
+ lttng_live_stream->base_offset = index.offset;
+ lttng_live_stream->offset = index.offset;
+ lttng_live_stream->len = index.packet_size / CHAR_BIT;
+end:
+ if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ ret = lttng_live_iterator_next_check_stream_state(
+ lttng_live, lttng_live_stream);
+ }
+ return ret;
+}
+
+/*
+ * Creation of the notification requires the ctf trace to be created
+ * beforehand, but the live protocol gives us all streams (including
+ * metadata) at once. So we split it in three steps: getting streams,
+ * getting metadata (which creates the ctf trace), and then creating the
+ * per-stream notifications.
+ */
+static
+enum bt_ctf_lttng_live_iterator_status lttng_live_get_session(
+ struct lttng_live_component *lttng_live,
+ struct lttng_live_session *session)
+{
+ enum bt_ctf_lttng_live_iterator_status status;
+ struct lttng_live_trace *trace, *t;
+
+ if (lttng_live_attach_session(session)) {
+ if (lttng_live_is_canceled(lttng_live)) {
+ return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ } else {
+ return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ }
+ }
+ status = lttng_live_get_new_streams(session);
+ if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
+ status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
+ return status;
+ }
+ bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
+ status = lttng_live_metadata_update(trace);
+ if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
+ status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
+ return status;
+ }
+ }
+ return lttng_live_lazy_notif_init(session);
+}
+
+BT_HIDDEN
+void lttng_live_need_new_streams(struct lttng_live_component *lttng_live)
+{
+ struct lttng_live_session *session;
+
+ bt_list_for_each_entry(session, <tng_live->sessions, node) {
+ session->new_streams_needed = true;
+ }
+}
+
+static
+void lttng_live_force_new_streams_and_metadata(struct lttng_live_component *lttng_live)
+{
+ struct lttng_live_session *session;
+
+ bt_list_for_each_entry(session, <tng_live->sessions, node) {
+ struct lttng_live_trace *trace;
+
+ session->new_streams_needed = true;
+ bt_list_for_each_entry(trace, &session->traces, node) {
+ trace->new_metadata_needed = true;
+ }
+ }
+}
+
+static
+enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
+ struct lttng_live_component *lttng_live)
+{
+ enum bt_ctf_lttng_live_iterator_status ret =
+ BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ unsigned int nr_sessions_opened = 0;
+ struct lttng_live_session *session, *s;
+
+ bt_list_for_each_entry_safe(session, s, <tng_live->sessions, node) {
+ if (session->closed && bt_list_empty(&session->traces)) {
+ lttng_live_destroy_session(session);
+ }
+ }
+ /*
+ * Currently, when there are no sessions, we quit immediately.
+ * We may want to add a component parameter to keep trying until
+ * we get data in the future.
+ * Also, in a remotely distant future, we could add a "new
+ * session" flag to the protocol, which would tell us that we
+ * need to query for new sessions even though we have sessions
+ * currently ongoing.
+ */
+ if (bt_list_empty(<tng_live->sessions)) {
+ ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
+ goto end;
+ }
+ bt_list_for_each_entry(session, <tng_live->sessions, node) {
+ ret = lttng_live_get_session(lttng_live, session);
+ switch (ret) {
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
+ break;
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
+ ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ break;
+ default:
+ goto end;
+ }
+ if (!session->closed) {
+ nr_sessions_opened++;
+ }
+ }
+end:
+ if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) {
+ ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
+ }
+ return ret;
+}
+
+static
+enum bt_ctf_lttng_live_iterator_status emit_inactivity_notification(
+ struct lttng_live_component *lttng_live,
+ struct lttng_live_stream_iterator *lttng_live_stream,
+ struct bt_notification **notification,
+ uint64_t timestamp)
+{
+ enum bt_ctf_lttng_live_iterator_status ret =
+ BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ struct lttng_live_trace *trace;
+ struct bt_ctf_clock_class *clock_class = NULL;
+ struct bt_ctf_clock_value *clock_value = NULL;
+ struct bt_notification *notif = NULL;
+ int retval;
+
+ trace = lttng_live_stream->trace;
+ if (!trace) {
+ goto error;
+ }
+ clock_class = bt_clock_class_priority_map_get_clock_class_by_index(trace->cc_prio_map, 0);
+ if (!clock_class) {
+ goto error;
+ }
+ clock_value = bt_ctf_clock_value_create(clock_class, timestamp);
+ if (!clock_value) {
+ goto error;
+ }
+ notif = bt_notification_inactivity_create(trace->cc_prio_map);
+ if (!notif) {
+ goto error;
+ }
+ retval = bt_notification_inactivity_set_clock_value(notif, clock_value);
+ if (retval) {
+ goto error;
+ }
+ *notification = notif;
+end:
+ bt_put(clock_value);
+ bt_put(clock_class);
+ return ret;
+
+error:
+ ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ bt_put(notif);
+ goto end;
+}
+
+static
+enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
+ struct lttng_live_component *lttng_live,
+ struct lttng_live_stream_iterator *lttng_live_stream,
+ struct bt_notification **notification)
+{
+ enum bt_ctf_lttng_live_iterator_status ret =
+ BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ struct bt_ctf_clock_class *clock_class = NULL;
+ struct bt_ctf_clock_value *clock_value = NULL;
+
+ if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
+ return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ }
+
+ if (lttng_live_stream->current_inactivity_timestamp ==
+ lttng_live_stream->last_returned_inactivity_timestamp) {
+ lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
+ ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ goto end;
+ }
+
+ ret = emit_inactivity_notification(lttng_live, lttng_live_stream, notification,
+ (uint64_t) lttng_live_stream->current_inactivity_timestamp);
+
+ lttng_live_stream->last_returned_inactivity_timestamp =
+ lttng_live_stream->current_inactivity_timestamp;
+end:
+ bt_put(clock_value);
+ bt_put(clock_class);
+ return ret;
+}
+
+static
+enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
+ struct lttng_live_component *lttng_live,
+ struct lttng_live_stream_iterator *lttng_live_stream,
+ struct bt_notification **notification)
+{
+ enum bt_ctf_lttng_live_iterator_status ret =
+ BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ enum bt_ctf_notif_iter_status status;
+ struct lttng_live_session *session;
+
+ bt_list_for_each_entry(session, <tng_live->sessions, node) {
+ struct lttng_live_trace *trace;
+
+ if (session->new_streams_needed) {
+ return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ }
+ bt_list_for_each_entry(trace, &session->traces, node) {
+ if (trace->new_metadata_needed) {
+ return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ }
+ }
+ }
+
+ if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
+ return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ }
+ if (lttng_live_stream->packet_end_notif_queue) {
+ *notification = lttng_live_stream->packet_end_notif_queue;
+ lttng_live_stream->packet_end_notif_queue = NULL;
+ status = BT_CTF_NOTIF_ITER_STATUS_OK;
+ } else {
+ status = bt_ctf_notif_iter_get_next_notification(
+ lttng_live_stream->notif_iter,
+ lttng_live_stream->trace->cc_prio_map,
+ notification);
+ if (status == BT_CTF_NOTIF_ITER_STATUS_OK) {
+ /*
+ * Consider empty packets as inactivity.
+ */
+ if (bt_notification_get_type(*notification) == BT_NOTIFICATION_TYPE_PACKET_END) {
+ lttng_live_stream->packet_end_notif_queue = *notification;
+ *notification = NULL;
+ return emit_inactivity_notification(lttng_live,
+ lttng_live_stream, notification,
+ lttng_live_stream->current_packet_end_timestamp);
+ }
+ }
+ }
+ switch (status) {
+ case BT_CTF_NOTIF_ITER_STATUS_EOF:
+ ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
+ break;
+ case BT_CTF_NOTIF_ITER_STATUS_OK:
+ ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ break;
+ case BT_CTF_NOTIF_ITER_STATUS_AGAIN:
+ /*
+ * Continue immediately (end of packet). The next
+ * get_index may return AGAIN to delay the following
+ * attempt.
+ */
+ ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ break;
+ case BT_CTF_NOTIF_ITER_STATUS_INVAL:
+ /* No argument provided by the user, so don't return INVAL. */
+ case BT_CTF_NOTIF_ITER_STATUS_ERROR:
+ default:
+ ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ break;
+ }
+ return ret;
+}
+
+/*
+ * helper function:
+ * handle_no_data_streams()
+ * retry:
+ * - for each ACTIVE_NO_DATA stream:
+ * - query relayd for stream data, or quiescence info.
+ * - if need metadata, get metadata, goto retry.
+ * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
+ * - if quiescent, move to QUIESCENT streams
+ * - if fetched data, move to ACTIVE_DATA streams
+ * (at this point each stream either has data, or is quiescent)
+ *
+ *
+ * iterator_next:
+ * handle_new_streams_and_metadata()
+ * - query relayd for known streams, add them as ACTIVE_NO_DATA
+ * - query relayd for metadata
+ *
+ * call handle_active_no_data_streams()
+ *
+ * handle_quiescent_streams()
+ * - if at least one stream is ACTIVE_DATA:
+ * - peek stream event with lowest timestamp -> next_ts
+ * - for each quiescent stream
+ * - if next_ts >= quiescent end
+ * - set state to ACTIVE_NO_DATA
+ * - else
+ * - for each quiescent stream
+ * - set state to ACTIVE_NO_DATA
+ *
+ * call handle_active_no_data_streams()
+ *
+ * handle_active_data_streams()
+ * - if at least one stream is ACTIVE_DATA:
+ * - get stream event with lowest timestamp from heap
+ * - make that stream event the current notification.
+ * - move this stream heap position to its next event
+ * - if we need to fetch data from relayd, move
+ * stream to ACTIVE_NO_DATA.
+ * - return OK
+ * - return AGAIN
+ *
+ * end criterion: ctrl-c on client. If relayd exits or the session
+ * closes on the relay daemon side, we keep on waiting for streams.
+ * Eventually handle --end timestamp (also an end criterion).
+ *
+ * When disconnected from relayd: try to re-connect endlessly.
+ */
+static
+struct bt_notification_iterator_next_return lttng_live_iterator_next_stream(
+ struct bt_private_notification_iterator *iterator,
+ struct lttng_live_stream_iterator *stream_iter)
+{
+ enum bt_ctf_lttng_live_iterator_status status;
+ struct bt_notification_iterator_next_return next_return;
+ struct lttng_live_component *lttng_live;
+
+ lttng_live = stream_iter->trace->session->lttng_live;
+retry:
+ print_stream_state(stream_iter);
+ next_return.notification = NULL;
+ status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
+ if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+ status = lttng_live_iterator_next_handle_one_no_data_stream(
+ lttng_live, stream_iter);
+ if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+ status = lttng_live_iterator_next_handle_one_quiescent_stream(
+ lttng_live, stream_iter, &next_return.notification);
+ if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ assert(next_return.notification == NULL);
+ goto end;
+ }
+ if (next_return.notification) {
+ goto end;
+ }
+ status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live,
+ stream_iter, &next_return.notification);
+ if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ assert(next_return.notification == NULL);
+ }
+
+end:
+ switch (status) {
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
+ print_dbg("continue");
+ goto retry;
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
+ print_dbg("again");
+ break;
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
+ print_dbg("end");
+ break;
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
+ print_dbg("ok");
+ break;
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
+ break;
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
+ break;
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
+ break;
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
+ default: /* fall-through */
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+ break;
+ }
+ return next_return;
+}
+
+static
+struct bt_notification_iterator_next_return lttng_live_iterator_next_no_stream(
+ struct bt_private_notification_iterator *iterator,
+ struct lttng_live_no_stream_iterator *no_stream_iter)
+{
+ enum bt_ctf_lttng_live_iterator_status status;
+ struct bt_notification_iterator_next_return next_return;
+ struct lttng_live_component *lttng_live;
+
+ lttng_live = no_stream_iter->lttng_live;
+retry:
+ lttng_live_force_new_streams_and_metadata(lttng_live);
+ next_return.notification = NULL;
+ status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
+ if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+ if (no_stream_iter->port) {
+ status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ } else {
+ status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
+ }
+end:
+ switch (status) {
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
+ goto retry;
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
+ break;
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
+ break;
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
+ break;
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
+ break;
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
+ break;
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
+ default: /* fall-through */
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+ break;
+ }
+ return next_return;
+}
+
+BT_HIDDEN
+struct bt_notification_iterator_next_return lttng_live_iterator_next(
+ struct bt_private_notification_iterator *iterator)
+{
+ struct lttng_live_stream_iterator_generic *s =
+ bt_private_notification_iterator_get_user_data(iterator);
+ struct bt_notification_iterator_next_return next_return;
+
+ switch (s->type) {
+ case LIVE_STREAM_TYPE_NO_STREAM:
+ next_return = lttng_live_iterator_next_no_stream(iterator,
+ container_of(s, struct lttng_live_no_stream_iterator, p));
+ break;
+ case LIVE_STREAM_TYPE_STREAM:
+ next_return = lttng_live_iterator_next_stream(iterator,
+ container_of(s, struct lttng_live_stream_iterator, p));
+ break;
+ default:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+ break;
+ }
+ return next_return;
+}
+
+BT_HIDDEN
+enum bt_notification_iterator_status lttng_live_iterator_init(
+ struct bt_private_notification_iterator *it,
+ struct bt_private_port *port)
+{
+ enum bt_notification_iterator_status ret =
+ BT_NOTIFICATION_ITERATOR_STATUS_OK;
+ struct lttng_live_stream_iterator_generic *s;
+
+ assert(it);
+
+ s = bt_private_port_get_user_data(port);
+ assert(s);
+ switch (s->type) {
+ case LIVE_STREAM_TYPE_NO_STREAM:
+ {
+ struct lttng_live_no_stream_iterator *no_stream_iter =
+ container_of(s, struct lttng_live_no_stream_iterator, p);
+ ret = bt_private_notification_iterator_set_user_data(it, no_stream_iter);
+ if (ret) {
+ goto error;
+ }
+ break;
+ }
+ case LIVE_STREAM_TYPE_STREAM:
+ {
+ struct lttng_live_stream_iterator *stream_iter =
+ container_of(s, struct lttng_live_stream_iterator, p);
+ ret = bt_private_notification_iterator_set_user_data(it, stream_iter);
+ if (ret) {
+ goto error;
+ }
+ break;
+ }
+ default:
+ ret = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+ goto end;
+ }
+
+end:
+ return ret;
+error:
+ if (bt_private_notification_iterator_set_user_data(it, NULL)
+ != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
+ BT_LOGE("Error setting private data to NULL");
+ }
+ goto end;
+}
+
+static
+struct bt_value *lttng_live_query_list_sessions(struct bt_component_class *comp_class,
struct bt_value *params)
{
- return BT_COMPONENT_STATUS_OK;
+ struct bt_value *url_value = NULL;
+ struct bt_value *results = NULL;
+ const char *url;
+ struct bt_live_viewer_connection *viewer_connection = NULL;
+
+ url_value = bt_value_map_get(params, "url");
+ if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) {
+ BT_LOGW("Mandatory \"url\" parameter missing");
+ goto error;
+ }
+
+ if (bt_value_string_get(url_value, &url) != BT_VALUE_STATUS_OK) {
+ BT_LOGW("\"url\" parameter is required to be a string value");
+ goto error;
+ }
+
+ viewer_connection = bt_live_viewer_connection_create(url, NULL);
+ if (!viewer_connection) {
+ goto error;
+ }
+
+ results = bt_live_viewer_connection_list_sessions(viewer_connection);
+ goto end;
+error:
+ BT_PUT(results);
+end:
+ if (viewer_connection) {
+ bt_live_viewer_connection_destroy(viewer_connection);
+ }
+ BT_PUT(url_value);
+ return results;
+}
+
+BT_HIDDEN
+struct bt_value *lttng_live_query(struct bt_component_class *comp_class,
+ const char *object, struct bt_value *params)
+{
+ if (strcmp(object, "sessions") == 0) {
+ return lttng_live_query_list_sessions(comp_class,
+ params);
+ }
+ BT_LOGW("Unknown query object `%s`", object);
+ return NULL;
+}
+
+static
+void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
+{
+ int ret;
+ struct lttng_live_session *session, *s;
+
+ bt_list_for_each_entry_safe(session, s, <tng_live->sessions, node) {
+ lttng_live_destroy_session(session);
+ }
+ BT_PUT(lttng_live->viewer_connection);
+ if (lttng_live->url) {
+ g_string_free(lttng_live->url, TRUE);
+ }
+ if (lttng_live->no_stream_port) {
+ bt_get(lttng_live->no_stream_port);
+ ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
+ bt_put(lttng_live->no_stream_port);
+ assert(!ret);
+ }
+ if (lttng_live->no_stream_iter) {
+ g_free(lttng_live->no_stream_iter);
+ }
+ g_free(lttng_live);
+}
+
+BT_HIDDEN
+void lttng_live_component_finalize(struct bt_private_component *component)
+{
+ void *data = bt_private_component_get_user_data(component);
+
+ if (!data) {
+ return;
+ }
+ lttng_live_component_destroy_data(data);
+}
+
+static
+struct lttng_live_component *lttng_live_component_create(struct bt_value *params,
+ struct bt_private_component *private_component)
+{
+ struct lttng_live_component *lttng_live;
+ struct bt_value *value = NULL;
+ const char *url;
+ enum bt_value_status ret;
+
+ lttng_live = g_new0(struct lttng_live_component, 1);
+ if (!lttng_live) {
+ goto end;
+ }
+ /* TODO: make this an overridable parameter. */
+ lttng_live->max_query_size = MAX_QUERY_SIZE;
+ BT_INIT_LIST_HEAD(<tng_live->sessions);
+ value = bt_value_map_get(params, "url");
+ if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) {
+ BT_LOGW("Mandatory \"url\" parameter missing");
+ goto error;
+ }
+ ret = bt_value_string_get(value, &url);
+ if (ret != BT_VALUE_STATUS_OK) {
+ BT_LOGW("\"url\" parameter is required to be a string value");
+ goto error;
+ }
+ lttng_live->url = g_string_new(url);
+ if (!lttng_live->url) {
+ goto error;
+ }
+ BT_PUT(value);
+ lttng_live->viewer_connection =
+ bt_live_viewer_connection_create(lttng_live->url->str, lttng_live);
+ if (!lttng_live->viewer_connection) {
+ goto error;
+ }
+ if (lttng_live_create_viewer_session(lttng_live)) {
+ goto error;
+ }
+ lttng_live->private_component = private_component;
+
+ goto end;
+
+error:
+ lttng_live_component_destroy_data(lttng_live);
+ lttng_live = NULL;
+end:
+ return lttng_live;
+}
+
+BT_HIDDEN
+enum bt_component_status lttng_live_component_init(
+ struct bt_private_component *private_component,
+ struct bt_value *params, void *init_method_data)
+{
+ struct lttng_live_component *lttng_live;
+ enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
+
+ /* Passes ownership of iter ref to lttng_live_component_create. */
+ lttng_live = lttng_live_component_create(params, private_component);
+ if (!lttng_live) {
+ //TODO : we need access to the application cancel state
+ //because we are not part of a graph yet.
+ ret = BT_COMPONENT_STATUS_NOMEM;
+ goto end;
+ }
+
+ lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1);
+ lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM;
+ lttng_live->no_stream_iter->lttng_live = lttng_live;
+ if (lttng_live_is_canceled(lttng_live)) {
+ goto end;
+ }
+ ret = bt_private_component_source_add_output_private_port(
+ lttng_live->private_component, "no-stream",
+ lttng_live->no_stream_iter,
+ <tng_live->no_stream_port);
+ if (ret != BT_COMPONENT_STATUS_OK) {
+ goto end;
+ }
+ bt_put(lttng_live->no_stream_port); /* weak */
+ lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
+
+ ret = bt_private_component_set_user_data(private_component, lttng_live);
+ if (ret != BT_COMPONENT_STATUS_OK) {
+ goto error;
+ }
+
+end:
+ return ret;
+error:
+ (void) bt_private_component_set_user_data(private_component, NULL);
+ lttng_live_component_destroy_data(lttng_live);
+ return ret;
+}
+
+BT_HIDDEN
+enum bt_component_status lttng_live_accept_port_connection(
+ struct bt_private_component *private_component,
+ struct bt_private_port *self_private_port,
+ struct bt_port *other_port)
+{
+ struct lttng_live_component *lttng_live =
+ bt_private_component_get_user_data(private_component);
+ struct bt_component *other_component;
+ enum bt_component_status status = BT_COMPONENT_STATUS_OK;
+ struct bt_port *self_port = bt_port_from_private_port(self_private_port);
+
+ other_component = bt_port_get_component(other_port);
+ bt_put(other_component); /* weak */
+
+ if (!lttng_live->downstream_component) {
+ lttng_live->downstream_component = other_component;
+ goto end;
+ }
+
+ /*
+ * Compare prior component to ensure we are connected to the
+ * same downstream component as prior ports.
+ */
+ if (lttng_live->downstream_component != other_component) {
+ BT_LOGW("Cannot connect ctf.lttng-live component port \"%s\" to component \"%s\": already connected to component \"%s\".",
+ bt_port_get_name(self_port),
+ bt_component_get_name(other_component),
+ bt_component_get_name(lttng_live->downstream_component));
+ status = BT_COMPONENT_STATUS_REFUSE_PORT_CONNECTION;
+ goto end;
+ }
+end:
+ bt_put(self_port);
+ return status;
}