Implement logging in lttng-live component
[babeltrace.git] / plugins / ctf / lttng-live / lttng-live.c
index 5cef2aa99f1936b54781bcb7b442272b723fc64c..74d1ac3762e70477e932f8bdc6db55b7c4004cef 100644 (file)
@@ -4,6 +4,7 @@
  * Babeltrace CTF LTTng-live Client Component
  *
  * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
  *
  * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
  *
  * SOFTWARE.
  */
 
+#include <babeltrace/ctf-ir/packet.h>
+#include <babeltrace/graph/component-source.h>
+#include <babeltrace/graph/private-port.h>
+#include <babeltrace/graph/port.h>
+#include <babeltrace/graph/private-component.h>
+#include <babeltrace/graph/private-component-source.h>
+#include <babeltrace/graph/private-notification-iterator.h>
+#include <babeltrace/graph/notification-stream.h>
+#include <babeltrace/graph/notification-packet.h>
+#include <babeltrace/graph/notification-event.h>
+#include <babeltrace/graph/notification-heap.h>
+#include <babeltrace/graph/notification-iterator.h>
+#include <babeltrace/graph/notification-inactivity.h>
+#include <babeltrace/compiler-internal.h>
+#include <inttypes.h>
+#include <glib.h>
+#include <assert.h>
+#include <unistd.h>
+#include <plugins-common.h>
+
+#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE"
+
 #include "lttng-live-internal.h"
-#include <babeltrace/component/source.h>
+#include "data-stream.h"
+#include "metadata.h"
+
+#define MAX_QUERY_SIZE         (256*1024)
+
+#define print_dbg(fmt, ...)    BT_LOGD(fmt, ## __VA_ARGS__)
+
+static const char *print_state(struct lttng_live_stream_iterator *s)
+{
+       switch (s->state) {
+       case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
+               return "ACTIVE_NO_DATA";
+       case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
+               return "QUIESCENT_NO_DATA";
+       case LTTNG_LIVE_STREAM_QUIESCENT:
+               return "QUIESCENT";
+       case LTTNG_LIVE_STREAM_ACTIVE_DATA:
+               return "ACTIVE_DATA";
+       case LTTNG_LIVE_STREAM_EOF:
+               return "EOF";
+       default:
+               return "ERROR";
+       }
+}
+
+#define print_stream_state(stream)     \
+       print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64, \
+                       bt_port_get_name(bt_port_from_private_port(stream->port)),      \
+                       print_state(stream), stream->last_returned_inactivity_timestamp,        \
+                       stream->current_inactivity_timestamp)
+
+BT_HIDDEN
+int bt_lttng_live_log_level = BT_LOG_NONE;
+
+BT_HIDDEN
+int lttng_live_add_port(struct lttng_live_component *lttng_live,
+               struct lttng_live_stream_iterator *stream_iter)
+{
+       int ret;
+       struct bt_private_port *private_port;
+       char name[STREAM_NAME_MAX_LEN];
+
+       ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id);
+       assert(ret > 0);
+       strcpy(stream_iter->name, name);
+       private_port = bt_private_component_source_add_output_private_port(
+                       lttng_live->private_component, name, stream_iter);
+       if (!private_port) {
+               return -1;
+       }
+       BT_LOGI("Added port %s", name);
+
+       if (lttng_live->no_stream_port) {
+               ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
+               if (ret) {
+                       return -1;
+               }
+               BT_PUT(lttng_live->no_stream_port);
+               lttng_live->no_stream_iter->port = NULL;
+       }
+       stream_iter->port = private_port;
+       return 0;
+}
+
+BT_HIDDEN
+int lttng_live_remove_port(struct lttng_live_component *lttng_live,
+               struct bt_private_port *port)
+{
+       struct bt_component *component;
+       int64_t nr_ports;
+       int ret;
+
+       component = bt_component_from_private_component(lttng_live->private_component);
+       nr_ports = bt_component_source_get_output_port_count(component);
+       if (nr_ports < 0) {
+               return -1;
+       }
+       BT_PUT(component);
+       if (nr_ports == 1) {
+               assert(!lttng_live->no_stream_port);
+               lttng_live->no_stream_port =
+                       bt_private_component_source_add_output_private_port(lttng_live->private_component,
+                               "no-stream", lttng_live->no_stream_iter);
+               if (!lttng_live->no_stream_port) {
+                       return -1;
+               }
+               lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
+       }
+       ret = bt_private_port_remove_from_component(port);
+       if (ret) {
+               return -1;
+       }
+       return 0;
+}
+
+static
+struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session,
+               uint64_t trace_id)
+{
+       struct lttng_live_trace *trace;
+
+       bt_list_for_each_entry(trace, &session->traces, node) {
+               if (trace->id == trace_id) {
+                       return trace;
+               }
+       }
+       return NULL;
+}
+
+static
+void lttng_live_destroy_trace(struct bt_object *obj)
+{
+       struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj);
+
+       BT_LOGI("Destroy trace");
+       assert(bt_list_empty(&trace->streams));
+       bt_list_del(&trace->node);
+       lttng_live_metadata_fini(trace);
+       BT_PUT(trace->cc_prio_map);
+       g_free(trace);
+}
+
+static
+struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
+               uint64_t trace_id)
+{
+       struct lttng_live_trace *trace = NULL;
+
+       trace = g_new0(struct lttng_live_trace, 1);
+       if (!trace) {
+               goto error;
+       }
+       trace->session = session;
+       trace->id = trace_id;
+       BT_INIT_LIST_HEAD(&trace->streams);
+       trace->new_metadata_needed = true;
+       bt_list_add(&trace->node, &session->traces);
+       bt_object_init(&trace->obj, lttng_live_destroy_trace);
+       BT_LOGI("Create trace");
+       goto end;
+error:
+       g_free(trace);
+       trace = NULL;
+end:
+       return trace;
+}
+
+BT_HIDDEN
+struct lttng_live_trace *lttng_live_ref_trace(struct lttng_live_session *session,
+               uint64_t trace_id)
+{
+       struct lttng_live_trace *trace;
+
+       trace = lttng_live_find_trace(session, trace_id);
+       if (trace) {
+               bt_get(trace);
+               return trace;
+       }
+       return lttng_live_create_trace(session, trace_id);
+}
+
+BT_HIDDEN
+void lttng_live_unref_trace(struct lttng_live_trace *trace)
+{
+       bt_put(trace);
+}
+
+static
+void lttng_live_close_trace_streams(struct lttng_live_trace *trace)
+{
+       struct lttng_live_stream_iterator *stream, *s;
+
+       bt_list_for_each_entry_safe(stream, s, &trace->streams, node) {
+               lttng_live_stream_iterator_destroy(stream);
+       }
+       lttng_live_metadata_fini(trace);
+}
+
+BT_HIDDEN
+int lttng_live_add_session(struct lttng_live_component *lttng_live, uint64_t session_id)
+{
+       int ret = 0;
+       struct lttng_live_session *s;
+
+       s = g_new0(struct lttng_live_session, 1);
+       if (!s) {
+               goto error;
+       }
+
+       s->id = session_id;
+       BT_INIT_LIST_HEAD(&s->traces);
+       s->lttng_live = lttng_live;
+       s->new_streams_needed = true;
+
+       BT_LOGI("Reading from session %" PRIu64, s->id);
+       bt_list_add(&s->node, &lttng_live->sessions);
+       goto end;
+error:
+       BT_LOGE("Error adding session");
+       g_free(s);
+       ret = -1;
+end:
+       return ret;
+}
+
+static
+void lttng_live_destroy_session(struct lttng_live_session *session)
+{
+       struct lttng_live_trace *trace, *t;
+
+       BT_LOGI("Destroy session");
+       if (session->id != -1ULL) {
+               if (lttng_live_detach_session(session)) {
+                       /* Old relayd cannot detach sessions. */
+                       BT_LOGD("Unable to detach session %" PRIu64,
+                               session->id);
+               }
+               session->id = -1ULL;
+       }
+       bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
+               lttng_live_close_trace_streams(trace);
+       }
+       bt_list_del(&session->node);
+       g_free(session);
+}
 
 BT_HIDDEN
-enum bt_component_status lttng_live_init(struct bt_component *component,
+void lttng_live_iterator_finalize(struct bt_private_notification_iterator *it)
+{
+       struct lttng_live_stream_iterator_generic *s =
+                       bt_private_notification_iterator_get_user_data(it);
+
+       switch (s->type) {
+       case LIVE_STREAM_TYPE_NO_STREAM:
+       {
+               /* Leave no_stream_iter in place when port is removed. */
+               break;
+       }
+       case LIVE_STREAM_TYPE_STREAM:
+       {
+               struct lttng_live_stream_iterator *stream_iter =
+                       container_of(s, struct lttng_live_stream_iterator, p);
+
+               lttng_live_stream_iterator_destroy(stream_iter);
+               break;
+       }
+       }
+}
+
+static
+enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
+               struct lttng_live_component *lttng_live,
+               struct lttng_live_stream_iterator *lttng_live_stream)
+{
+       switch (lttng_live_stream->state) {
+       case LTTNG_LIVE_STREAM_QUIESCENT:
+       case LTTNG_LIVE_STREAM_ACTIVE_DATA:
+               break;
+       case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
+               /* Invalid state. */
+               BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
+               abort();
+       case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
+               /* Invalid state. */
+               BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
+               abort();
+       case LTTNG_LIVE_STREAM_EOF:
+               break;
+       }
+       return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+}
+
+/*
+ * For active no data stream, fetch next data. It can be either:
+ * - quiescent: need to put it in the prio heap at quiescent end
+ *   timestamp,
+ * - have data: need to wire up first event into the prio heap,
+ * - have no data on this stream at this point: need to retry (AGAIN) or
+ *   return EOF.
+ */
+static
+enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
+               struct lttng_live_component *lttng_live,
+               struct lttng_live_stream_iterator *lttng_live_stream)
+{
+       enum bt_ctf_lttng_live_iterator_status ret =
+                       BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+       struct packet_index index;
+       enum lttng_live_stream_state orig_state = lttng_live_stream->state;
+
+       if (lttng_live_stream->trace->new_metadata_needed) {
+               ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+               goto end;
+       }
+       if (lttng_live_stream->trace->session->new_streams_needed) {
+               ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+               goto end;
+       }
+       if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
+                       && lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
+               goto end;
+       }
+       ret = lttng_live_get_next_index(lttng_live, lttng_live_stream, &index);
+       if (ret != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+               goto end;
+       }
+       assert(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
+       if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
+               if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
+                               && lttng_live_stream->last_returned_inactivity_timestamp ==
+                                       lttng_live_stream->current_inactivity_timestamp) {
+                       ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+                       print_stream_state(lttng_live_stream);
+               } else {
+                       ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+               }
+               goto end;
+       }
+       lttng_live_stream->base_offset = index.offset;
+       lttng_live_stream->offset = index.offset;
+       lttng_live_stream->len = index.packet_size / CHAR_BIT;
+end:
+       if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+               ret = lttng_live_iterator_next_check_stream_state(
+                               lttng_live, lttng_live_stream);
+       }
+       return ret;
+}
+
+/*
+ * Creation of the notification requires the ctf trace to be created
+ * beforehand, but the live protocol gives us all streams (including
+ * metadata) at once. So we split it in three steps: getting streams,
+ * getting metadata (which creates the ctf trace), and then creating the
+ * per-stream notifications.
+ */
+static
+enum bt_ctf_lttng_live_iterator_status lttng_live_get_session(
+               struct lttng_live_component *lttng_live,
+               struct lttng_live_session *session)
+{
+       enum bt_ctf_lttng_live_iterator_status status;
+       struct lttng_live_trace *trace, *t;
+
+       if (lttng_live_attach_session(session)) {
+               return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+       }
+       status = lttng_live_get_new_streams(session);
+       if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
+                       status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
+               return status;
+       }
+       bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
+               status = lttng_live_metadata_update(trace);
+               if (status == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
+                       int retval;
+
+                       retval = bt_ctf_trace_set_is_static(trace->trace);
+                       assert(!retval);
+               } else if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+                       return status;
+               }
+       }
+       return lttng_live_lazy_notif_init(session);
+}
+
+BT_HIDDEN
+void lttng_live_need_new_streams(struct lttng_live_component *lttng_live)
+{
+       struct lttng_live_session *session;
+
+       bt_list_for_each_entry(session, &lttng_live->sessions, node) {
+               session->new_streams_needed = true;
+       }
+}
+
+static
+void lttng_live_force_new_streams_and_metadata(struct lttng_live_component *lttng_live)
+{
+       struct lttng_live_session *session;
+
+       bt_list_for_each_entry(session, &lttng_live->sessions, node) {
+               struct lttng_live_trace *trace;
+
+               session->new_streams_needed = true;
+               bt_list_for_each_entry(trace, &session->traces, node) {
+                       trace->new_metadata_needed = true;
+               }
+       }
+}
+
+static
+enum bt_notification_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
+               struct lttng_live_component *lttng_live)
+{
+       enum bt_ctf_lttng_live_iterator_status ret =
+                       BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+       unsigned int nr_sessions_opened = 0;
+       struct lttng_live_session *session, *s;
+
+       bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
+               if (session->closed && bt_list_empty(&session->traces)) {
+                       lttng_live_destroy_session(session);
+               }
+       }
+       /*
+        * Currently, when there are no sessions, we quit immediately.
+        * We may want to add a component parameter to keep trying until
+        * we get data in the future.
+        * Also, in a remotely distant future, we could add a "new
+        * session" flag to the protocol, which would tell us that we
+        * need to query for new sessions even though we have sessions
+        * currently ongoing.
+        */
+       if (bt_list_empty(&lttng_live->sessions)) {
+               ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
+               goto end;
+       }
+       bt_list_for_each_entry(session, &lttng_live->sessions, node) {
+               ret = lttng_live_get_session(lttng_live, session);
+               switch (ret) {
+               case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
+                       break;
+               case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
+                       ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+                       break;
+               default:
+                       goto end;
+               }
+               if (!session->closed) {
+                       nr_sessions_opened++;
+               }
+       }
+end:
+       if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) {
+               ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
+       }
+       return ret;
+}
+
+static
+enum bt_ctf_lttng_live_iterator_status emit_inactivity_notification(
+               struct lttng_live_component *lttng_live,
+               struct lttng_live_stream_iterator *lttng_live_stream,
+               struct bt_notification **notification,
+               uint64_t timestamp)
+{
+       enum bt_ctf_lttng_live_iterator_status ret =
+                       BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+       struct lttng_live_trace *trace;
+       struct bt_ctf_clock_class *clock_class = NULL;
+       struct bt_ctf_clock_value *clock_value = NULL;
+       struct bt_notification *notif = NULL;
+       int retval;
+
+       trace = lttng_live_stream->trace;
+       if (!trace) {
+               goto error;
+       }
+       clock_class = bt_clock_class_priority_map_get_clock_class_by_index(trace->cc_prio_map, 0);
+       if (!clock_class) {
+               goto error;
+       }
+       clock_value = bt_ctf_clock_value_create(clock_class, timestamp);
+       if (!clock_value) {
+               goto error;
+       }
+       notif = bt_notification_inactivity_create(trace->cc_prio_map);
+       if (!notif) {
+               goto error;
+       }
+       retval = bt_notification_inactivity_set_clock_value(notif, clock_value);
+       if (retval) {
+               goto error;
+       }
+       *notification = notif;
+end:
+       bt_put(clock_value);
+       bt_put(clock_class);
+       return ret;
+
+error:
+       ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+       bt_put(notif);
+       goto end;
+}
+
+static
+enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
+               struct lttng_live_component *lttng_live,
+               struct lttng_live_stream_iterator *lttng_live_stream,
+               struct bt_notification **notification)
+{
+       enum bt_ctf_lttng_live_iterator_status ret =
+                       BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+       struct bt_ctf_clock_class *clock_class = NULL;
+       struct bt_ctf_clock_value *clock_value = NULL;
+
+       if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
+               return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+       }
+
+       if (lttng_live_stream->current_inactivity_timestamp ==
+                       lttng_live_stream->last_returned_inactivity_timestamp) {
+               lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
+               ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+               goto end;
+       }
+
+       ret = emit_inactivity_notification(lttng_live, lttng_live_stream, notification,
+                       (uint64_t) lttng_live_stream->current_inactivity_timestamp);
+
+       lttng_live_stream->last_returned_inactivity_timestamp =
+                       lttng_live_stream->current_inactivity_timestamp;
+end:
+       bt_put(clock_value);
+       bt_put(clock_class);
+       return ret;
+}
+
+static
+enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
+               struct lttng_live_component *lttng_live,
+               struct lttng_live_stream_iterator *lttng_live_stream,
+               struct bt_notification **notification)
+{
+       enum bt_ctf_lttng_live_iterator_status ret =
+                       BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+       enum bt_ctf_notif_iter_status status;
+       struct lttng_live_session *session;
+
+       bt_list_for_each_entry(session, &lttng_live->sessions, node) {
+               struct lttng_live_trace *trace;
+
+               if (session->new_streams_needed) {
+                       return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+               }
+               bt_list_for_each_entry(trace, &session->traces, node) {
+                       if (trace->new_metadata_needed) {
+                               return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+                       }
+               }
+       }
+
+       if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
+               return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+       }
+       if (lttng_live_stream->packet_end_notif_queue) {
+               *notification = lttng_live_stream->packet_end_notif_queue;
+               lttng_live_stream->packet_end_notif_queue = NULL;
+               status = BT_CTF_NOTIF_ITER_STATUS_OK;
+       } else {
+               status = bt_ctf_notif_iter_get_next_notification(
+                               lttng_live_stream->notif_iter,
+                               lttng_live_stream->trace->cc_prio_map,
+                               notification);
+               if (status == BT_CTF_NOTIF_ITER_STATUS_OK) {
+                       /*
+                        * Consider empty packets as inactivity.
+                        */
+                       if (bt_notification_get_type(*notification) == BT_NOTIFICATION_TYPE_PACKET_END) {
+                               lttng_live_stream->packet_end_notif_queue = *notification;
+                               *notification = NULL;
+                               return emit_inactivity_notification(lttng_live,
+                                               lttng_live_stream, notification,
+                                               lttng_live_stream->current_packet_end_timestamp);
+                       }
+               }
+       }
+       switch (status) {
+       case BT_CTF_NOTIF_ITER_STATUS_EOF:
+               ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
+               break;
+       case BT_CTF_NOTIF_ITER_STATUS_OK:
+               ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+               break;
+       case BT_CTF_NOTIF_ITER_STATUS_AGAIN:
+               /*
+                * Continue immediately (end of packet). The next
+                * get_index may return AGAIN to delay the following
+                * attempt.
+                */
+               ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+               break;
+       case BT_CTF_NOTIF_ITER_STATUS_INVAL:
+               /* No argument provided by the user, so don't return INVAL. */
+       case BT_CTF_NOTIF_ITER_STATUS_ERROR:
+       default:
+               ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+               break;
+       }
+       return ret;
+}
+
+/*
+ * helper function:
+ *            handle_no_data_streams()
+ *              retry:
+ *                - for each ACTIVE_NO_DATA stream:
+ *                  - query relayd for stream data, or quiescence info.
+ *                    - if need metadata, get metadata, goto retry.
+ *                    - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
+ *                  - if quiescent, move to QUIESCENT streams
+ *                  - if fetched data, move to ACTIVE_DATA streams
+ *                (at this point each stream either has data, or is quiescent)
+ *
+ *
+ * iterator_next:
+ *            handle_new_streams_and_metadata()
+ *                  - query relayd for known streams, add them as ACTIVE_NO_DATA
+ *                  - query relayd for metadata
+ *
+ *            call handle_active_no_data_streams()
+ *
+ *            handle_quiescent_streams()
+ *                - if at least one stream is ACTIVE_DATA:
+ *                  - peek stream event with lowest timestamp -> next_ts
+ *                  - for each quiescent stream
+ *                    - if next_ts >= quiescent end
+ *                      - set state to ACTIVE_NO_DATA
+ *                - else
+ *                  - for each quiescent stream
+ *                      - set state to ACTIVE_NO_DATA
+ *
+ *            call handle_active_no_data_streams()
+ *
+ *            handle_active_data_streams()
+ *                - if at least one stream is ACTIVE_DATA:
+ *                    - get stream event with lowest timestamp from heap
+ *                    - make that stream event the current notification.
+ *                    - move this stream heap position to its next event
+ *                      - if we need to fetch data from relayd, move
+ *                        stream to ACTIVE_NO_DATA.
+ *                    - return OK
+ *                - return AGAIN
+ *
+ * end criterion: ctrl-c on client. If relayd exits or the session
+ * closes on the relay daemon side, we keep on waiting for streams.
+ * Eventually handle --end timestamp (also an end criterion).
+ *
+ * When disconnected from relayd: try to re-connect endlessly.
+ */
+static
+struct bt_notification_iterator_next_return lttng_live_iterator_next_stream(
+               struct bt_private_notification_iterator *iterator,
+               struct lttng_live_stream_iterator *stream_iter)
+{
+       enum bt_ctf_lttng_live_iterator_status status;
+       struct bt_notification_iterator_next_return next_return;
+       struct lttng_live_component *lttng_live;
+
+       lttng_live = stream_iter->trace->session->lttng_live;
+retry:
+       print_stream_state(stream_iter);
+       next_return.notification = NULL;
+       status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
+       if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+               goto end;
+       }
+       status = lttng_live_iterator_next_handle_one_no_data_stream(
+                       lttng_live, stream_iter);
+       if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+               goto end;
+       }
+       status = lttng_live_iterator_next_handle_one_quiescent_stream(
+                       lttng_live, stream_iter, &next_return.notification);
+       if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+               assert(next_return.notification == NULL);
+               goto end;
+       }
+       if (next_return.notification) {
+               goto end;
+       }
+       status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live,
+                       stream_iter, &next_return.notification);
+       if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+               assert(next_return.notification == NULL);
+       }
+
+end:
+       switch (status) {
+       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
+               print_dbg("continue");
+               goto retry;
+       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
+               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
+               print_dbg("again");
+               break;
+       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
+               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
+               print_dbg("end");
+               break;
+       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
+               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
+               print_dbg("ok");
+               break;
+       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
+               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
+               break;
+       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
+               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
+               break;
+       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
+               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
+               break;
+       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
+       default:        /* fall-through */
+               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+               break;
+       }
+       return next_return;
+}
+
+static
+struct bt_notification_iterator_next_return lttng_live_iterator_next_no_stream(
+               struct bt_private_notification_iterator *iterator,
+               struct lttng_live_no_stream_iterator *no_stream_iter)
+{
+       enum bt_ctf_lttng_live_iterator_status status;
+       struct bt_notification_iterator_next_return next_return;
+       struct lttng_live_component *lttng_live;
+
+       lttng_live = no_stream_iter->lttng_live;
+retry:
+       lttng_live_force_new_streams_and_metadata(lttng_live);
+       status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
+       if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+               goto end;
+       }
+       if (no_stream_iter->port) {
+               status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+       } else {
+               status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
+       }
+end:
+       switch (status) {
+       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
+               goto retry;
+       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
+               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
+               break;
+       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
+               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
+               break;
+       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
+               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
+               break;
+       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
+               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
+               break;
+       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
+               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
+               break;
+       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
+               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
+               break;
+       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
+       default:        /* fall-through */
+               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+               break;
+       }
+       return next_return;
+}
+
+BT_HIDDEN
+struct bt_notification_iterator_next_return lttng_live_iterator_next(
+               struct bt_private_notification_iterator *iterator)
+{
+       struct lttng_live_stream_iterator_generic *s =
+                       bt_private_notification_iterator_get_user_data(iterator);
+       struct bt_notification_iterator_next_return next_return;
+
+       switch (s->type) {
+       case LIVE_STREAM_TYPE_NO_STREAM:
+               next_return = lttng_live_iterator_next_no_stream(iterator,
+                       container_of(s, struct lttng_live_no_stream_iterator, p));
+               break;
+       case LIVE_STREAM_TYPE_STREAM:
+               next_return = lttng_live_iterator_next_stream(iterator,
+                       container_of(s, struct lttng_live_stream_iterator, p));
+               break;
+       default:
+               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+               break;
+       }
+       return next_return;
+}
+
+BT_HIDDEN
+enum bt_notification_iterator_status lttng_live_iterator_init(
+               struct bt_private_notification_iterator *it,
+               struct bt_private_port *port)
+{
+       enum bt_notification_iterator_status ret =
+                       BT_NOTIFICATION_ITERATOR_STATUS_OK;
+       struct lttng_live_stream_iterator_generic *s;
+
+       assert(it);
+
+       s = bt_private_port_get_user_data(port);
+       assert(s);
+       switch (s->type) {
+       case LIVE_STREAM_TYPE_NO_STREAM:
+       {
+               struct lttng_live_no_stream_iterator *no_stream_iter =
+                       container_of(s, struct lttng_live_no_stream_iterator, p);
+               ret = bt_private_notification_iterator_set_user_data(it, no_stream_iter);
+               if (ret) {
+                       goto error;
+               }
+               break;
+       }
+       case LIVE_STREAM_TYPE_STREAM:
+       {
+               struct lttng_live_stream_iterator *stream_iter =
+                       container_of(s, struct lttng_live_stream_iterator, p);
+               ret = bt_private_notification_iterator_set_user_data(it, stream_iter);
+               if (ret) {
+                       goto error;
+               }
+               break;
+       }
+       default:
+               ret = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+               goto end;
+       }
+
+end:
+       return ret;
+error:
+       if (bt_private_notification_iterator_set_user_data(it, NULL)
+                       != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
+               BT_LOGE("Error setting private data to NULL");
+       }
+       goto end;
+}
+
+static
+struct bt_value *lttng_live_query_list_sessions(struct bt_component_class *comp_class,
                struct bt_value *params)
 {
-       return BT_COMPONENT_STATUS_OK;
+       struct bt_value *url_value = NULL;
+       struct bt_value *results = NULL;
+       const char *url;
+       struct bt_live_viewer_connection *viewer_connection = NULL;
+       enum bt_value_status ret;
+
+       url_value = bt_value_map_get(params, "url");
+       if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) {
+               BT_LOGW("Mandatory \"url\" parameter missing");
+               goto error;
+       }
+
+       ret = bt_value_string_get(url_value, &url);
+       if (ret != BT_VALUE_STATUS_OK) {
+               BT_LOGW("\"url\" parameter is required to be a string value");
+               goto error;
+       }
+
+       viewer_connection = bt_live_viewer_connection_create(url, stderr);
+       if (!viewer_connection) {
+               ret = BT_COMPONENT_STATUS_NOMEM;
+               goto error;
+       }
+
+       results = bt_live_viewer_connection_list_sessions(viewer_connection);
+       goto end;
+error:
+       BT_PUT(results);
+end:
+       if (viewer_connection) {
+               bt_live_viewer_connection_destroy(viewer_connection);
+       }
+       BT_PUT(url_value);
+       return results;
+}
+
+BT_HIDDEN
+struct bt_value *lttng_live_query(struct bt_component_class *comp_class,
+               const char *object, struct bt_value *params)
+{
+       if (strcmp(object, "sessions") == 0) {
+               return lttng_live_query_list_sessions(comp_class,
+                       params);
+       }
+       BT_LOGW("Unknown query object `%s`", object);
+       return NULL;
+}
+
+static
+void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
+{
+       int ret;
+       struct lttng_live_session *session, *s;
+
+       bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
+               lttng_live_destroy_session(session);
+       }
+       BT_PUT(lttng_live->viewer_connection);
+       if (lttng_live->url) {
+               g_string_free(lttng_live->url, TRUE);
+       }
+       if (lttng_live->no_stream_port) {
+               ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
+               assert(!ret);
+               BT_PUT(lttng_live->no_stream_port);
+       }
+       if (lttng_live->no_stream_iter) {
+               g_free(lttng_live->no_stream_iter);
+       }
+       g_free(lttng_live);
+}
+
+BT_HIDDEN
+void lttng_live_component_finalize(struct bt_private_component *component)
+{
+       void *data = bt_private_component_get_user_data(component);
+
+       if (!data) {
+               return;
+       }
+       lttng_live_component_destroy_data(data);
+}
+
+static
+struct lttng_live_component *lttng_live_component_create(struct bt_value *params,
+               struct bt_private_component *private_component)
+{
+       struct lttng_live_component *lttng_live;
+       struct bt_value *value = NULL;
+       const char *url;
+       enum bt_value_status ret;
+
+       lttng_live = g_new0(struct lttng_live_component, 1);
+       if (!lttng_live) {
+               goto end;
+       }
+       /* TODO: make this an overridable parameter. */
+       lttng_live->max_query_size = MAX_QUERY_SIZE;
+       BT_INIT_LIST_HEAD(&lttng_live->sessions);
+       value = bt_value_map_get(params, "url");
+       if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) {
+               BT_LOGW("Mandatory \"url\" parameter missing");
+               goto error;
+       }
+       ret = bt_value_string_get(value, &url);
+       if (ret != BT_VALUE_STATUS_OK) {
+               BT_LOGW("\"url\" parameter is required to be a string value");
+               goto error;
+       }
+       lttng_live->url = g_string_new(url);
+       if (!lttng_live->url) {
+               goto error;
+       }
+       lttng_live->viewer_connection =
+               bt_live_viewer_connection_create(lttng_live->url->str,
+                       stderr);
+       if (!lttng_live->viewer_connection) {
+               ret = BT_COMPONENT_STATUS_NOMEM;
+               goto error;
+       }
+       if (lttng_live_create_viewer_session(lttng_live)) {
+               ret = BT_COMPONENT_STATUS_ERROR;
+               goto error;
+       }
+       lttng_live->private_component = private_component;
+
+       goto end;
+
+error:
+       lttng_live_component_destroy_data(lttng_live);
+       lttng_live = NULL;
+end:
+       return lttng_live;
+}
+
+BT_HIDDEN
+enum bt_component_status lttng_live_component_init(struct bt_private_component *component,
+               struct bt_value *params, void *init_method_data)
+{
+       struct lttng_live_component *lttng_live;
+       enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
+
+       /* Passes ownership of iter ref to lttng_live_component_create. */
+       lttng_live = lttng_live_component_create(params, component);
+       if (!lttng_live) {
+               ret = BT_COMPONENT_STATUS_NOMEM;
+               goto end;
+       }
+
+       lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1);
+       lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM;
+       lttng_live->no_stream_iter->lttng_live = lttng_live;
+
+       lttng_live->no_stream_port =
+               bt_private_component_source_add_output_private_port(
+                               lttng_live->private_component, "no-stream",
+                               lttng_live->no_stream_iter);
+       lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
+
+       ret = bt_private_component_set_user_data(component, lttng_live);
+       if (ret != BT_COMPONENT_STATUS_OK) {
+               goto error;
+       }
+
+end:
+       return ret;
+error:
+       (void) bt_private_component_set_user_data(component, NULL);
+       lttng_live_component_destroy_data(lttng_live);
+       return ret;
+}
+
+static
+void __attribute__((constructor)) bt_lttng_live_logging_ctor(void)
+{
+       enum bt_logging_level log_level = BT_LOG_NONE;
+       const char *log_level_env = getenv("BABELTRACE_PLUGIN_LTTNG_LIVE_LOG_LEVEL");
+
+       if (!log_level_env) {
+               return;
+       }
+
+       if (strcmp(log_level_env, "VERBOSE") == 0) {
+               log_level = BT_LOGGING_LEVEL_VERBOSE;
+       } else if (strcmp(log_level_env, "DEBUG") == 0) {
+               log_level = BT_LOGGING_LEVEL_DEBUG;
+       } else if (strcmp(log_level_env, "INFO") == 0) {
+               log_level = BT_LOGGING_LEVEL_INFO;
+       } else if (strcmp(log_level_env, "WARN") == 0) {
+               log_level = BT_LOGGING_LEVEL_WARN;
+       } else if (strcmp(log_level_env, "ERROR") == 0) {
+               log_level = BT_LOGGING_LEVEL_ERROR;
+       } else if (strcmp(log_level_env, "FATAL") == 0) {
+               log_level = BT_LOGGING_LEVEL_FATAL;
+       }
+
+        bt_lttng_live_log_level = log_level;
 }
This page took 0.038402 seconds and 4 git commands to generate.