lib: rename "clock value" -> "clock snapshot"
[babeltrace.git] / plugins / ctf / lttng-live / lttng-live.c
index 9cc2f87938d61e0cc576dda0f79cab8ad9f3050c..f703630411467901a4b544855a387d9d942589af 100644 (file)
  * SOFTWARE.
  */
 
-#include <babeltrace/ctf-ir/packet.h>
-#include <babeltrace/graph/component-source.h>
-#include <babeltrace/graph/private-port.h>
-#include <babeltrace/graph/port.h>
-#include <babeltrace/graph/private-component.h>
-#include <babeltrace/graph/private-component-source.h>
-#include <babeltrace/graph/private-notification-iterator.h>
-#include <babeltrace/graph/notification-stream.h>
-#include <babeltrace/graph/notification-packet.h>
-#include <babeltrace/graph/notification-event.h>
-#include <babeltrace/graph/notification-heap.h>
-#include <babeltrace/graph/notification-iterator.h>
-#include <babeltrace/graph/notification-inactivity.h>
+#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC"
+#include "logging.h"
+
+#include <babeltrace/babeltrace.h>
 #include <babeltrace/compiler-internal.h>
+#include <babeltrace/types.h>
 #include <inttypes.h>
 #include <glib.h>
-#include <assert.h>
+#include <babeltrace/assert-internal.h>
 #include <unistd.h>
 #include <plugins-common.h>
 
-#include "lttng-live-internal.h"
 #include "data-stream.h"
 #include "metadata.h"
+#include "lttng-live-internal.h"
 
-#define PRINT_ERR_STREAM       (lttng_live->error_fp)
-#define PRINT_PREFIX           "lttng-live"
-#define PRINT_DBG_CHECK                lttng_live_debug
 #define MAX_QUERY_SIZE         (256*1024)
-#include "../print.h"
 
-#ifdef LIVE_DEBUG
-#define print_dbg(fmt, args...)        \
-       fprintf(stderr, "%s() at " __FILE__ ":%d " fmt "\n",    \
-               __func__, __LINE__, ## args)
+#define print_dbg(fmt, ...)    BT_LOGD(fmt, ## __VA_ARGS__)
 
 static const char *print_state(struct lttng_live_stream_iterator *s)
 {
@@ -79,18 +64,39 @@ static const char *print_state(struct lttng_live_stream_iterator *s)
                return "ERROR";
        }
 }
-#else
-#define print_dbg(fmt, args...)
-#endif
 
-#define print_stream_state(stream)     \
-       print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64, \
-                       bt_port_get_name(bt_port_from_private_port(stream->port)),      \
-                       print_state(stream), stream->last_returned_inactivity_timestamp,        \
-                       stream->current_inactivity_timestamp)
+static
+void print_stream_state(struct lttng_live_stream_iterator *stream)
+{
+       const bt_port *port;
+
+       port = bt_port_from_private(stream->port);
+       print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64,
+               bt_port_get_name(port),
+               print_state(stream),
+               stream->last_returned_inactivity_timestamp,
+               stream->current_inactivity_timestamp);
+       bt_port_put_ref(port);
+}
 
 BT_HIDDEN
-bool lttng_live_debug;
+bt_bool lttng_live_is_canceled(struct lttng_live_component *lttng_live)
+{
+       bt_component *component;
+       const bt_graph *graph;
+       bt_bool ret;
+
+       if (!lttng_live) {
+               return BT_FALSE;
+       }
+
+       component = bt_component_from_private(lttng_live->private_component);
+       graph = bt_component_get_graph(component);
+       ret = bt_graph_is_canceled(graph);
+       bt_graph_put_ref(graph);
+       bt_component_put_ref(component);
+       return ret;
+}
 
 BT_HIDDEN
 int lttng_live_add_port(struct lttng_live_component *lttng_live,
@@ -99,23 +105,36 @@ int lttng_live_add_port(struct lttng_live_component *lttng_live,
        int ret;
        struct bt_private_port *private_port;
        char name[STREAM_NAME_MAX_LEN];
+       enum bt_component_status status;
 
        ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id);
-       assert(ret > 0);
+       BT_ASSERT(ret > 0);
        strcpy(stream_iter->name, name);
-       private_port = bt_private_component_source_add_output_private_port(
-                       lttng_live->private_component, name, stream_iter);
-       if (!private_port) {
+       if (lttng_live_is_canceled(lttng_live)) {
+               return 0;
+       }
+       status = bt_self_component_source_add_output_port(
+                       lttng_live->private_component, name, stream_iter,
+                       &private_port);
+       switch (status) {
+       case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
+               return 0;
+       case BT_COMPONENT_STATUS_OK:
+               break;
+       default:
                return -1;
        }
-       PDBG("Added port %s\n", name);
+       bt_object_put_ref(private_port);        /* weak */
+       BT_LOGI("Added port %s", name);
 
        if (lttng_live->no_stream_port) {
+               bt_object_get_ref(lttng_live->no_stream_port);
                ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
+               bt_object_put_ref(lttng_live->no_stream_port);
                if (ret) {
                        return -1;
                }
-               BT_PUT(lttng_live->no_stream_port);
+               lttng_live->no_stream_port = NULL;
                lttng_live->no_stream_iter->port = NULL;
        }
        stream_iter->port = private_port;
@@ -126,27 +145,41 @@ BT_HIDDEN
 int lttng_live_remove_port(struct lttng_live_component *lttng_live,
                struct bt_private_port *port)
 {
-       struct bt_component *component;
+       bt_component *component;
        int64_t nr_ports;
        int ret;
 
-       component = bt_component_from_private_component(lttng_live->private_component);
+       component = bt_component_from_private(lttng_live->private_component);
        nr_ports = bt_component_source_get_output_port_count(component);
        if (nr_ports < 0) {
                return -1;
        }
-       BT_PUT(component);
+       BT_COMPONENT_PUT_REF_AND_RESET(component);
        if (nr_ports == 1) {
-               assert(!lttng_live->no_stream_port);
-               lttng_live->no_stream_port =
-                       bt_private_component_source_add_output_private_port(lttng_live->private_component,
-                               "no-stream", lttng_live->no_stream_iter);
-               if (!lttng_live->no_stream_port) {
+               enum bt_component_status status;
+
+               BT_ASSERT(!lttng_live->no_stream_port);
+
+               if (lttng_live_is_canceled(lttng_live)) {
+                       return 0;
+               }
+               status = bt_self_component_source_add_output_port(lttng_live->private_component,
+                               "no-stream", lttng_live->no_stream_iter,
+                               &lttng_live->no_stream_port);
+               switch (status) {
+               case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
+                       return 0;
+               case BT_COMPONENT_STATUS_OK:
+                       break;
+               default:
                        return -1;
                }
+               bt_object_put_ref(lttng_live->no_stream_port);  /* weak */
                lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
        }
+       bt_object_get_ref(port);
        ret = bt_private_port_remove_from_component(port);
+       bt_object_put_ref(port);
        if (ret) {
                return -1;
        }
@@ -168,15 +201,23 @@ struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *sessio
 }
 
 static
-void lttng_live_destroy_trace(struct bt_object *obj)
+void lttng_live_destroy_trace(bt_object *obj)
 {
        struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj);
 
-       PDBG("Destroy trace\n");
-       assert(bt_list_empty(&trace->streams));
+       BT_LOGI("Destroy trace");
+       BT_ASSERT(bt_list_empty(&trace->streams));
        bt_list_del(&trace->node);
+
+       if (trace->trace) {
+               int retval;
+
+               retval = bt_trace_set_is_static(trace->trace);
+               BT_ASSERT(!retval);
+               BT_TRACE_PUT_REF_AND_RESET(trace->trace);
+       }
        lttng_live_metadata_fini(trace);
-       BT_PUT(trace->cc_prio_map);
+       BT_OBJECT_PUT_REF_AND_RESET(trace->cc_prio_map);
        g_free(trace);
 }
 
@@ -196,6 +237,7 @@ struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *sess
        trace->new_metadata_needed = true;
        bt_list_add(&trace->node, &session->traces);
        bt_object_init(&trace->obj, lttng_live_destroy_trace);
+       BT_LOGI("Create trace");
        goto end;
 error:
        g_free(trace);
@@ -212,7 +254,7 @@ struct lttng_live_trace *lttng_live_ref_trace(struct lttng_live_session *session
 
        trace = lttng_live_find_trace(session, trace_id);
        if (trace) {
-               bt_get(trace);
+               bt_object_get_ref(trace);
                return trace;
        }
        return lttng_live_create_trace(session, trace_id);
@@ -221,7 +263,7 @@ struct lttng_live_trace *lttng_live_ref_trace(struct lttng_live_session *session
 BT_HIDDEN
 void lttng_live_unref_trace(struct lttng_live_trace *trace)
 {
-       bt_put(trace);
+       bt_object_put_ref(trace);
 }
 
 static
@@ -236,7 +278,9 @@ void lttng_live_close_trace_streams(struct lttng_live_trace *trace)
 }
 
 BT_HIDDEN
-int lttng_live_add_session(struct lttng_live_component *lttng_live, uint64_t session_id)
+int lttng_live_add_session(struct lttng_live_component *lttng_live,
+               uint64_t session_id, const char *hostname,
+               const char *session_name)
 {
        int ret = 0;
        struct lttng_live_session *s;
@@ -250,12 +294,15 @@ int lttng_live_add_session(struct lttng_live_component *lttng_live, uint64_t ses
        BT_INIT_LIST_HEAD(&s->traces);
        s->lttng_live = lttng_live;
        s->new_streams_needed = true;
+       s->hostname = g_string_new(hostname);
+       s->session_name = g_string_new(session_name);
 
-       PDBG("Reading from session %" PRIu64 "\n", s->id);
+       BT_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s",
+               s->id, hostname, session_name);
        bt_list_add(&s->node, &lttng_live->sessions);
        goto end;
 error:
-       PERR("Error adding session\n");
+       BT_LOGE("Error adding session");
        g_free(s);
        ret = -1;
 end:
@@ -267,12 +314,14 @@ void lttng_live_destroy_session(struct lttng_live_session *session)
 {
        struct lttng_live_trace *trace, *t;
 
-       PDBG("Destroy session\n");
+       BT_LOGI("Destroy session");
        if (session->id != -1ULL) {
                if (lttng_live_detach_session(session)) {
-                       /* Old relayd cannot detach sessions. */
-                       PDBG("Unable to detach session %" PRIu64 "\n",
-                               session->id);
+                       if (!lttng_live_is_canceled(session->lttng_live)) {
+                               /* Old relayd cannot detach sessions. */
+                               BT_LOGD("Unable to detach session %" PRIu64,
+                                       session->id);
+                       }
                }
                session->id = -1ULL;
        }
@@ -280,14 +329,20 @@ void lttng_live_destroy_session(struct lttng_live_session *session)
                lttng_live_close_trace_streams(trace);
        }
        bt_list_del(&session->node);
+       if (session->hostname) {
+               g_string_free(session->hostname, TRUE);
+       }
+       if (session->session_name) {
+               g_string_free(session->session_name, TRUE);
+       }
        g_free(session);
 }
 
 BT_HIDDEN
-void lttng_live_iterator_finalize(struct bt_private_notification_iterator *it)
+void lttng_live_iterator_finalize(bt_self_message_iterator *it)
 {
        struct lttng_live_stream_iterator_generic *s =
-                       bt_private_notification_iterator_get_user_data(it);
+                       bt_self_message_iterator_get_user_data(it);
 
        switch (s->type) {
        case LIVE_STREAM_TYPE_NO_STREAM:
@@ -307,7 +362,7 @@ void lttng_live_iterator_finalize(struct bt_private_notification_iterator *it)
 }
 
 static
-enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
+enum bt_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
                struct lttng_live_component *lttng_live,
                struct lttng_live_stream_iterator *lttng_live_stream)
 {
@@ -317,16 +372,16 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_check_stream_sta
                break;
        case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
                /* Invalid state. */
-               PERR("Unexpected stream state \"ACTIVE_NO_DATA\"\n");
-               return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+               BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
+               abort();
        case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
                /* Invalid state. */
-               PERR("Unexpected stream state \"QUIESCENT_NO_DATA\"\n");
-               return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+               BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
+               abort();
        case LTTNG_LIVE_STREAM_EOF:
                break;
        }
-       return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+       return BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
 }
 
 /*
@@ -338,21 +393,21 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_check_stream_sta
  *   return EOF.
  */
 static
-enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
+enum bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
                struct lttng_live_component *lttng_live,
                struct lttng_live_stream_iterator *lttng_live_stream)
 {
-       enum bt_ctf_lttng_live_iterator_status ret =
-                       BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+       enum bt_lttng_live_iterator_status ret =
+                       BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
        struct packet_index index;
        enum lttng_live_stream_state orig_state = lttng_live_stream->state;
 
        if (lttng_live_stream->trace->new_metadata_needed) {
-               ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+               ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
                goto end;
        }
        if (lttng_live_stream->trace->session->new_streams_needed) {
-               ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+               ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
                goto end;
        }
        if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
@@ -360,18 +415,18 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_da
                goto end;
        }
        ret = lttng_live_get_next_index(lttng_live, lttng_live_stream, &index);
-       if (ret != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+       if (ret != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
                goto end;
        }
-       assert(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
+       BT_ASSERT(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
        if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
                if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
                                && lttng_live_stream->last_returned_inactivity_timestamp ==
                                        lttng_live_stream->current_inactivity_timestamp) {
-                       ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+                       ret = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
                        print_stream_state(lttng_live_stream);
                } else {
-                       ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+                       ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
                }
                goto end;
        }
@@ -379,7 +434,7 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_da
        lttng_live_stream->offset = index.offset;
        lttng_live_stream->len = index.packet_size / CHAR_BIT;
 end:
-       if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+       if (ret == BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
                ret = lttng_live_iterator_next_check_stream_state(
                                lttng_live, lttng_live_stream);
        }
@@ -387,40 +442,40 @@ end:
 }
 
 /*
- * Creation of the notification requires the ctf trace to be created
+ * Creation of the message requires the ctf trace to be created
  * beforehand, but the live protocol gives us all streams (including
  * metadata) at once. So we split it in three steps: getting streams,
  * getting metadata (which creates the ctf trace), and then creating the
- * per-stream notifications.
+ * per-stream messages.
  */
 static
-enum bt_ctf_lttng_live_iterator_status lttng_live_get_session(
+enum bt_lttng_live_iterator_status lttng_live_get_session(
                struct lttng_live_component *lttng_live,
                struct lttng_live_session *session)
 {
-       enum bt_ctf_lttng_live_iterator_status status;
+       enum bt_lttng_live_iterator_status status;
        struct lttng_live_trace *trace, *t;
 
        if (lttng_live_attach_session(session)) {
-               return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+               if (lttng_live_is_canceled(lttng_live)) {
+                       return BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+               } else {
+                       return BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+               }
        }
        status = lttng_live_get_new_streams(session);
-       if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
-                       status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
+       if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK &&
+                       status != BT_LTTNG_LIVE_ITERATOR_STATUS_END) {
                return status;
        }
        bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
                status = lttng_live_metadata_update(trace);
-               if (status == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
-                       int retval;
-
-                       retval = bt_ctf_trace_set_is_static(trace->trace);
-                       assert(!retval);
-               } else if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+               if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK &&
+                               status != BT_LTTNG_LIVE_ITERATOR_STATUS_END) {
                        return status;
                }
        }
-       return lttng_live_lazy_notif_init(session);
+       return lttng_live_lazy_msg_init(session);
 }
 
 BT_HIDDEN
@@ -449,11 +504,11 @@ void lttng_live_force_new_streams_and_metadata(struct lttng_live_component *lttn
 }
 
 static
-enum bt_notification_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
+enum bt_lttng_live_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
                struct lttng_live_component *lttng_live)
 {
-       enum bt_ctf_lttng_live_iterator_status ret =
-                       BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+       enum bt_lttng_live_iterator_status ret =
+                       BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
        unsigned int nr_sessions_opened = 0;
        struct lttng_live_session *session, *s;
 
@@ -472,16 +527,16 @@ enum bt_notification_iterator_status lttng_live_iterator_next_handle_new_streams
         * currently ongoing.
         */
        if (bt_list_empty(&lttng_live->sessions)) {
-               ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
+               ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
                goto end;
        }
        bt_list_for_each_entry(session, &lttng_live->sessions, node) {
                ret = lttng_live_get_session(lttng_live, session);
                switch (ret) {
-               case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
+               case BT_LTTNG_LIVE_ITERATOR_STATUS_OK:
                        break;
-               case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
-                       ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+               case BT_LTTNG_LIVE_ITERATOR_STATUS_END:
+                       ret = BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
                        break;
                default:
                        goto end;
@@ -491,25 +546,25 @@ enum bt_notification_iterator_status lttng_live_iterator_next_handle_new_streams
                }
        }
 end:
-       if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) {
-               ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
+       if (ret == BT_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) {
+               ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
        }
        return ret;
 }
 
 static
-enum bt_ctf_lttng_live_iterator_status emit_inactivity_notification(
+enum bt_lttng_live_iterator_status emit_inactivity_message(
                struct lttng_live_component *lttng_live,
                struct lttng_live_stream_iterator *lttng_live_stream,
-               struct bt_notification **notification,
+               const bt_message **message,
                uint64_t timestamp)
 {
-       enum bt_ctf_lttng_live_iterator_status ret =
-                       BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+       enum bt_lttng_live_iterator_status ret =
+                       BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
        struct lttng_live_trace *trace;
-       struct bt_ctf_clock_class *clock_class = NULL;
-       struct bt_ctf_clock_value *clock_value = NULL;
-       struct bt_notification *notif = NULL;
+       const bt_clock_class *clock_class = NULL;
+       bt_clock_snapshot *clock_snapshot = NULL;
+       const bt_message *msg = NULL;
        int retval;
 
        trace = lttng_live_stream->trace;
@@ -520,132 +575,132 @@ enum bt_ctf_lttng_live_iterator_status emit_inactivity_notification(
        if (!clock_class) {
                goto error;
        }
-       clock_value = bt_ctf_clock_value_create(clock_class, timestamp);
-       if (!clock_value) {
+       clock_snapshot = bt_clock_snapshot_create(clock_class, timestamp);
+       if (!clock_snapshot) {
                goto error;
        }
-       notif = bt_notification_inactivity_create(trace->cc_prio_map);
-       if (!notif) {
+       msg = bt_message_inactivity_create(trace->cc_prio_map);
+       if (!msg) {
                goto error;
        }
-       retval = bt_notification_inactivity_set_clock_value(notif, clock_value);
+       retval = bt_message_inactivity_set_clock_snapshot(msg, clock_snapshot);
        if (retval) {
                goto error;
        }
-       *notification = notif;
+       *message = msg;
 end:
-       bt_put(clock_value);
-       bt_put(clock_class);
+       bt_object_put_ref(clock_snapshot);
+       bt_clock_class_put_ref(clock_class);
        return ret;
 
 error:
-       ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
-       bt_put(notif);
+       ret = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+       bt_message_put_ref(msg);
        goto end;
 }
 
 static
-enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
+enum bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
                struct lttng_live_component *lttng_live,
                struct lttng_live_stream_iterator *lttng_live_stream,
-               struct bt_notification **notification)
+               const bt_message **message)
 {
-       enum bt_ctf_lttng_live_iterator_status ret =
-                       BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
-       struct bt_ctf_clock_class *clock_class = NULL;
-       struct bt_ctf_clock_value *clock_value = NULL;
+       enum bt_lttng_live_iterator_status ret =
+                       BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+       const bt_clock_class *clock_class = NULL;
+       bt_clock_snapshot *clock_snapshot = NULL;
 
        if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
-               return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+               return BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
        }
 
        if (lttng_live_stream->current_inactivity_timestamp ==
                        lttng_live_stream->last_returned_inactivity_timestamp) {
                lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
-               ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+               ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
                goto end;
        }
 
-       ret = emit_inactivity_notification(lttng_live, lttng_live_stream, notification,
+       ret = emit_inactivity_message(lttng_live, lttng_live_stream, message,
                        (uint64_t) lttng_live_stream->current_inactivity_timestamp);
 
        lttng_live_stream->last_returned_inactivity_timestamp =
                        lttng_live_stream->current_inactivity_timestamp;
 end:
-       bt_put(clock_value);
-       bt_put(clock_class);
+       bt_object_put_ref(clock_snapshot);
+       bt_clock_class_put_ref(clock_class);
        return ret;
 }
 
 static
-enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
+enum bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
                struct lttng_live_component *lttng_live,
                struct lttng_live_stream_iterator *lttng_live_stream,
-               struct bt_notification **notification)
+               const bt_message **message)
 {
-       enum bt_ctf_lttng_live_iterator_status ret =
-                       BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
-       enum bt_ctf_notif_iter_status status;
+       enum bt_lttng_live_iterator_status ret =
+                       BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+       enum bt_msg_iter_status status;
        struct lttng_live_session *session;
 
        bt_list_for_each_entry(session, &lttng_live->sessions, node) {
                struct lttng_live_trace *trace;
 
                if (session->new_streams_needed) {
-                       return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+                       return BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
                }
                bt_list_for_each_entry(trace, &session->traces, node) {
                        if (trace->new_metadata_needed) {
-                               return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+                               return BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
                        }
                }
        }
 
        if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
-               return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+               return BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
        }
-       if (lttng_live_stream->packet_end_notif_queue) {
-               *notification = lttng_live_stream->packet_end_notif_queue;
-               lttng_live_stream->packet_end_notif_queue = NULL;
-               status = BT_CTF_NOTIF_ITER_STATUS_OK;
+       if (lttng_live_stream->packet_end_msg_queue) {
+               *message = lttng_live_stream->packet_end_msg_queue;
+               lttng_live_stream->packet_end_msg_queue = NULL;
+               status = BT_MSG_ITER_STATUS_OK;
        } else {
-               status = bt_ctf_notif_iter_get_next_notification(
-                               lttng_live_stream->notif_iter,
+               status = bt_msg_iter_get_next_message(
+                               lttng_live_stream->msg_iter,
                                lttng_live_stream->trace->cc_prio_map,
-                               notification);
-               if (status == BT_CTF_NOTIF_ITER_STATUS_OK) {
+                               message);
+               if (status == BT_MSG_ITER_STATUS_OK) {
                        /*
                         * Consider empty packets as inactivity.
                         */
-                       if (bt_notification_get_type(*notification) == BT_NOTIFICATION_TYPE_PACKET_END) {
-                               lttng_live_stream->packet_end_notif_queue = *notification;
-                               *notification = NULL;
-                               return emit_inactivity_notification(lttng_live,
-                                               lttng_live_stream, notification,
+                       if (bt_message_get_type(*message) == BT_MESSAGE_TYPE_PACKET_END) {
+                               lttng_live_stream->packet_end_msg_queue = *message;
+                               *message = NULL;
+                               return emit_inactivity_message(lttng_live,
+                                               lttng_live_stream, message,
                                                lttng_live_stream->current_packet_end_timestamp);
                        }
                }
        }
        switch (status) {
-       case BT_CTF_NOTIF_ITER_STATUS_EOF:
-               ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
+       case BT_MSG_ITER_STATUS_EOF:
+               ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
                break;
-       case BT_CTF_NOTIF_ITER_STATUS_OK:
-               ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+       case BT_MSG_ITER_STATUS_OK:
+               ret = BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
                break;
-       case BT_CTF_NOTIF_ITER_STATUS_AGAIN:
+       case BT_MSG_ITER_STATUS_AGAIN:
                /*
                 * Continue immediately (end of packet). The next
                 * get_index may return AGAIN to delay the following
                 * attempt.
                 */
-               ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+               ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
                break;
-       case BT_CTF_NOTIF_ITER_STATUS_INVAL:
+       case BT_MSG_ITER_STATUS_INVAL:
                /* No argument provided by the user, so don't return INVAL. */
-       case BT_CTF_NOTIF_ITER_STATUS_ERROR:
+       case BT_MSG_ITER_STATUS_ERROR:
        default:
-               ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+               ret = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
                break;
        }
        return ret;
@@ -686,7 +741,7 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_activ
  *            handle_active_data_streams()
  *                - if at least one stream is ACTIVE_DATA:
  *                    - get stream event with lowest timestamp from heap
- *                    - make that stream event the current notification.
+ *                    - make that stream event the current message.
  *                    - move this stream heap position to its next event
  *                      - if we need to fetch data from relayd, move
  *                        stream to ACTIVE_NO_DATA.
@@ -700,134 +755,132 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_activ
  * When disconnected from relayd: try to re-connect endlessly.
  */
 static
-struct bt_notification_iterator_next_return lttng_live_iterator_next_stream(
-               struct bt_private_notification_iterator *iterator,
+bt_message_iterator_next_method_return lttng_live_iterator_next_stream(
+               bt_self_message_iterator *iterator,
                struct lttng_live_stream_iterator *stream_iter)
 {
-       enum bt_ctf_lttng_live_iterator_status status;
-       struct bt_notification_iterator_next_return next_return;
+       enum bt_lttng_live_iterator_status status;
+       bt_message_iterator_next_method_return next_return;
        struct lttng_live_component *lttng_live;
 
        lttng_live = stream_iter->trace->session->lttng_live;
 retry:
        print_stream_state(stream_iter);
-       next_return.notification = NULL;
+       next_return.message = NULL;
        status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
-       if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+       if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
                goto end;
        }
        status = lttng_live_iterator_next_handle_one_no_data_stream(
                        lttng_live, stream_iter);
-       if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+       if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
                goto end;
        }
        status = lttng_live_iterator_next_handle_one_quiescent_stream(
-                       lttng_live, stream_iter, &next_return.notification);
-       if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
-               assert(next_return.notification == NULL);
+                       lttng_live, stream_iter, &next_return.message);
+       if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+               BT_ASSERT(next_return.message == NULL);
                goto end;
        }
-       if (next_return.notification) {
+       if (next_return.message) {
                goto end;
        }
        status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live,
-                       stream_iter, &next_return.notification);
-       if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
-               assert(next_return.notification == NULL);
+                       stream_iter, &next_return.message);
+       if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+               BT_ASSERT(next_return.message == NULL);
        }
 
 end:
        switch (status) {
-       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
+       case BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
                print_dbg("continue");
                goto retry;
-       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
-               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
+       case BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
+               next_return.status = BT_MESSAGE_ITERATOR_STATUS_AGAIN;
                print_dbg("again");
                break;
-       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
-               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
+       case BT_LTTNG_LIVE_ITERATOR_STATUS_END:
+               next_return.status = BT_MESSAGE_ITERATOR_STATUS_END;
                print_dbg("end");
                break;
-       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
-               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
+       case BT_LTTNG_LIVE_ITERATOR_STATUS_OK:
+               next_return.status = BT_MESSAGE_ITERATOR_STATUS_OK;
                print_dbg("ok");
                break;
-       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
-               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
+       case BT_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
+               next_return.status = BT_MESSAGE_ITERATOR_STATUS_INVALID;
                break;
-       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
-               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
+       case BT_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
+               next_return.status = BT_MESSAGE_ITERATOR_STATUS_NOMEM;
                break;
-       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
-               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
+       case BT_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
+               next_return.status = BT_MESSAGE_ITERATOR_STATUS_UNSUPPORTED;
                break;
-       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
+       case BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
        default:        /* fall-through */
-               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+               next_return.status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
                break;
        }
        return next_return;
 }
 
 static
-struct bt_notification_iterator_next_return lttng_live_iterator_next_no_stream(
-               struct bt_private_notification_iterator *iterator,
+bt_message_iterator_next_method_return lttng_live_iterator_next_no_stream(
+               bt_self_message_iterator *iterator,
                struct lttng_live_no_stream_iterator *no_stream_iter)
 {
-       enum bt_ctf_lttng_live_iterator_status status;
-       struct bt_notification_iterator_next_return next_return;
+       enum bt_lttng_live_iterator_status status;
+       bt_message_iterator_next_method_return next_return;
        struct lttng_live_component *lttng_live;
 
        lttng_live = no_stream_iter->lttng_live;
 retry:
        lttng_live_force_new_streams_and_metadata(lttng_live);
+       next_return.message = NULL;
        status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
-       if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+       if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
                goto end;
        }
        if (no_stream_iter->port) {
-               status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+               status = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
        } else {
-               status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
+               status = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
        }
 end:
        switch (status) {
-       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
+       case BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
                goto retry;
-       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
-               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
+       case BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
+               next_return.status = BT_MESSAGE_ITERATOR_STATUS_AGAIN;
                break;
-       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
-               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
+       case BT_LTTNG_LIVE_ITERATOR_STATUS_END:
+               next_return.status = BT_MESSAGE_ITERATOR_STATUS_END;
                break;
-       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
-               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
+       case BT_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
+               next_return.status = BT_MESSAGE_ITERATOR_STATUS_INVALID;
                break;
-       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
-               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
+       case BT_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
+               next_return.status = BT_MESSAGE_ITERATOR_STATUS_NOMEM;
                break;
-       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
-               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
+       case BT_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
+               next_return.status = BT_MESSAGE_ITERATOR_STATUS_UNSUPPORTED;
                break;
-       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
-               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
-               break;
-       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
+       case BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
        default:        /* fall-through */
-               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+               next_return.status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
                break;
        }
        return next_return;
 }
 
 BT_HIDDEN
-struct bt_notification_iterator_next_return lttng_live_iterator_next(
-               struct bt_private_notification_iterator *iterator)
+bt_message_iterator_next_method_return lttng_live_iterator_next(
+               bt_self_message_iterator *iterator)
 {
        struct lttng_live_stream_iterator_generic *s =
-                       bt_private_notification_iterator_get_user_data(iterator);
-       struct bt_notification_iterator_next_return next_return;
+                       bt_self_message_iterator_get_user_data(iterator);
+       bt_message_iterator_next_method_return next_return;
 
        switch (s->type) {
        case LIVE_STREAM_TYPE_NO_STREAM:
@@ -839,33 +892,31 @@ struct bt_notification_iterator_next_return lttng_live_iterator_next(
                        container_of(s, struct lttng_live_stream_iterator, p));
                break;
        default:
-               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+               next_return.status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
                break;
        }
        return next_return;
 }
 
 BT_HIDDEN
-enum bt_notification_iterator_status lttng_live_iterator_init(
-               struct bt_private_notification_iterator *it,
+enum bt_message_iterator_status lttng_live_iterator_init(
+               bt_self_message_iterator *it,
                struct bt_private_port *port)
 {
-       enum bt_notification_iterator_status ret =
-                       BT_NOTIFICATION_ITERATOR_STATUS_OK;
+       enum bt_message_iterator_status ret =
+                       BT_MESSAGE_ITERATOR_STATUS_OK;
        struct lttng_live_stream_iterator_generic *s;
-       struct lttng_live_component *lttng_live;
 
-       assert(it);
+       BT_ASSERT(it);
 
        s = bt_private_port_get_user_data(port);
-       assert(s);
+       BT_ASSERT(s);
        switch (s->type) {
        case LIVE_STREAM_TYPE_NO_STREAM:
        {
                struct lttng_live_no_stream_iterator *no_stream_iter =
                        container_of(s, struct lttng_live_no_stream_iterator, p);
-               lttng_live = no_stream_iter->lttng_live;
-               ret = bt_private_notification_iterator_set_user_data(it, no_stream_iter);
+               ret = bt_self_message_iterator_set_user_data(it, no_stream_iter);
                if (ret) {
                        goto error;
                }
@@ -875,78 +926,101 @@ enum bt_notification_iterator_status lttng_live_iterator_init(
        {
                struct lttng_live_stream_iterator *stream_iter =
                        container_of(s, struct lttng_live_stream_iterator, p);
-               lttng_live = stream_iter->trace->session->lttng_live;
-               ret = bt_private_notification_iterator_set_user_data(it, stream_iter);
+               ret = bt_self_message_iterator_set_user_data(it, stream_iter);
                if (ret) {
                        goto error;
                }
                break;
        }
        default:
-               ret = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+               ret = BT_MESSAGE_ITERATOR_STATUS_ERROR;
                goto end;
        }
 
 end:
        return ret;
 error:
-       if (bt_private_notification_iterator_set_user_data(it, NULL)
-                       != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
-               PERR("Error setting private data to NULL\n");
+       if (bt_self_message_iterator_set_user_data(it, NULL)
+                       != BT_MESSAGE_ITERATOR_STATUS_OK) {
+               BT_LOGE("Error setting private data to NULL");
        }
        goto end;
 }
 
 static
-struct bt_value *lttng_live_query_list_sessions(struct bt_component_class *comp_class,
-               struct bt_value *params)
+bt_component_class_query_method_return lttng_live_query_list_sessions(
+               const bt_component_class *comp_class,
+               const bt_query_executor *query_exec,
+               bt_value *params)
 {
-       struct bt_value *url_value = NULL;
-       struct bt_value *results = NULL;
+       bt_component_class_query_method_return query_ret = {
+               .result = NULL,
+               .status = BT_QUERY_STATUS_OK,
+       };
+
+       bt_value *url_value = NULL;
        const char *url;
        struct bt_live_viewer_connection *viewer_connection = NULL;
-       enum bt_value_status ret;
 
        url_value = bt_value_map_get(params, "url");
        if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) {
-               fprintf(stderr, "Mandatory \"url\" parameter missing\n");
+               BT_LOGW("Mandatory \"url\" parameter missing");
+               query_ret.status = BT_QUERY_STATUS_INVALID_PARAMS;
                goto error;
        }
 
-       ret = bt_value_string_get(url_value, &url);
-       if (ret != BT_VALUE_STATUS_OK) {
-               fprintf(stderr, "\"url\" parameter is required to be a string value\n");
+       if (bt_value_string_get(url_value, &url) != BT_VALUE_STATUS_OK) {
+               BT_LOGW("\"url\" parameter is required to be a string value");
+               query_ret.status = BT_QUERY_STATUS_INVALID_PARAMS;
                goto error;
        }
 
-       viewer_connection = bt_live_viewer_connection_create(url, stderr);
+       viewer_connection = bt_live_viewer_connection_create(url, NULL);
        if (!viewer_connection) {
-               ret = BT_COMPONENT_STATUS_NOMEM;
                goto error;
        }
 
-       results = bt_live_viewer_connection_list_sessions(viewer_connection);
+       query_ret.result =
+               bt_live_viewer_connection_list_sessions(viewer_connection);
+       if (!query_ret.result) {
+               goto error;
+       }
+
        goto end;
+
 error:
-       BT_PUT(results);
+       BT_OBJECT_PUT_REF_AND_RESET(query_ret.result);
+
+       if (query_ret.status >= 0) {
+               query_ret.status = BT_QUERY_STATUS_ERROR;
+       }
+
 end:
        if (viewer_connection) {
                bt_live_viewer_connection_destroy(viewer_connection);
        }
-       BT_PUT(url_value);
-       return results;
+       BT_VALUE_PUT_REF_AND_RESET(url_value);
+       return query_ret;
 }
 
 BT_HIDDEN
-struct bt_value *lttng_live_query(struct bt_component_class *comp_class,
-               const char *object, struct bt_value *params)
+bt_component_class_query_method_return lttng_live_query(
+               const bt_component_class *comp_class,
+               const bt_query_executor *query_exec,
+               const char *object, bt_value *params)
 {
+       bt_component_class_query_method_return ret = {
+               .result = NULL,
+               .status = BT_QUERY_STATUS_OK,
+       };
+
        if (strcmp(object, "sessions") == 0) {
                return lttng_live_query_list_sessions(comp_class,
-                       params);
+                       query_exec, params);
        }
-       fprintf(stderr, "Unknown query object `%s`\n", object);
-       return NULL;
+       BT_LOGW("Unknown query object `%s`", object);
+       ret.status = BT_QUERY_STATUS_INVALID_OBJECT;
+       return ret;
 }
 
 static
@@ -958,14 +1032,15 @@ void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
        bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
                lttng_live_destroy_session(session);
        }
-       BT_PUT(lttng_live->viewer_connection);
+       BT_OBJECT_PUT_REF_AND_RESET(lttng_live->viewer_connection);
        if (lttng_live->url) {
                g_string_free(lttng_live->url, TRUE);
        }
        if (lttng_live->no_stream_port) {
+               bt_object_get_ref(lttng_live->no_stream_port);
                ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
-               assert(!ret);
-               BT_PUT(lttng_live->no_stream_port);
+               bt_object_put_ref(lttng_live->no_stream_port);
+               BT_ASSERT(!ret);
        }
        if (lttng_live->no_stream_iter) {
                g_free(lttng_live->no_stream_iter);
@@ -974,9 +1049,9 @@ void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
 }
 
 BT_HIDDEN
-void lttng_live_component_finalize(struct bt_private_component *component)
+void lttng_live_component_finalize(bt_self_component *component)
 {
-       void *data = bt_private_component_get_user_data(component);
+       void *data = bt_self_component_get_user_data(component);
 
        if (!data) {
                return;
@@ -985,11 +1060,11 @@ void lttng_live_component_finalize(struct bt_private_component *component)
 }
 
 static
-struct lttng_live_component *lttng_live_component_create(struct bt_value *params,
-               struct bt_private_component *private_component)
+struct lttng_live_component *lttng_live_component_create(bt_value *params,
+               bt_self_component *private_component)
 {
        struct lttng_live_component *lttng_live;
-       struct bt_value *value = NULL;
+       bt_value *value = NULL;
        const char *url;
        enum bt_value_status ret;
 
@@ -997,33 +1072,26 @@ struct lttng_live_component *lttng_live_component_create(struct bt_value *params
        if (!lttng_live) {
                goto end;
        }
-       lttng_live->error_fp = stderr;
        /* TODO: make this an overridable parameter. */
        lttng_live->max_query_size = MAX_QUERY_SIZE;
        BT_INIT_LIST_HEAD(&lttng_live->sessions);
        value = bt_value_map_get(params, "url");
        if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) {
-               fprintf(stderr, "Mandatory \"url\" parameter missing\n");
-               goto error;
-       }
-       ret = bt_value_string_get(value, &url);
-       if (ret != BT_VALUE_STATUS_OK) {
-               fprintf(stderr, "\"url\" parameter is required to be a string value\n");
+               BT_LOGW("Mandatory \"url\" parameter missing");
                goto error;
        }
+       url = bt_value_string_get(value);
        lttng_live->url = g_string_new(url);
        if (!lttng_live->url) {
                goto error;
        }
+       BT_VALUE_PUT_REF_AND_RESET(value);
        lttng_live->viewer_connection =
-               bt_live_viewer_connection_create(lttng_live->url->str,
-                       stderr);
+               bt_live_viewer_connection_create(lttng_live->url->str, lttng_live);
        if (!lttng_live->viewer_connection) {
-               ret = BT_COMPONENT_STATUS_NOMEM;
                goto error;
        }
        if (lttng_live_create_viewer_session(lttng_live)) {
-               ret = BT_COMPONENT_STATUS_ERROR;
                goto error;
        }
        lttng_live->private_component = private_component;
@@ -1038,17 +1106,18 @@ end:
 }
 
 BT_HIDDEN
-enum bt_component_status lttng_live_component_init(struct bt_private_component *component,
-               struct bt_value *params, void *init_method_data)
+enum bt_component_status lttng_live_component_init(
+               bt_self_component *private_component,
+               bt_value *params, void *init_method_data)
 {
        struct lttng_live_component *lttng_live;
        enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
 
-       lttng_live_debug = g_strcmp0(getenv("LTTNG_LIVE_DEBUG"), "1") == 0;
-
        /* Passes ownership of iter ref to lttng_live_component_create. */
-       lttng_live = lttng_live_component_create(params, component);
+       lttng_live = lttng_live_component_create(params, private_component);
        if (!lttng_live) {
+               //TODO : we need access to the application cancel state
+               //because we are not part of a graph yet.
                ret = BT_COMPONENT_STATUS_NOMEM;
                goto end;
        }
@@ -1056,14 +1125,20 @@ enum bt_component_status lttng_live_component_init(struct bt_private_component *
        lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1);
        lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM;
        lttng_live->no_stream_iter->lttng_live = lttng_live;
-
-       lttng_live->no_stream_port =
-               bt_private_component_source_add_output_private_port(
+       if (lttng_live_is_canceled(lttng_live)) {
+               goto end;
+       }
+       ret = bt_self_component_source_add_output_port(
                                lttng_live->private_component, "no-stream",
-                               lttng_live->no_stream_iter);
+                               lttng_live->no_stream_iter,
+                               &lttng_live->no_stream_port);
+       if (ret != BT_COMPONENT_STATUS_OK) {
+               goto end;
+       }
+       bt_object_put_ref(lttng_live->no_stream_port);  /* weak */
        lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
 
-       ret = bt_private_component_set_user_data(component, lttng_live);
+       ret = bt_self_component_set_user_data(private_component, lttng_live);
        if (ret != BT_COMPONENT_STATUS_OK) {
                goto error;
        }
@@ -1071,7 +1146,44 @@ enum bt_component_status lttng_live_component_init(struct bt_private_component *
 end:
        return ret;
 error:
-       (void) bt_private_component_set_user_data(component, NULL);
+       (void) bt_self_component_set_user_data(private_component, NULL);
        lttng_live_component_destroy_data(lttng_live);
        return ret;
 }
+
+BT_HIDDEN
+enum bt_component_status lttng_live_accept_port_connection(
+               bt_self_component *private_component,
+               struct bt_private_port *self_private_port,
+               const bt_port *other_port)
+{
+       struct lttng_live_component *lttng_live =
+                       bt_self_component_get_user_data(private_component);
+       bt_component *other_component;
+       enum bt_component_status status = BT_COMPONENT_STATUS_OK;
+       const bt_port *self_port = bt_port_from_private(self_private_port);
+
+       other_component = bt_port_get_component(other_port);
+       bt_component_put_ref(other_component);  /* weak */
+
+       if (!lttng_live->downstream_component) {
+               lttng_live->downstream_component = other_component;
+               goto end;
+       }
+
+       /*
+        * Compare prior component to ensure we are connected to the
+        * same downstream component as prior ports.
+        */
+       if (lttng_live->downstream_component != other_component) {
+               BT_LOGW("Cannot connect ctf.lttng-live component port \"%s\" to component \"%s\": already connected to component \"%s\".",
+                       bt_port_get_name(self_port),
+                       bt_component_get_name(other_component),
+                       bt_component_get_name(lttng_live->downstream_component));
+               status = BT_COMPONENT_STATUS_REFUSE_PORT_CONNECTION;
+               goto end;
+       }
+end:
+       bt_port_put_ref(self_port);
+       return status;
+}
This page took 0.045989 seconds and 4 git commands to generate.