Adapt `src.ctf.lttng-live` to current API
[babeltrace.git] / plugins / ctf / lttng-live / lttng-live.c
index 9fd35b2a21d15b9a410e741fc2633dc019953794..40a145607216b9bce6b856807abad69f272c677a 100644 (file)
@@ -3,6 +3,7 @@
  *
  * Babeltrace CTF LTTng-live Client Component
  *
+ * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
  * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
  * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
  *
 #define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC"
 #include "logging.h"
 
+#include <glib.h>
+#include <inttypes.h>
+#include <unistd.h>
+
+#include <babeltrace/assert-internal.h>
 #include <babeltrace/babeltrace.h>
 #include <babeltrace/compiler-internal.h>
 #include <babeltrace/types.h>
-#include <inttypes.h>
-#include <glib.h>
-#include <babeltrace/assert-internal.h>
-#include <unistd.h>
 #include <plugins-common.h>
 
 #include "data-stream.h"
 #include "metadata.h"
-#include "lttng-live-internal.h"
+#include "lttng-live.h"
 
-#define MAX_QUERY_SIZE         (256*1024)
+#define MAX_QUERY_SIZE                     (256*1024)
+#define URL_PARAM                          "url"
+#define SESS_NOT_FOUND_ACTION_PARAM        "session-not-found-action"
+#define SESS_NOT_FOUND_ACTION_CONTINUE_STR  "continue"
+#define SESS_NOT_FOUND_ACTION_FAIL_STR     "fail"
+#define SESS_NOT_FOUND_ACTION_END_STR      "end"
 
 #define print_dbg(fmt, ...)    BT_LOGD(fmt, ## __VA_ARGS__)
 
-static const char *print_state(struct lttng_live_stream_iterator *s)
+static
+const char *print_live_iterator_status(enum lttng_live_iterator_status status)
+{
+       switch (status) {
+       case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
+               return "LTTNG_LIVE_ITERATOR_STATUS_CONTINUE";
+       case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
+               return "LTTNG_LIVE_ITERATOR_STATUS_AGAIN";
+       case LTTNG_LIVE_ITERATOR_STATUS_END:
+               return "LTTNG_LIVE_ITERATOR_STATUS_END";
+       case LTTNG_LIVE_ITERATOR_STATUS_OK:
+               return "LTTNG_LIVE_ITERATOR_STATUS_OK";
+       case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
+               return "LTTNG_LIVE_ITERATOR_STATUS_INVAL";
+       case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
+               return "LTTNG_LIVE_ITERATOR_STATUS_ERROR";
+       case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
+               return "LTTNG_LIVE_ITERATOR_STATUS_NOMEM";
+       case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
+               return "LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED";
+       default:
+               abort();
+       }
+}
+
+static
+const char *print_state(struct lttng_live_stream_iterator *s)
 {
        switch (s->state) {
        case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
@@ -65,159 +98,68 @@ static const char *print_state(struct lttng_live_stream_iterator *s)
        }
 }
 
-static
-void print_stream_state(struct lttng_live_stream_iterator *stream)
-{
-       const bt_port *port;
-
-       port = bt_port_from_private(stream->port);
-       print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64,
-               bt_port_get_name(port),
-               print_state(stream),
-               stream->last_returned_inactivity_timestamp,
-               stream->current_inactivity_timestamp);
-       bt_port_put_ref(port);
-}
+#define print_stream_state(live_stream_iter) \
+       do { \
+               BT_LOGD("stream state %s last_inact_ts %" PRId64  \
+                       ", curr_inact_ts %" PRId64, \
+                       print_state(live_stream_iter), \
+                       live_stream_iter->last_inactivity_ts, \
+                       live_stream_iter->current_inactivity_ts); \
+       } while (0);
 
 BT_HIDDEN
-bt_bool lttng_live_is_canceled(struct lttng_live_component *lttng_live)
+bool lttng_live_is_canceled(struct lttng_live_component *lttng_live)
 {
-       bt_component *component;
-       const bt_graph *graph;
-       bt_bool ret;
+       const bt_component *component;
+       bool ret;
 
        if (!lttng_live) {
-               return BT_FALSE;
-       }
-
-       component = bt_component_from_private(lttng_live->private_component);
-       graph = bt_component_get_graph(component);
-       ret = bt_graph_is_canceled(graph);
-       bt_graph_put_ref(graph);
-       bt_component_put_ref(component);
-       return ret;
-}
-
-BT_HIDDEN
-int lttng_live_add_port(struct lttng_live_component *lttng_live,
-               struct lttng_live_stream_iterator *stream_iter)
-{
-       int ret;
-       struct bt_private_port *private_port;
-       char name[STREAM_NAME_MAX_LEN];
-       bt_component_status status;
-
-       ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id);
-       BT_ASSERT(ret > 0);
-       strcpy(stream_iter->name, name);
-       if (lttng_live_is_canceled(lttng_live)) {
-               return 0;
-       }
-       status = bt_self_component_source_add_output_port(
-                       lttng_live->private_component, name, stream_iter,
-                       &private_port);
-       switch (status) {
-       case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
-               return 0;
-       case BT_COMPONENT_STATUS_OK:
-               break;
-       default:
-               return -1;
-       }
-       bt_object_put_ref(private_port);        /* weak */
-       BT_LOGI("Added port %s", name);
-
-       if (lttng_live->no_stream_port) {
-               bt_object_get_ref(lttng_live->no_stream_port);
-               ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
-               bt_object_put_ref(lttng_live->no_stream_port);
-               if (ret) {
-                       return -1;
-               }
-               lttng_live->no_stream_port = NULL;
-               lttng_live->no_stream_iter->port = NULL;
+               ret = false;
+               goto end;
        }
-       stream_iter->port = private_port;
-       return 0;
-}
 
-BT_HIDDEN
-int lttng_live_remove_port(struct lttng_live_component *lttng_live,
-               struct bt_private_port *port)
-{
-       bt_component *component;
-       int64_t nr_ports;
-       int ret;
-
-       component = bt_component_from_private(lttng_live->private_component);
-       nr_ports = bt_component_source_get_output_port_count(component);
-       if (nr_ports < 0) {
-               return -1;
-       }
-       BT_COMPONENT_PUT_REF_AND_RESET(component);
-       if (nr_ports == 1) {
-               bt_component_status status;
+       component = bt_component_source_as_component_const(
+               bt_self_component_source_as_component_source(
+               lttng_live->self_comp));
 
-               BT_ASSERT(!lttng_live->no_stream_port);
+       ret = bt_component_graph_is_canceled(component);
 
-               if (lttng_live_is_canceled(lttng_live)) {
-                       return 0;
-               }
-               status = bt_self_component_source_add_output_port(lttng_live->private_component,
-                               "no-stream", lttng_live->no_stream_iter,
-                               &lttng_live->no_stream_port);
-               switch (status) {
-               case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
-                       return 0;
-               case BT_COMPONENT_STATUS_OK:
-                       break;
-               default:
-                       return -1;
-               }
-               bt_object_put_ref(lttng_live->no_stream_port);  /* weak */
-               lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
-       }
-       bt_object_get_ref(port);
-       ret = bt_private_port_remove_from_component(port);
-       bt_object_put_ref(port);
-       if (ret) {
-               return -1;
-       }
-       return 0;
+end:
+       return ret;
 }
 
 static
 struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session,
                uint64_t trace_id)
 {
-       struct lttng_live_trace *trace;
+       uint64_t trace_idx;
+       struct lttng_live_trace *ret_trace = NULL;
 
-       bt_list_for_each_entry(trace, &session->traces, node) {
+       for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
+               struct lttng_live_trace *trace =
+                       g_ptr_array_index(session->traces, trace_idx);
                if (trace->id == trace_id) {
-                       return trace;
+                       ret_trace = trace;
+                       goto end;
                }
        }
-       return NULL;
+
+end:
+       return ret_trace;
 }
 
 static
-void lttng_live_destroy_trace(bt_object *obj)
+void lttng_live_destroy_trace(struct lttng_live_trace *trace)
 {
-       struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj);
+       BT_LOGD("Destroy lttng_live_trace");
 
-       BT_LOGI("Destroy trace");
-       BT_ASSERT(bt_list_empty(&trace->streams));
-       bt_list_del(&trace->node);
+       BT_ASSERT(trace->stream_iterators);
+       g_ptr_array_free(trace->stream_iterators, TRUE);
 
-       if (trace->trace) {
-               int retval;
+       BT_TRACE_PUT_REF_AND_RESET(trace->trace);
+       BT_TRACE_CLASS_PUT_REF_AND_RESET(trace->trace_class);
 
-               retval = bt_trace_set_is_static(trace->trace);
-               BT_ASSERT(!retval);
-               BT_TRACE_PUT_REF_AND_RESET(trace->trace);
-       }
        lttng_live_metadata_fini(trace);
-       BT_OBJECT_PUT_REF_AND_RESET(trace->cc_prio_map);
        g_free(trace);
 }
 
@@ -233,10 +175,14 @@ struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *sess
        }
        trace->session = session;
        trace->id = trace_id;
-       BT_INIT_LIST_HEAD(&trace->streams);
+       trace->trace_class = NULL;
+       trace->trace = NULL;
+       trace->stream_iterators = g_ptr_array_new_with_free_func(
+                       (GDestroyNotify) lttng_live_stream_iterator_destroy);
+       BT_ASSERT(trace->stream_iterators);
        trace->new_metadata_needed = true;
-       bt_list_add(&trace->node, &session->traces);
-       bt_object_init(&trace->obj, lttng_live_destroy_trace);
+       g_ptr_array_add(session->traces, trace);
+
        BT_LOGI("Create trace");
        goto end;
 error:
@@ -247,63 +193,55 @@ end:
 }
 
 BT_HIDDEN
-struct lttng_live_trace *lttng_live_ref_trace(struct lttng_live_session *session,
-               uint64_t trace_id)
+struct lttng_live_trace *lttng_live_borrow_trace(
+               struct lttng_live_session *session, uint64_t trace_id)
 {
        struct lttng_live_trace *trace;
 
        trace = lttng_live_find_trace(session, trace_id);
        if (trace) {
-               bt_object_get_ref(trace);
-               return trace;
+               goto end;
        }
-       return lttng_live_create_trace(session, trace_id);
-}
-
-BT_HIDDEN
-void lttng_live_unref_trace(struct lttng_live_trace *trace)
-{
-       bt_object_put_ref(trace);
-}
 
-static
-void lttng_live_close_trace_streams(struct lttng_live_trace *trace)
-{
-       struct lttng_live_stream_iterator *stream, *s;
+       /* The session is the owner of the newly created trace. */
+       trace = lttng_live_create_trace(session, trace_id);
 
-       bt_list_for_each_entry_safe(stream, s, &trace->streams, node) {
-               lttng_live_stream_iterator_destroy(stream);
-       }
-       lttng_live_metadata_fini(trace);
+end:
+       return trace;
 }
 
 BT_HIDDEN
-int lttng_live_add_session(struct lttng_live_component *lttng_live,
+int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
                uint64_t session_id, const char *hostname,
                const char *session_name)
 {
        int ret = 0;
-       struct lttng_live_session *s;
+       struct lttng_live_session *session;
 
-       s = g_new0(struct lttng_live_session, 1);
-       if (!s) {
+       session = g_new0(struct lttng_live_session, 1);
+       if (!session) {
                goto error;
        }
 
-       s->id = session_id;
-       BT_INIT_LIST_HEAD(&s->traces);
-       s->lttng_live = lttng_live;
-       s->new_streams_needed = true;
-       s->hostname = g_string_new(hostname);
-       s->session_name = g_string_new(session_name);
+       session->id = session_id;
+       session->traces = g_ptr_array_new_with_free_func(
+                       (GDestroyNotify) lttng_live_destroy_trace);
+       BT_ASSERT(session->traces);
+       session->lttng_live_msg_iter = lttng_live_msg_iter;
+       session->new_streams_needed = true;
+       session->hostname = g_string_new(hostname);
+       BT_ASSERT(session->hostname);
+
+       session->session_name = g_string_new(session_name);
+       BT_ASSERT(session->session_name);
 
        BT_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s",
-               s->id, hostname, session_name);
-       bt_list_add(&s->node, &lttng_live->sessions);
+               session->id, hostname, session_name);
+       g_ptr_array_add(lttng_live_msg_iter->sessions, session);
        goto end;
 error:
        BT_LOGE("Error adding session");
-       g_free(s);
+       g_free(session);
        ret = -1;
 end:
        return ret;
@@ -312,23 +250,30 @@ end:
 static
 void lttng_live_destroy_session(struct lttng_live_session *session)
 {
-       struct lttng_live_trace *trace, *t;
+       struct lttng_live_component *live_comp;
+
+       if (!session) {
+               goto end;
+       }
 
-       BT_LOGI("Destroy session");
+       BT_LOGD("Destroy lttng live session");
        if (session->id != -1ULL) {
                if (lttng_live_detach_session(session)) {
-                       if (!lttng_live_is_canceled(session->lttng_live)) {
+                       live_comp = session->lttng_live_msg_iter->lttng_live_comp;
+                       if (session->lttng_live_msg_iter &&
+                                       !lttng_live_is_canceled(live_comp)) {
                                /* Old relayd cannot detach sessions. */
-                               BT_LOGD("Unable to detach session %" PRIu64,
+                               BT_LOGD("Unable to detach lttng live session %" PRIu64,
                                        session->id);
                        }
                }
                session->id = -1ULL;
        }
-       bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
-               lttng_live_close_trace_streams(trace);
+
+       if (session->traces) {
+               g_ptr_array_free(session->traces, TRUE);
        }
-       bt_list_del(&session->node);
+
        if (session->hostname) {
                g_string_free(session->hostname, TRUE);
        }
@@ -336,34 +281,50 @@ void lttng_live_destroy_session(struct lttng_live_session *session)
                g_string_free(session->session_name, TRUE);
        }
        g_free(session);
+
+end:
+       return;
 }
 
-BT_HIDDEN
-void lttng_live_iterator_finalize(bt_self_message_iterator *it)
+static
+void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter)
 {
-       struct lttng_live_stream_iterator_generic *s =
-                       bt_self_message_iterator_get_user_data(it);
-
-       switch (s->type) {
-       case LIVE_STREAM_TYPE_NO_STREAM:
-       {
-               /* Leave no_stream_iter in place when port is removed. */
-               break;
+       if (!lttng_live_msg_iter) {
+               goto end;
        }
-       case LIVE_STREAM_TYPE_STREAM:
-       {
-               struct lttng_live_stream_iterator *stream_iter =
-                       container_of(s, struct lttng_live_stream_iterator, p);
 
-               lttng_live_stream_iterator_destroy(stream_iter);
-               break;
-       }
+       if (lttng_live_msg_iter->sessions) {
+               g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE);
        }
+
+       BT_OBJECT_PUT_REF_AND_RESET(lttng_live_msg_iter->viewer_connection);
+       BT_ASSERT(lttng_live_msg_iter->lttng_live_comp);
+       BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter);
+
+       /* All stream iterators must be destroyed at this point. */
+       BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0);
+       lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false;
+
+       g_free(lttng_live_msg_iter);
+
+end:
+       return;
+}
+
+BT_HIDDEN
+void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
+{
+       struct lttng_live_msg_iter *lttng_live_msg_iter;
+
+       BT_ASSERT(self_msg_iter);
+
+       lttng_live_msg_iter = bt_self_message_iterator_get_data(self_msg_iter);
+       BT_ASSERT(lttng_live_msg_iter);
+       lttng_live_msg_iter_destroy(lttng_live_msg_iter);
 }
 
 static
-bt_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
-               struct lttng_live_component *lttng_live,
+enum lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
                struct lttng_live_stream_iterator *lttng_live_stream)
 {
        switch (lttng_live_stream->state) {
@@ -381,7 +342,7 @@ bt_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
        case LTTNG_LIVE_STREAM_EOF:
                break;
        }
-       return BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+       return LTTNG_LIVE_ITERATOR_STATUS_OK;
 }
 
 /*
@@ -393,40 +354,43 @@ bt_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
  *   return EOF.
  */
 static
-bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
-               struct lttng_live_component *lttng_live,
+enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
+               struct lttng_live_msg_iter *lttng_live_msg_iter,
                struct lttng_live_stream_iterator *lttng_live_stream)
 {
-       bt_lttng_live_iterator_status ret =
-                       BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+       enum lttng_live_iterator_status ret =
+                       LTTNG_LIVE_ITERATOR_STATUS_OK;
        struct packet_index index;
        enum lttng_live_stream_state orig_state = lttng_live_stream->state;
 
        if (lttng_live_stream->trace->new_metadata_needed) {
-               ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+               ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
                goto end;
        }
        if (lttng_live_stream->trace->session->new_streams_needed) {
-               ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+               ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
                goto end;
        }
-       if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
-                       && lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
+       if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA &&
+                       lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
                goto end;
        }
-       ret = lttng_live_get_next_index(lttng_live, lttng_live_stream, &index);
-       if (ret != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+       ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream,
+               &index);
+       if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
                goto end;
        }
        BT_ASSERT(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
        if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
-               if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
-                               && lttng_live_stream->last_returned_inactivity_timestamp ==
-                                       lttng_live_stream->current_inactivity_timestamp) {
-                       ret = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+               uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts,
+                        curr_inact_ts = lttng_live_stream->current_inactivity_ts;
+
+               if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA &&
+                               last_inact_ts == curr_inact_ts) {
+                       ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
                        print_stream_state(lttng_live_stream);
                } else {
-                       ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+                       ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
                }
                goto end;
        }
@@ -434,109 +398,138 @@ bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream
        lttng_live_stream->offset = index.offset;
        lttng_live_stream->len = index.packet_size / CHAR_BIT;
 end:
-       if (ret == BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
-               ret = lttng_live_iterator_next_check_stream_state(
-                               lttng_live, lttng_live_stream);
+       if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
+               ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream);
        }
        return ret;
 }
 
 /*
- * Creation of the message requires the ctf trace to be created
- * beforehand, but the live protocol gives us all streams (including
- * metadata) at once. So we split it in three steps: getting streams,
- * getting metadata (which creates the ctf trace), and then creating the
- * per-stream messages.
+ * Creation of the message requires the ctf trace class to be created
+ * beforehand, but the live protocol gives us all streams (including metadata)
+ * at once. So we split it in three steps: getting streams, getting metadata
+ * (which creates the ctf trace class), and then creating the per-stream
+ * messages.
  */
 static
-bt_lttng_live_iterator_status lttng_live_get_session(
-               struct lttng_live_component *lttng_live,
+enum lttng_live_iterator_status lttng_live_get_session(
+               struct lttng_live_msg_iter *lttng_live_msg_iter,
                struct lttng_live_session *session)
 {
-       bt_lttng_live_iterator_status status;
-       struct lttng_live_trace *trace, *t;
+       enum lttng_live_iterator_status status;
+       uint64_t trace_idx;
+       int ret = 0;
 
-       if (lttng_live_attach_session(session)) {
-               if (lttng_live_is_canceled(lttng_live)) {
-                       return BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
-               } else {
-                       return BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+       if (!session->attached) {
+               ret = lttng_live_attach_session(session);
+               if (ret) {
+                       if (lttng_live_msg_iter && lttng_live_is_canceled(
+                                               lttng_live_msg_iter->lttng_live_comp)) {
+                               status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+                       } else {
+                               status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+                       }
+                       goto end;
                }
        }
+
        status = lttng_live_get_new_streams(session);
-       if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK &&
-                       status != BT_LTTNG_LIVE_ITERATOR_STATUS_END) {
-               return status;
+       if (status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
+                       status != LTTNG_LIVE_ITERATOR_STATUS_END) {
+               goto end;
        }
-       bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
+       for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
+               struct lttng_live_trace *trace =
+                       g_ptr_array_index(session->traces, trace_idx);
+
                status = lttng_live_metadata_update(trace);
-               if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK &&
-                               status != BT_LTTNG_LIVE_ITERATOR_STATUS_END) {
-                       return status;
+               if (status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
+                               status != LTTNG_LIVE_ITERATOR_STATUS_END) {
+                       goto end;
                }
        }
-       return lttng_live_lazy_msg_init(session);
+       status = lttng_live_lazy_msg_init(session);
+
+end:
+       return status;
 }
 
 BT_HIDDEN
-void lttng_live_need_new_streams(struct lttng_live_component *lttng_live)
+void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter)
 {
-       struct lttng_live_session *session;
+       uint64_t session_idx;
 
-       bt_list_for_each_entry(session, &lttng_live->sessions, node) {
+       for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
+                       session_idx++) {
+               struct lttng_live_session *session =
+                       g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
                session->new_streams_needed = true;
        }
 }
 
 static
-void lttng_live_force_new_streams_and_metadata(struct lttng_live_component *lttng_live)
+void lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
 {
-       struct lttng_live_session *session;
-
-       bt_list_for_each_entry(session, &lttng_live->sessions, node) {
-               struct lttng_live_trace *trace;
+       uint64_t session_idx, trace_idx;
 
+       for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
+                       session_idx++) {
+               struct lttng_live_session *session =
+                       g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
                session->new_streams_needed = true;
-               bt_list_for_each_entry(trace, &session->traces, node) {
+               for (trace_idx = 0; trace_idx < session->traces->len;
+                               trace_idx++) {
+                       struct lttng_live_trace *trace =
+                               g_ptr_array_index(session->traces, trace_idx);
                        trace->new_metadata_needed = true;
                }
        }
 }
 
 static
-bt_lttng_live_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
-               struct lttng_live_component *lttng_live)
+enum lttng_live_iterator_status
+lttng_live_iterator_handle_new_streams_and_metadata(
+               struct lttng_live_msg_iter *lttng_live_msg_iter)
 {
-       bt_lttng_live_iterator_status ret =
-                       BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
-       unsigned int nr_sessions_opened = 0;
-       struct lttng_live_session *session, *s;
-
-       bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
-               if (session->closed && bt_list_empty(&session->traces)) {
-                       lttng_live_destroy_session(session);
-               }
-       }
+       enum lttng_live_iterator_status ret =
+                       LTTNG_LIVE_ITERATOR_STATUS_OK;
+       uint64_t session_idx = 0, nr_sessions_opened = 0;
+       struct lttng_live_session *session;
+       enum session_not_found_action sess_not_found_act =
+               lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
+
        /*
-        * Currently, when there are no sessions, we quit immediately.
-        * We may want to add a component parameter to keep trying until
-        * we get data in the future.
-        * Also, in a remotely distant future, we could add a "new
+        * In a remotely distant future, we could add a "new
         * session" flag to the protocol, which would tell us that we
         * need to query for new sessions even though we have sessions
         * currently ongoing.
         */
-       if (bt_list_empty(&lttng_live->sessions)) {
-               ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
-               goto end;
+       if (lttng_live_msg_iter->sessions->len == 0) {
+               if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
+                       ret = LTTNG_LIVE_ITERATOR_STATUS_END;
+                       goto end;
+               } else {
+                       /*
+                        * Retry to create a viewer session for the requested
+                        * session name.
+                        */
+                       if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
+                               ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+                               goto end;
+                       }
+               }
        }
-       bt_list_for_each_entry(session, &lttng_live->sessions, node) {
-               ret = lttng_live_get_session(lttng_live, session);
+
+       for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
+                       session_idx++) {
+               session = g_ptr_array_index(lttng_live_msg_iter->sessions,
+                               session_idx);
+               ret = lttng_live_get_session(lttng_live_msg_iter, session);
                switch (ret) {
-               case BT_LTTNG_LIVE_ITERATOR_STATUS_OK:
+               case LTTNG_LIVE_ITERATOR_STATUS_OK:
                        break;
-               case BT_LTTNG_LIVE_ITERATOR_STATUS_END:
-                       ret = BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+               case LTTNG_LIVE_ITERATOR_STATUS_END:
+                       ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
                        break;
                default:
                        goto end;
@@ -546,147 +539,257 @@ bt_lttng_live_iterator_status lttng_live_iterator_next_handle_new_streams_and_me
                }
        }
 end:
-       if (ret == BT_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) {
-               ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
+       if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK &&
+                       sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE &&
+                       nr_sessions_opened == 0) {
+               ret = LTTNG_LIVE_ITERATOR_STATUS_END;
        }
        return ret;
 }
 
 static
-bt_lttng_live_iterator_status emit_inactivity_message(
-               struct lttng_live_component *lttng_live,
-               struct lttng_live_stream_iterator *lttng_live_stream,
-               const bt_message **message,
-               uint64_t timestamp)
+enum lttng_live_iterator_status emit_inactivity_message(
+               struct lttng_live_msg_iter *lttng_live_msg_iter,
+               struct lttng_live_stream_iterator *stream_iter,
+               bt_message **message, uint64_t timestamp)
 {
-       bt_lttng_live_iterator_status ret =
-                       BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
-       struct lttng_live_trace *trace;
-       const bt_clock_class *clock_class = NULL;
-       bt_clock_snapshot *clock_snapshot = NULL;
-       const bt_message *msg = NULL;
-       int retval;
+       enum lttng_live_iterator_status ret =
+                       LTTNG_LIVE_ITERATOR_STATUS_OK;
+       bt_message *msg = NULL;
 
-       trace = lttng_live_stream->trace;
-       if (!trace) {
-               goto error;
-       }
-       clock_class = bt_clock_class_priority_map_get_clock_class_by_index(trace->cc_prio_map, 0);
-       if (!clock_class) {
-               goto error;
-       }
-       clock_snapshot = bt_clock_snapshot_create(clock_class, timestamp);
-       if (!clock_snapshot) {
-               goto error;
-       }
-       msg = bt_message_inactivity_create(trace->cc_prio_map);
+       BT_ASSERT(stream_iter->trace->clock_class);
+
+       msg = bt_message_message_iterator_inactivity_create(
+                       lttng_live_msg_iter->self_msg_iter,
+                       stream_iter->trace->clock_class,
+                       timestamp);
        if (!msg) {
                goto error;
        }
-       retval = bt_message_inactivity_set_clock_snapshot(msg, clock_snapshot);
-       if (retval) {
-               goto error;
-       }
+
        *message = msg;
 end:
-       bt_object_put_ref(clock_snapshot);
-       bt_clock_class_put_ref(clock_class);
        return ret;
 
 error:
-       ret = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+       ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
        bt_message_put_ref(msg);
        goto end;
 }
 
 static
-bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
-               struct lttng_live_component *lttng_live,
+enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
+               struct lttng_live_msg_iter *lttng_live_msg_iter,
                struct lttng_live_stream_iterator *lttng_live_stream,
-               const bt_message **message)
+               bt_message **message)
 {
-       bt_lttng_live_iterator_status ret =
-                       BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
-       const bt_clock_class *clock_class = NULL;
-       bt_clock_snapshot *clock_snapshot = NULL;
+       enum lttng_live_iterator_status ret =
+                       LTTNG_LIVE_ITERATOR_STATUS_OK;
 
        if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
-               return BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+               return LTTNG_LIVE_ITERATOR_STATUS_OK;
        }
 
-       if (lttng_live_stream->current_inactivity_timestamp ==
-                       lttng_live_stream->last_returned_inactivity_timestamp) {
+       if (lttng_live_stream->current_inactivity_ts ==
+                       lttng_live_stream->last_inactivity_ts) {
                lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
-               ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+               ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+               goto end;
+       }
+
+       ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream,
+                       message, lttng_live_stream->current_inactivity_ts);
+
+       lttng_live_stream->last_inactivity_ts =
+                       lttng_live_stream->current_inactivity_ts;
+end:
+       return ret;
+}
+
+static
+int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter,
+               struct lttng_live_msg_iter *lttng_live_msg_iter,
+               const bt_message *msg, int64_t last_msg_ts_ns,
+               int64_t *ts_ns)
+{
+       const bt_clock_class *clock_class = NULL;
+       const bt_clock_snapshot *clock_snapshot = NULL;
+       int ret = 0;
+       bt_clock_snapshot_state cs_state = BT_CLOCK_SNAPSHOT_STATE_KNOWN;
+       bt_message_stream_activity_clock_snapshot_state sa_cs_state;
+
+       BT_ASSERT(msg);
+       BT_ASSERT(ts_ns);
+
+       BT_LOGV("Getting message's timestamp: iter-data-addr=%p, msg-addr=%p, "
+               "last-msg-ts=%" PRId64, lttng_live_msg_iter, msg,
+               last_msg_ts_ns);
+
+       switch (bt_message_get_type(msg)) {
+       case BT_MESSAGE_TYPE_EVENT:
+               clock_class =
+                       bt_message_event_borrow_stream_class_default_clock_class_const(
+                               msg);
+               BT_ASSERT(clock_class);
+
+               cs_state = bt_message_event_borrow_default_clock_snapshot_const(
+                       msg, &clock_snapshot);
+               break;
+       case BT_MESSAGE_TYPE_PACKET_BEGINNING:
+               clock_class =
+                       bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
+                       msg);
+               BT_ASSERT(clock_class);
+
+               cs_state = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
+                       msg, &clock_snapshot);
+               break;
+       case BT_MESSAGE_TYPE_PACKET_END:
+               clock_class =
+                       bt_message_packet_end_borrow_stream_class_default_clock_class_const(
+                       msg);
+               BT_ASSERT(clock_class);
+
+               cs_state = bt_message_packet_end_borrow_default_clock_snapshot_const(
+                       msg, &clock_snapshot);
+               break;
+       case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
+               clock_class =
+                       bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
+                       msg);
+               BT_ASSERT(clock_class);
+
+               cs_state = bt_message_discarded_events_borrow_default_beginning_clock_snapshot_const(
+                       msg, &clock_snapshot);
+               break;
+       case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
+               clock_class =
+                       bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
+                       msg);
+               BT_ASSERT(clock_class);
+
+               cs_state = bt_message_discarded_packets_borrow_default_beginning_clock_snapshot_const(
+                       msg, &clock_snapshot);
+               break;
+       case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
+               clock_class =
+                       bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const(
+                       msg);
+               BT_ASSERT(clock_class);
+
+               sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const(
+                       msg, &clock_snapshot);
+               if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
+                       goto no_clock_snapshot;
+               }
+
+               break;
+       case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
+               clock_class =
+                       bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const(
+                       msg);
+               BT_ASSERT(clock_class);
+
+               sa_cs_state = bt_message_stream_activity_end_borrow_default_clock_snapshot_const(
+                       msg, &clock_snapshot);
+               if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
+                       goto no_clock_snapshot;
+               }
+
+               break;
+       case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
+               cs_state =
+                       bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
+                               msg, &clock_snapshot);
+               break;
+       default:
+               /* All the other messages have a higher priority */
+               BT_LOGV_STR("Message has no timestamp: using the last message timestamp.");
+               *ts_ns = last_msg_ts_ns;
                goto end;
        }
 
-       ret = emit_inactivity_message(lttng_live, lttng_live_stream, message,
-                       (uint64_t) lttng_live_stream->current_inactivity_timestamp);
+       if (cs_state != BT_CLOCK_SNAPSHOT_STATE_KNOWN) {
+               BT_LOGE_STR("Unsupported unknown clock snapshot.");
+               ret = -1;
+               goto end;
+       }
+
+       clock_class = bt_clock_snapshot_borrow_clock_class_const(clock_snapshot);
+       BT_ASSERT(clock_class);
+
+       ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
+       if (ret) {
+               BT_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: "
+                       "clock-snapshot-addr=%p", clock_snapshot);
+               goto error;
+       }
+
+       goto end;
+
+no_clock_snapshot:
+       BT_LOGV_STR("Message's default clock snapshot is missing: "
+               "using the last message timestamp.");
+       *ts_ns = last_msg_ts_ns;
+       goto end;
+
+error:
+       ret = -1;
 
-       lttng_live_stream->last_returned_inactivity_timestamp =
-                       lttng_live_stream->current_inactivity_timestamp;
 end:
-       bt_object_put_ref(clock_snapshot);
-       bt_clock_class_put_ref(clock_class);
+       if (ret == 0) {
+               BT_LOGV("Found message's timestamp: "
+                       "iter-data-addr=%p, msg-addr=%p, "
+                       "last-msg-ts=%" PRId64 ", ts=%" PRId64,
+                       lttng_live_msg_iter, msg, last_msg_ts_ns, *ts_ns);
+       }
+
        return ret;
 }
 
 static
-bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
-               struct lttng_live_component *lttng_live,
+enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
+               struct lttng_live_msg_iter *lttng_live_msg_iter,
                struct lttng_live_stream_iterator *lttng_live_stream,
-               const bt_message **message)
+               bt_message **message)
 {
-       bt_lttng_live_iterator_status ret =
-                       BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+       enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
        enum bt_msg_iter_status status;
-       struct lttng_live_session *session;
+       uint64_t session_idx, trace_idx;
 
-       bt_list_for_each_entry(session, &lttng_live->sessions, node) {
-               struct lttng_live_trace *trace;
+       for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
+                       session_idx++) {
+               struct lttng_live_session *session =
+                       g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
 
                if (session->new_streams_needed) {
-                       return BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+                       ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+                       goto end;
                }
-               bt_list_for_each_entry(trace, &session->traces, node) {
+               for (trace_idx = 0; trace_idx < session->traces->len;
+                               trace_idx++) {
+                       struct lttng_live_trace *trace =
+                               g_ptr_array_index(session->traces, trace_idx);
                        if (trace->new_metadata_needed) {
-                               return BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+                               ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+                               goto end;
                        }
                }
        }
 
        if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
-               return BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
-       }
-       if (lttng_live_stream->packet_end_msg_queue) {
-               *message = lttng_live_stream->packet_end_msg_queue;
-               lttng_live_stream->packet_end_msg_queue = NULL;
-               status = BT_MSG_ITER_STATUS_OK;
-       } else {
-               status = bt_msg_iter_get_next_message(
-                               lttng_live_stream->msg_iter,
-                               lttng_live_stream->trace->cc_prio_map,
-                               message);
-               if (status == BT_MSG_ITER_STATUS_OK) {
-                       /*
-                        * Consider empty packets as inactivity.
-                        */
-                       if (bt_message_get_type(*message) == BT_MESSAGE_TYPE_PACKET_END) {
-                               lttng_live_stream->packet_end_msg_queue = *message;
-                               *message = NULL;
-                               return emit_inactivity_message(lttng_live,
-                                               lttng_live_stream, message,
-                                               lttng_live_stream->current_packet_end_timestamp);
-                       }
-               }
+               ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+               goto end;
        }
+
+       status = bt_msg_iter_get_next_message(lttng_live_stream->msg_iter,
+               lttng_live_msg_iter->self_msg_iter, message);
        switch (status) {
        case BT_MSG_ITER_STATUS_EOF:
-               ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
+               ret = LTTNG_LIVE_ITERATOR_STATUS_END;
                break;
        case BT_MSG_ITER_STATUS_OK:
-               ret = BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+               ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
                break;
        case BT_MSG_ITER_STATUS_AGAIN:
                /*
@@ -694,15 +797,19 @@ bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_st
                 * get_index may return AGAIN to delay the following
                 * attempt.
                 */
-               ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+               ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
                break;
        case BT_MSG_ITER_STATUS_INVAL:
                /* No argument provided by the user, so don't return INVAL. */
        case BT_MSG_ITER_STATUS_ERROR:
        default:
-               ret = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+               ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+               BT_LOGW("CTF msg iterator return an error or failed msg_iter=%p",
+                       lttng_live_stream->msg_iter);
                break;
        }
+
+end:
        return ret;
 }
 
@@ -755,303 +862,626 @@ bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_st
  * When disconnected from relayd: try to re-connect endlessly.
  */
 static
-bt_message_iterator_next_method_return lttng_live_iterator_next_stream(
-               bt_self_message_iterator *iterator,
-               struct lttng_live_stream_iterator *stream_iter)
+enum lttng_live_iterator_status lttng_live_iterator_next_on_stream(
+               struct lttng_live_msg_iter *lttng_live_msg_iter,
+               struct lttng_live_stream_iterator *stream_iter,
+               bt_message **curr_msg)
 {
-       bt_lttng_live_iterator_status status;
-       bt_message_iterator_next_method_return next_return;
-       struct lttng_live_component *lttng_live;
+       enum lttng_live_iterator_status live_status;
 
-       lttng_live = stream_iter->trace->session->lttng_live;
 retry:
        print_stream_state(stream_iter);
-       next_return.message = NULL;
-       status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
-       if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+       live_status = lttng_live_iterator_handle_new_streams_and_metadata(
+                       lttng_live_msg_iter);
+       if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
                goto end;
        }
-       status = lttng_live_iterator_next_handle_one_no_data_stream(
-                       lttng_live, stream_iter);
-       if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+       live_status = lttng_live_iterator_next_handle_one_no_data_stream(
+                       lttng_live_msg_iter, stream_iter);
+       if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
                goto end;
        }
-       status = lttng_live_iterator_next_handle_one_quiescent_stream(
-                       lttng_live, stream_iter, &next_return.message);
-       if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
-               BT_ASSERT(next_return.message == NULL);
+       live_status = lttng_live_iterator_next_handle_one_quiescent_stream(
+                       lttng_live_msg_iter, stream_iter, curr_msg);
+       if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
+               BT_ASSERT(*curr_msg == NULL);
                goto end;
        }
-       if (next_return.message) {
+       if (*curr_msg) {
                goto end;
        }
-       status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live,
-                       stream_iter, &next_return.message);
-       if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
-               BT_ASSERT(next_return.message == NULL);
+       live_status = lttng_live_iterator_next_handle_one_active_data_stream(
+                       lttng_live_msg_iter, stream_iter, curr_msg);
+       if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
+               BT_ASSERT(*curr_msg == NULL);
        }
 
 end:
-       switch (status) {
-       case BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
-               print_dbg("continue");
+       if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) {
                goto retry;
-       case BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
-               next_return.status = BT_MESSAGE_ITERATOR_STATUS_AGAIN;
-               print_dbg("again");
-               break;
-       case BT_LTTNG_LIVE_ITERATOR_STATUS_END:
-               next_return.status = BT_MESSAGE_ITERATOR_STATUS_END;
-               print_dbg("end");
-               break;
-       case BT_LTTNG_LIVE_ITERATOR_STATUS_OK:
-               next_return.status = BT_MESSAGE_ITERATOR_STATUS_OK;
-               print_dbg("ok");
-               break;
-       case BT_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
-               next_return.status = BT_MESSAGE_ITERATOR_STATUS_INVALID;
-               break;
-       case BT_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
-               next_return.status = BT_MESSAGE_ITERATOR_STATUS_NOMEM;
-               break;
-       case BT_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
-               next_return.status = BT_MESSAGE_ITERATOR_STATUS_UNSUPPORTED;
-               break;
-       case BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
-       default:        /* fall-through */
-               next_return.status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
-               break;
        }
-       return next_return;
+
+       return live_status;
 }
 
 static
-bt_message_iterator_next_method_return lttng_live_iterator_next_no_stream(
-               bt_self_message_iterator *iterator,
-               struct lttng_live_no_stream_iterator *no_stream_iter)
+enum lttng_live_iterator_status next_stream_iterator_for_trace(
+               struct lttng_live_msg_iter *lttng_live_msg_iter,
+               struct lttng_live_trace *live_trace,
+               struct lttng_live_stream_iterator **candidate_stream_iter)
 {
-       bt_lttng_live_iterator_status status;
-       bt_message_iterator_next_method_return next_return;
-       struct lttng_live_component *lttng_live;
+       struct lttng_live_stream_iterator *curr_candidate_stream_iter = NULL;
+       enum lttng_live_iterator_status stream_iter_status;;
+       int64_t curr_candidate_msg_ts = INT64_MAX;
+       uint64_t stream_iter_idx;
 
-       lttng_live = no_stream_iter->lttng_live;
-retry:
-       lttng_live_force_new_streams_and_metadata(lttng_live);
-       next_return.message = NULL;
-       status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
-       if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+       BT_ASSERT(live_trace);
+       BT_ASSERT(live_trace->stream_iterators);
+       /*
+        * Update the current message of every stream iterators of this trace.
+        * The current msg of every stream must have a timestamp equal or
+        * larger than the last message returned by this iterator. We must
+        * ensure monotonicity.
+        */
+       stream_iter_idx = 0;
+       while (stream_iter_idx < live_trace->stream_iterators->len) {
+               bool stream_iter_is_ended = false;
+               struct lttng_live_stream_iterator *stream_iter =
+                       g_ptr_array_index(live_trace->stream_iterators,
+                                       stream_iter_idx);
+
+               /*
+                * Find if there is are now current message for this stream
+                * iterator get it.
+                */
+               while (!stream_iter->current_msg) {
+                       bt_message *msg = NULL;
+                       int64_t curr_msg_ts_ns = INT64_MAX;
+                       stream_iter_status = lttng_live_iterator_next_on_stream(
+                                       lttng_live_msg_iter, stream_iter, &msg);
+
+                       BT_LOGD("live stream iterator returned status :%s",
+                                       print_live_iterator_status(stream_iter_status));
+                       if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
+                               stream_iter_is_ended = true;
+                               break;
+                       }
+
+                       if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
+                               goto end;
+                       }
+
+                       BT_ASSERT(msg);
+
+                       /*
+                        * Get the timestamp in nanoseconds from origin of this
+                        * messsage.
+                        */
+                       live_get_msg_ts_ns(stream_iter, lttng_live_msg_iter,
+                               msg, lttng_live_msg_iter->last_msg_ts_ns,
+                               &curr_msg_ts_ns);
+
+                       /*
+                        * Check if the message of the current live stream
+                        * iterator occured at the exact same time or after the
+                        * last message returned by this component's message
+                        * iterator. If not, we return an error.
+                        */
+                       if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
+                               stream_iter->current_msg = msg;
+                               stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
+                       } else {
+                               /*
+                                * We received a message in the past. To ensure
+                                * monotonicity, we can't send it forward.
+                                */
+                               BT_LOGE("Message's timestamp is less than "
+                                       "lttng-live's message iterator's last "
+                                       "returned timestamp: "
+                                       "lttng-live-msg-iter-addr=%p, ts=%" PRId64 ", "
+                                       "last-msg-ts=%" PRId64,
+                                       lttng_live_msg_iter, curr_msg_ts_ns,
+                                       lttng_live_msg_iter->last_msg_ts_ns);
+                               stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+                               goto end;
+                       }
+               }
+
+               if (!stream_iter_is_ended &&
+                               stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) {
+                       /*
+                        * Update the current best candidate message for the
+                        * stream iterator of thise live trace to be forwarded
+                        * downstream.
+                        */
+                       curr_candidate_msg_ts = stream_iter->current_msg_ts_ns;
+                       curr_candidate_stream_iter = stream_iter;
+               }
+
+               if (stream_iter_is_ended) {
+                       /*
+                        * The live stream iterator is ENDed. We remove that
+                        * iterator from the list and we restart the iteration
+                        * at the beginning of the live stream iterator array
+                        * to because the removal will shuffle the array.
+                        */
+                       g_ptr_array_remove_index_fast(live_trace->stream_iterators,
+                               stream_iter_idx);
+                       stream_iter_idx = 0;
+               } else {
+                       stream_iter_idx++;
+               }
+       }
+
+       if (curr_candidate_stream_iter) {
+               *candidate_stream_iter = curr_candidate_stream_iter;
+               stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
+       } else {
+               /*
+                * The only case where we don't have a candidate for this trace
+                * is if we reached the end of all the iterators.
+                */
+               BT_ASSERT(live_trace->stream_iterators->len == 0);
+               stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
+       }
+
+end:
+       return stream_iter_status;
+}
+
+static
+enum lttng_live_iterator_status next_stream_iterator_for_session(
+               struct lttng_live_msg_iter *lttng_live_msg_iter,
+               struct lttng_live_session *session,
+               struct lttng_live_stream_iterator **candidate_session_stream_iter)
+{
+       enum lttng_live_iterator_status stream_iter_status;
+       uint64_t trace_idx = 0;
+       int64_t curr_candidate_msg_ts = INT64_MAX;
+       struct lttng_live_stream_iterator *curr_candidate_stream_iter = NULL;
+
+       /*
+        * Make sure we are attached to the session and look for new streams
+        * and metadata.
+        */
+       stream_iter_status = lttng_live_get_session(lttng_live_msg_iter, session);
+       if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
+                       stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE &&
+                       stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) {
                goto end;
        }
-       if (no_stream_iter->port) {
-               status = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+
+       BT_ASSERT(session->traces);
+
+       /*
+        * Use while loops here rather then for loops so we can restart the
+        * iteration if an element is removed from the array during the
+        * looping.
+        */
+       while (trace_idx < session->traces->len) {
+               bool trace_is_ended = false;
+               struct lttng_live_stream_iterator *stream_iter;
+               struct lttng_live_trace *trace =
+                       g_ptr_array_index(session->traces, trace_idx);
+
+               stream_iter_status = next_stream_iterator_for_trace(
+                       lttng_live_msg_iter, trace, &stream_iter);
+               if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
+                       /*
+                        * All the live stream iterators for this trace are
+                        * ENDed. Remove the trace from this session.
+                        */
+                       trace_is_ended = true;
+               } else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
+                       goto end;
+               }
+
+               if (!trace_is_ended) {
+                       BT_ASSERT(stream_iter);
+
+                       if (stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) {
+                               curr_candidate_msg_ts = stream_iter->current_msg_ts_ns;
+                               curr_candidate_stream_iter = stream_iter;
+                       }
+                       trace_idx++;
+               } else {
+                       g_ptr_array_remove_index_fast(session->traces, trace_idx);
+                       trace_idx = 0;
+               }
+       }
+       if (curr_candidate_stream_iter) {
+               *candidate_session_stream_iter = curr_candidate_stream_iter;
+               stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
        } else {
-               status = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
+               /*
+                * The only cases where we don't have a candidate for this
+                * trace is:
+                *  1. if we reached the end of all the iterators of all the
+                *  traces of this session,
+                *  2. if we never had live stream iterator in the first place.
+                *
+                * In either cases, we return END.
+                */
+               BT_ASSERT(session->traces->len == 0);
+               stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
        }
 end:
-       switch (status) {
-       case BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
-               goto retry;
-       case BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
-               next_return.status = BT_MESSAGE_ITERATOR_STATUS_AGAIN;
-               break;
-       case BT_LTTNG_LIVE_ITERATOR_STATUS_END:
-               next_return.status = BT_MESSAGE_ITERATOR_STATUS_END;
-               break;
-       case BT_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
-               next_return.status = BT_MESSAGE_ITERATOR_STATUS_INVALID;
-               break;
-       case BT_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
-               next_return.status = BT_MESSAGE_ITERATOR_STATUS_NOMEM;
-               break;
-       case BT_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
-               next_return.status = BT_MESSAGE_ITERATOR_STATUS_UNSUPPORTED;
-               break;
-       case BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
-       default:        /* fall-through */
-               next_return.status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
-               break;
+       return stream_iter_status;
+}
+
+static inline
+void put_messages(bt_message_array_const msgs, uint64_t count)
+{
+       uint64_t i;
+
+       for (i = 0; i < count; i++) {
+               BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
        }
-       return next_return;
 }
 
 BT_HIDDEN
-bt_message_iterator_next_method_return lttng_live_iterator_next(
-               bt_self_message_iterator *iterator)
+bt_self_message_iterator_status lttng_live_msg_iter_next(
+               bt_self_message_iterator *self_msg_it,
+               bt_message_array_const msgs, uint64_t capacity,
+               uint64_t *count)
 {
-       struct lttng_live_stream_iterator_generic *s =
-                       bt_self_message_iterator_get_user_data(iterator);
-       bt_message_iterator_next_method_return next_return;
-
-       switch (s->type) {
-       case LIVE_STREAM_TYPE_NO_STREAM:
-               next_return = lttng_live_iterator_next_no_stream(iterator,
-                       container_of(s, struct lttng_live_no_stream_iterator, p));
+       bt_self_message_iterator_status status;
+       struct lttng_live_msg_iter *lttng_live_msg_iter =
+               bt_self_message_iterator_get_data(self_msg_it);
+       struct lttng_live_component *lttng_live =
+               lttng_live_msg_iter->lttng_live_comp;
+       enum lttng_live_iterator_status stream_iter_status;
+       uint64_t session_idx;
+
+       *count = 0;
+
+       BT_ASSERT(lttng_live_msg_iter);
+
+       /*
+        * Clear all the invalid message reference that might be left over in
+        * the output array.
+        */
+       memset(msgs, 0, capacity * sizeof(*msgs));
+
+       /*
+        * If no session are exposed on the relay found at the url provided by
+        * the user, session count will be 0. In this case, we return status
+        * end to return gracefully.
+        */
+       if (lttng_live_msg_iter->sessions->len == 0) {
+               if (lttng_live->params.sess_not_found_act !=
+                               SESSION_NOT_FOUND_ACTION_CONTINUE) {
+                       status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
+                       goto no_session;
+               } else {
+                       /*
+                        * The are no more active session for this session
+                        * name. Retry to create a viewer session for the
+                        * requested session name.
+                        */
+                       if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
+                               status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+                               goto no_session;
+                       }
+               }
+       }
+
+       if (lttng_live_msg_iter->active_stream_iter == 0) {
+               lttng_live_force_new_streams_and_metadata(lttng_live_msg_iter);
+       }
+
+       /*
+        * Here the muxing of message is done.
+        *
+        * We need to iterate over all the streams of all the traces of all the
+        * viewer sessions in order to get the message with the smallest
+        * timestamp. In this case, a session is a viewer session and there is
+        * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
+        * kernel). Each viewer session can have multiple traces, for example,
+        * 64bit UST viewer sessions could have multiple per-pid traces.
+        *
+        * We iterate over the streams of each traces to update and see what is
+        * their next message's timestamp. From those timestamps, we select the
+        * message with the smallest timestamp as the best candidate message
+        * for that trace and do the same thing across all the sessions.
+        *
+        * We then compare the timestamp of best candidate message of all the
+        * sessions to pick the message with the smallest timestamp and we
+        * return it.
+        */
+       while (*count < capacity) {
+               struct lttng_live_stream_iterator *next_stream_iter = NULL,
+                                                 *candidate_stream_iter = NULL;
+               int64_t next_msg_ts_ns = INT64_MAX;
+
+               BT_ASSERT(lttng_live_msg_iter->sessions);
+               session_idx = 0;
+               /*
+                * Use a while loop instead of a for loop so we can restart the
+                * iteration if we remove an element. We can safely call
+                * next_stream_iterator_for_session() multiple times on the
+                * same session as we only fetch a new message if there is no
+                * current next message for each live stream iterator.
+                * If all live stream iterator of that session already have a
+                * current next message, the function will simply exit return
+                * the same candidate live stream iterator every time.
+                */
+               while (session_idx < lttng_live_msg_iter->sessions->len) {
+                       struct lttng_live_session *session =
+                               g_ptr_array_index(lttng_live_msg_iter->sessions,
+                                       session_idx);
+
+                       /* Find the best candidate message to send downstream. */
+                       stream_iter_status = next_stream_iterator_for_session(
+                               lttng_live_msg_iter, session,
+                               &candidate_stream_iter);
+
+                       /* If we receive an END status, it means that either:
+                        * - Those traces never had active streams (UST with no
+                        *   data produced yet),
+                        * - All live stream iterators have ENDed.*/
+                       if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
+                               if (session->closed && session->traces->len == 0) {
+                                       /*
+                                        * Remove the session from the list and restart the
+                                        * iteration at the beginning of the array since the
+                                        * removal shuffle the elements of the array.
+                                        */
+                                       g_ptr_array_remove_index_fast(
+                                               lttng_live_msg_iter->sessions,
+                                               session_idx);
+                                       session_idx = 0;
+                               } else {
+                                       session_idx++;
+                               }
+                               continue;
+                       }
+
+                       if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
+                               goto end;
+                       }
+
+                       if (candidate_stream_iter->current_msg_ts_ns <= next_msg_ts_ns) {
+                               next_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
+                               next_stream_iter = candidate_stream_iter;
+                       }
+
+                       session_idx++;
+               }
+
+               if (!next_stream_iter) {
+                       stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+                       goto end;
+               }
+
+               BT_ASSERT(next_stream_iter->current_msg);
+               /* Ensure monotonicity. */
+               BT_ASSERT(lttng_live_msg_iter->last_msg_ts_ns <=
+                       next_stream_iter->current_msg_ts_ns);
+
+               /*
+                * Insert the next message to the message batch. This will set
+                * stream iterator current messsage to NULL so that next time
+                * we fetch the next message of that stream iterator
+                */
+               BT_MESSAGE_MOVE_REF(msgs[*count], next_stream_iter->current_msg);
+               (*count)++;
+
+               /* Update the last timestamp in nanoseconds sent downstream. */
+               lttng_live_msg_iter->last_msg_ts_ns = next_msg_ts_ns;
+               next_stream_iter->current_msg_ts_ns = INT64_MAX;
+
+               stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
+       }
+end:
+       switch (stream_iter_status) {
+       case LTTNG_LIVE_ITERATOR_STATUS_OK:
+       case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
+               if (*count > 0) {
+                       /*
+                        * We received a again status but we have some messages
+                        * to send downstream. We send them and return OK for
+                        * now. On the next call we return again if there are
+                        * still no new message to send.
+                        */
+                       status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
+               } else {
+                       status = BT_SELF_MESSAGE_ITERATOR_STATUS_AGAIN;
+               }
                break;
-       case LIVE_STREAM_TYPE_STREAM:
-               next_return = lttng_live_iterator_next_stream(iterator,
-                       container_of(s, struct lttng_live_stream_iterator, p));
+       case LTTNG_LIVE_ITERATOR_STATUS_END:
+               status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
                break;
-       default:
-               next_return.status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
+       case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
+               status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
+               break;
+       case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
+       case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
+       case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
+               status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+               /* Put all existing messages on error. */
+               put_messages(msgs, *count);
                break;
+       default:
+               abort();
        }
-       return next_return;
+
+no_session:
+       return status;
 }
 
 BT_HIDDEN
-bt_message_iterator_status lttng_live_iterator_init(
-               bt_self_message_iterator *it,
-               struct bt_private_port *port)
+bt_self_message_iterator_status lttng_live_msg_iter_init(
+               bt_self_message_iterator *self_msg_it,
+               bt_self_component_source *self_comp_src,
+               bt_self_component_port_output *self_port)
 {
-       bt_message_iterator_status ret =
-                       BT_MESSAGE_ITERATOR_STATUS_OK;
-       struct lttng_live_stream_iterator_generic *s;
-
-       BT_ASSERT(it);
-
-       s = bt_private_port_get_user_data(port);
-       BT_ASSERT(s);
-       switch (s->type) {
-       case LIVE_STREAM_TYPE_NO_STREAM:
-       {
-               struct lttng_live_no_stream_iterator *no_stream_iter =
-                       container_of(s, struct lttng_live_no_stream_iterator, p);
-               ret = bt_self_message_iterator_set_user_data(it, no_stream_iter);
-               if (ret) {
-                       goto error;
-               }
-               break;
+       bt_self_message_iterator_status ret =
+                       BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
+       bt_self_component *self_comp =
+                       bt_self_component_source_as_self_component(self_comp_src);
+       struct lttng_live_component *lttng_live;
+       struct lttng_live_msg_iter *lttng_live_msg_iter;
+
+       BT_ASSERT(self_msg_it);
+
+       lttng_live = bt_self_component_get_data(self_comp);
+
+       /* There can be only one downstream iterator at the same time. */
+       BT_ASSERT(!lttng_live->has_msg_iter);
+       lttng_live->has_msg_iter = true;
+
+       lttng_live_msg_iter = g_new0(struct lttng_live_msg_iter, 1);
+       if (!lttng_live_msg_iter) {
+               ret = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+               goto end;
        }
-       case LIVE_STREAM_TYPE_STREAM:
-       {
-               struct lttng_live_stream_iterator *stream_iter =
-                       container_of(s, struct lttng_live_stream_iterator, p);
-               ret = bt_self_message_iterator_set_user_data(it, stream_iter);
-               if (ret) {
+
+       lttng_live_msg_iter->lttng_live_comp = lttng_live;
+       lttng_live_msg_iter->self_msg_iter = self_msg_it;
+
+       lttng_live_msg_iter->active_stream_iter = 0;
+       lttng_live_msg_iter->last_msg_ts_ns = INT64_MIN;
+       lttng_live_msg_iter->sessions = g_ptr_array_new_with_free_func(
+               (GDestroyNotify) lttng_live_destroy_session);
+       BT_ASSERT(lttng_live_msg_iter->sessions);
+
+       lttng_live_msg_iter->viewer_connection =
+               live_viewer_connection_create(lttng_live->params.url->str, false,
+                       lttng_live_msg_iter);
+       if (!lttng_live_msg_iter->viewer_connection) {
+               goto error;
+       }
+
+       if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
+               goto error;
+       }
+       if (lttng_live_msg_iter->sessions->len == 0) {
+               switch (lttng_live->params.sess_not_found_act) {
+               case SESSION_NOT_FOUND_ACTION_CONTINUE:
+                       BT_LOGI("Unable to connect to the requested live viewer "
+                               "session. Keep trying to connect because of "
+                               "%s=\"%s\" component parameter: url=\"%s\"",
+                               SESS_NOT_FOUND_ACTION_PARAM,
+                               SESS_NOT_FOUND_ACTION_CONTINUE_STR,
+                               lttng_live->params.url->str);
+                       break;
+               case SESSION_NOT_FOUND_ACTION_FAIL:
+                       BT_LOGE("Unable to connect to the requested live viewer "
+                               "session. Fail the message iterator"
+                               "initialization because of %s=\"%s\" "
+                               "component parameter: url =\"%s\"",
+                               SESS_NOT_FOUND_ACTION_PARAM,
+                               SESS_NOT_FOUND_ACTION_FAIL_STR,
+                               lttng_live->params.url->str);
                        goto error;
+               case SESSION_NOT_FOUND_ACTION_END:
+                       BT_LOGI("Unable to connect to the requested live viewer "
+                               "session. End gracefully at the first _next() "
+                               "call because of %s=\"%s\" component parameter: "
+                               "url=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM,
+                               SESS_NOT_FOUND_ACTION_END_STR,
+                               lttng_live->params.url->str);
+                       break;
                }
-               break;
-       }
-       default:
-               ret = BT_MESSAGE_ITERATOR_STATUS_ERROR;
-               goto end;
        }
 
+       bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter);
+
+       goto end;
+error:
+       ret = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+       lttng_live_msg_iter_destroy(lttng_live_msg_iter);
 end:
        return ret;
-error:
-       if (bt_self_message_iterator_set_user_data(it, NULL)
-                       != BT_MESSAGE_ITERATOR_STATUS_OK) {
-               BT_LOGE("Error setting private data to NULL");
-       }
-       goto end;
 }
 
 static
-bt_component_class_query_method_return lttng_live_query_list_sessions(
-               const bt_component_class *comp_class,
-               const bt_query_executor *query_exec,
-               bt_value *params)
+bt_query_status lttng_live_query_list_sessions(const bt_value *params,
+               const bt_value **result)
 {
-       bt_component_class_query_method_return query_ret = {
-               .result = NULL,
-               .status = BT_QUERY_STATUS_OK,
-       };
-
-       bt_value *url_value = NULL;
+       bt_query_status status = BT_QUERY_STATUS_OK;
+       const bt_value *url_value = NULL;
        const char *url;
-       struct bt_live_viewer_connection *viewer_connection = NULL;
+       struct live_viewer_connection *viewer_connection = NULL;
 
-       url_value = bt_value_map_get(params, "url");
-       if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) {
-               BT_LOGW("Mandatory \"url\" parameter missing");
-               query_ret.status = BT_QUERY_STATUS_INVALID_PARAMS;
+       url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
+       if (!url_value) {
+               BT_LOGW("Mandatory `%s` parameter missing", URL_PARAM);
+               status = BT_QUERY_STATUS_INVALID_PARAMS;
                goto error;
        }
 
-       if (bt_value_string_get(url_value, &url) != BT_VALUE_STATUS_OK) {
-               BT_LOGW("\"url\" parameter is required to be a string value");
-               query_ret.status = BT_QUERY_STATUS_INVALID_PARAMS;
+       if (!bt_value_is_string(url_value)) {
+               BT_LOGW("`%s` parameter is required to be a string value",
+                       URL_PARAM);
+               status = BT_QUERY_STATUS_INVALID_PARAMS;
                goto error;
        }
 
-       viewer_connection = bt_live_viewer_connection_create(url, NULL);
+       url = bt_value_string_get(url_value);
+
+       viewer_connection = live_viewer_connection_create(url, true, NULL);
        if (!viewer_connection) {
                goto error;
        }
 
-       query_ret.result =
-               bt_live_viewer_connection_list_sessions(viewer_connection);
-       if (!query_ret.result) {
+       status = live_viewer_connection_list_sessions(viewer_connection,
+                       result);
+       if (status != BT_QUERY_STATUS_OK) {
                goto error;
        }
 
        goto end;
 
 error:
-       BT_OBJECT_PUT_REF_AND_RESET(query_ret.result);
+       BT_VALUE_PUT_REF_AND_RESET(*result);
 
-       if (query_ret.status >= 0) {
-               query_ret.status = BT_QUERY_STATUS_ERROR;
+       if (status >= 0) {
+               status = BT_QUERY_STATUS_ERROR;
        }
 
 end:
        if (viewer_connection) {
-               bt_live_viewer_connection_destroy(viewer_connection);
+               live_viewer_connection_destroy(viewer_connection);
        }
-       BT_VALUE_PUT_REF_AND_RESET(url_value);
-       return query_ret;
+       return status;
 }
 
 BT_HIDDEN
-bt_component_class_query_method_return lttng_live_query(
-               const bt_component_class *comp_class,
+bt_query_status lttng_live_query(bt_self_component_class_source *comp_class,
                const bt_query_executor *query_exec,
-               const char *object, bt_value *params)
+               const char *object, const bt_value *params,
+               const bt_value **result)
 {
-       bt_component_class_query_method_return ret = {
-               .result = NULL,
-               .status = BT_QUERY_STATUS_OK,
-       };
+       bt_query_status status = BT_QUERY_STATUS_OK;
 
        if (strcmp(object, "sessions") == 0) {
-               return lttng_live_query_list_sessions(comp_class,
-                       query_exec, params);
+               status = lttng_live_query_list_sessions(params, result);
+       } else {
+               BT_LOGW("Unknown query object `%s`", object);
+               status = BT_QUERY_STATUS_INVALID_OBJECT;
+               goto end;
        }
-       BT_LOGW("Unknown query object `%s`", object);
-       ret.status = BT_QUERY_STATUS_INVALID_OBJECT;
-       return ret;
+
+end:
+       return status;
 }
 
 static
 void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
 {
-       int ret;
-       struct lttng_live_session *session, *s;
-
-       bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
-               lttng_live_destroy_session(session);
-       }
-       BT_OBJECT_PUT_REF_AND_RESET(lttng_live->viewer_connection);
-       if (lttng_live->url) {
-               g_string_free(lttng_live->url, TRUE);
-       }
-       if (lttng_live->no_stream_port) {
-               bt_object_get_ref(lttng_live->no_stream_port);
-               ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
-               bt_object_put_ref(lttng_live->no_stream_port);
-               BT_ASSERT(!ret);
-       }
-       if (lttng_live->no_stream_iter) {
-               g_free(lttng_live->no_stream_iter);
+       if (lttng_live->params.url) {
+               g_string_free(lttng_live->params.url, TRUE);
        }
        g_free(lttng_live);
 }
 
 BT_HIDDEN
-void lttng_live_component_finalize(bt_self_component *component)
+void lttng_live_component_finalize(bt_self_component_source *component)
 {
-       void *data = bt_self_component_get_user_data(component);
+       void *data = bt_self_component_get_data(
+                       bt_self_component_source_as_self_component(component));
 
        if (!data) {
                return;
@@ -1060,41 +1490,71 @@ void lttng_live_component_finalize(bt_self_component *component)
 }
 
 static
-struct lttng_live_component *lttng_live_component_create(bt_value *params,
-               bt_self_component *private_component)
+enum session_not_found_action parse_session_not_found_action_param(
+       const bt_value *no_session_param)
+{
+       enum session_not_found_action action;
+       const char *no_session_act_str;
+       no_session_act_str = bt_value_string_get(no_session_param);
+       if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_CONTINUE_STR) == 0) {
+               action = SESSION_NOT_FOUND_ACTION_CONTINUE;
+       } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_FAIL_STR) == 0) {
+               action = SESSION_NOT_FOUND_ACTION_FAIL;
+       } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0) {
+               action = SESSION_NOT_FOUND_ACTION_END;
+       } else {
+               action = -1;
+       }
+
+       return action;
+}
+
+struct lttng_live_component *lttng_live_component_create(const bt_value *params)
 {
        struct lttng_live_component *lttng_live;
-       bt_value *value = NULL;
+       const bt_value *value = NULL;
        const char *url;
-       bt_value_status ret;
 
        lttng_live = g_new0(struct lttng_live_component, 1);
        if (!lttng_live) {
                goto end;
        }
-       /* TODO: make this an overridable parameter. */
        lttng_live->max_query_size = MAX_QUERY_SIZE;
-       BT_INIT_LIST_HEAD(&lttng_live->sessions);
-       value = bt_value_map_get(params, "url");
-       if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) {
-               BT_LOGW("Mandatory \"url\" parameter missing");
+       lttng_live->has_msg_iter = false;
+
+       value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
+       if (!value || !bt_value_is_string(value)) {
+               BT_LOGW("Mandatory `%s` parameter missing or not a string",
+                       URL_PARAM);
                goto error;
        }
        url = bt_value_string_get(value);
-       lttng_live->url = g_string_new(url);
-       if (!lttng_live->url) {
-               goto error;
-       }
-       BT_VALUE_PUT_REF_AND_RESET(value);
-       lttng_live->viewer_connection =
-               bt_live_viewer_connection_create(lttng_live->url->str, lttng_live);
-       if (!lttng_live->viewer_connection) {
+       lttng_live->params.url = g_string_new(url);
+       if (!lttng_live->params.url) {
                goto error;
        }
-       if (lttng_live_create_viewer_session(lttng_live)) {
-               goto error;
+
+       value = bt_value_map_borrow_entry_value_const(params,
+               SESS_NOT_FOUND_ACTION_PARAM);
+
+       if (value && bt_value_is_string(value)) {
+               lttng_live->params.sess_not_found_act =
+                       parse_session_not_found_action_param(value);
+               if (lttng_live->params.sess_not_found_act == -1) {
+                       BT_LOGE("Unexpected value for `%s` parameter: "
+                               "value=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM,
+                               bt_value_string_get(value));
+                       goto error;
+               }
+       } else {
+               BT_LOGW("Optional `%s` parameter is missing or "
+                       "not a string value. Defaulting to %s=\"%s\".",
+                       SESS_NOT_FOUND_ACTION_PARAM,
+                       SESS_NOT_FOUND_ACTION_PARAM,
+                       SESS_NOT_FOUND_ACTION_CONTINUE_STR);
+               lttng_live->params.sess_not_found_act =
+                       SESSION_NOT_FOUND_ACTION_CONTINUE;
        }
-       lttng_live->private_component = private_component;
 
        goto end;
 
@@ -1106,84 +1566,35 @@ end:
 }
 
 BT_HIDDEN
-bt_component_status lttng_live_component_init(
-               bt_self_component *private_component,
-               bt_value *params, void *init_method_data)
+bt_self_component_status lttng_live_component_init(
+               bt_self_component_source *self_comp,
+               const bt_value *params, UNUSED_VAR void *init_method_data)
 {
        struct lttng_live_component *lttng_live;
-       bt_component_status ret = BT_COMPONENT_STATUS_OK;
+       bt_self_component_status ret = BT_SELF_COMPONENT_STATUS_OK;
 
-       /* Passes ownership of iter ref to lttng_live_component_create. */
-       lttng_live = lttng_live_component_create(params, private_component);
+       lttng_live = lttng_live_component_create(params);
        if (!lttng_live) {
-               //TODO : we need access to the application cancel state
-               //because we are not part of a graph yet.
-               ret = BT_COMPONENT_STATUS_NOMEM;
+               ret = BT_SELF_COMPONENT_STATUS_NOMEM;
                goto end;
        }
+       lttng_live->self_comp = self_comp;
 
-       lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1);
-       lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM;
-       lttng_live->no_stream_iter->lttng_live = lttng_live;
        if (lttng_live_is_canceled(lttng_live)) {
                goto end;
        }
+
        ret = bt_self_component_source_add_output_port(
-                               lttng_live->private_component, "no-stream",
-                               lttng_live->no_stream_iter,
-                               &lttng_live->no_stream_port);
-       if (ret != BT_COMPONENT_STATUS_OK) {
+                               lttng_live->self_comp, "out",
+                               NULL, NULL);
+       if (ret != BT_SELF_COMPONENT_STATUS_OK) {
                goto end;
        }
-       bt_object_put_ref(lttng_live->no_stream_port);  /* weak */
-       lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
 
-       ret = bt_self_component_set_user_data(private_component, lttng_live);
-       if (ret != BT_COMPONENT_STATUS_OK) {
-               goto error;
-       }
+       bt_self_component_set_data(
+                       bt_self_component_source_as_self_component(self_comp),
+                       lttng_live);
 
 end:
        return ret;
-error:
-       (void) bt_self_component_set_user_data(private_component, NULL);
-       lttng_live_component_destroy_data(lttng_live);
-       return ret;
-}
-
-BT_HIDDEN
-bt_component_status lttng_live_accept_port_connection(
-               bt_self_component *private_component,
-               struct bt_private_port *self_private_port,
-               const bt_port *other_port)
-{
-       struct lttng_live_component *lttng_live =
-                       bt_self_component_get_user_data(private_component);
-       bt_component *other_component;
-       bt_component_status status = BT_COMPONENT_STATUS_OK;
-       const bt_port *self_port = bt_port_from_private(self_private_port);
-
-       other_component = bt_port_get_component(other_port);
-       bt_component_put_ref(other_component);  /* weak */
-
-       if (!lttng_live->downstream_component) {
-               lttng_live->downstream_component = other_component;
-               goto end;
-       }
-
-       /*
-        * Compare prior component to ensure we are connected to the
-        * same downstream component as prior ports.
-        */
-       if (lttng_live->downstream_component != other_component) {
-               BT_LOGW("Cannot connect ctf.lttng-live component port \"%s\" to component \"%s\": already connected to component \"%s\".",
-                       bt_port_get_name(self_port),
-                       bt_component_get_name(other_component),
-                       bt_component_get_name(lttng_live->downstream_component));
-               status = BT_COMPONENT_STATUS_REFUSE_PORT_CONNECTION;
-               goto end;
-       }
-end:
-       bt_port_put_ref(self_port);
-       return status;
 }
This page took 0.047025 seconds and 4 git commands to generate.