Adapt `src.ctf.lttng-live` to current API
authorFrancis Deslauriers <francis.deslauriers@efficios.com>
Mon, 4 Mar 2019 16:02:10 +0000 (11:02 -0500)
committerPhilippe Proulx <eeppeliteloop@gmail.com>
Thu, 2 May 2019 04:12:55 +0000 (00:12 -0400)
A `src.ctf.lttng-live` component now ensures the monotonic ordering of
the timestamps its outgoing messages. The component can receive messages
from multiples viewer session (UST 64bit, UST 32bit and/or Kernel) that
in turn may contain one or more traces that in turn may contain one or
more streams of messages. Messages from all these streams have to be
ordered into a single flow of messages where every message has a
timestamp equal or larger than the previous one.. This muxing is done by
tracking the next message of each of the live stream iterator and
returning the message with the smallest timestamp.

The live viewer sessions and everything required to get data and
metadata from the LTTng Relay daemon are now owned by the message
iterator and not the component as it was before.

Parameters
==========
The `url` mandatory string parameter controls the URL of the LTTng Relay
Daemon to which the component should register to received traces.

The `session-not-found-action` optional string parameter controls the
behavior of the component in case the requested session is not
configured on the LTTng Relay Daemon. The three accepted value of this
parameter are: "continue", "fail", and "end".

  continue: The message iterator will try to connect (or reconnect) to
  the requested session for ever.

  fail: The message iterator will return an ERROR status if the session
  is not available at the message iterator initialization stage and
  return END status when the session is no longer available.

  end: The message iterator always returns END status if the session is
  not found or is no longer available.

The `session-not-found-action` defaults to "continue" if not specified.

Future improvement
==================
Using a priority queue to order the messages would speed up the muxing
of messages.

Known limitation
================
If a new live stream iterator returns a message that has a timestamp
smaller (that happened before) than the last message forwarded
downstream but the `src.ctf.lttng-live` message iterator. The message
iterator will return with an error status. I am not sure if this
situation can happen in a single host configuration (which is the only
supported configuration).

Signed-off-by: Francis Deslauriers <francis.deslauriers@efficios.com>
12 files changed:
plugins/ctf/Makefile.am
plugins/ctf/lttng-live/Makefile.am
plugins/ctf/lttng-live/data-stream.c
plugins/ctf/lttng-live/data-stream.h
plugins/ctf/lttng-live/lttng-live-internal.h [deleted file]
plugins/ctf/lttng-live/lttng-live.c
plugins/ctf/lttng-live/lttng-live.h [new file with mode: 0644]
plugins/ctf/lttng-live/metadata.c
plugins/ctf/lttng-live/metadata.h
plugins/ctf/lttng-live/viewer-connection.c
plugins/ctf/lttng-live/viewer-connection.h
plugins/ctf/plugin.c

index de407fdae7bd09f8a7de7deeb9a8ac9980a8b194..3577090dbc9c56f3b587f01f2f64aab096e0f299 100644 (file)
@@ -1,5 +1,7 @@
-SUBDIRS = common fs-src fs-sink
-# lttng-live
+SUBDIRS = common \
+       fs-src \
+       fs-sink \
+       lttng-live
 
 noinst_HEADERS = print.h
 
@@ -14,9 +16,10 @@ babeltrace_plugin_ctf_la_LDFLAGS = \
        -avoid-version -module
 
 babeltrace_plugin_ctf_la_LIBADD = \
-       fs-src/libbabeltrace-plugin-ctf-fs-src.la \
        common/libbabeltrace-plugin-ctf-common.la \
-       fs-sink/libbabeltrace-plugin-ctf-fs-sink.la
+       fs-sink/libbabeltrace-plugin-ctf-fs-sink.la \
+       fs-src/libbabeltrace-plugin-ctf-fs-src.la \
+       lttng-live/libbabeltrace-plugin-ctf-lttng-live.la
 
 if !ENABLE_BUILT_IN_PLUGINS
 babeltrace_plugin_ctf_la_LIBADD += \
index 3b40353e8fc39cb1d66fe7e2157e36a8c1083c69..f69bb8e296c028da7fd0aa9f9ab07afb66004db4 100644 (file)
@@ -1,11 +1,17 @@
 AM_CPPFLAGS += -I$(top_srcdir)/plugins
 
-libbabeltrace_plugin_ctf_lttng_live_la_SOURCES = lttng-live.c \
-               data-stream.c lttng-live-internal.h \
-               data-stream.h metadata.c metadata.h \
-               viewer-connection.c viewer-connection.h \
+libbabeltrace_plugin_ctf_lttng_live_la_SOURCES = \
+               lttng-live.c \
+               lttng-live.h \
+               data-stream.c \
+               data-stream.h \
+               metadata.c \
+               metadata.h \
+               viewer-connection.c \
+               viewer-connection.h \
                lttng-viewer-abi.h \
-               logging.c logging.h
+               logging.c \
+               logging.h
 
 if BABELTRACE_BUILD_WITH_MINGW
 libbabeltrace_plugin_ctf_lttng_live_la_LIBADD = -lws2_32
index fde80b64c208b15f8d4f1bc552f1e3e0d5057c06..9d093cb58b7c8940b1bde0815a2c8ac56e0c7563 100644 (file)
@@ -1,4 +1,5 @@
 /*
+ * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
  * Copyright 2016 - Philippe Proulx <pproulx@efficios.com>
  * Copyright 2016 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
  * Copyright 2010-2011 - EfficiOS Inc. and Linux Foundation
@@ -38,6 +39,8 @@
 
 #include "data-stream.h"
 
+#define STREAM_NAME_PREFIX     "stream-"
+
 static
 enum bt_msg_iter_medium_status medop_request_bytes(
                size_t request_sz, uint8_t **buffer_addr,
@@ -48,11 +51,10 @@ enum bt_msg_iter_medium_status medop_request_bytes(
        struct lttng_live_stream_iterator *stream = data;
        struct lttng_live_trace *trace = stream->trace;
        struct lttng_live_session *session = trace->session;
-       struct lttng_live_component *lttng_live = session->lttng_live;
+       struct lttng_live_msg_iter *live_msg_iter = session->lttng_live_msg_iter;
        uint64_t recv_len = 0;
        uint64_t len_left;
        uint64_t read_len;
-       //int i;
 
        len_left = stream->base_offset + stream->len - stream->offset;
        if (!len_left) {
@@ -62,7 +64,7 @@ enum bt_msg_iter_medium_status medop_request_bytes(
        }
        read_len = MIN(request_sz, stream->buflen);
        read_len = MIN(read_len, len_left);
-       status = lttng_live_get_stream_bytes(lttng_live,
+       status = lttng_live_get_stream_bytes(live_msg_iter,
                        stream, stream->buf, stream->offset,
                        read_len, &recv_len);
        *buffer_addr = stream->buf;
@@ -72,34 +74,42 @@ enum bt_msg_iter_medium_status medop_request_bytes(
 }
 
 static
-const bt_stream *medop_get_stream(
-               const bt_stream_class *stream_class,
-               uint64_t stream_id, void *data)
+bt_stream *medop_borrow_stream(bt_stream_class *stream_class,
+               int64_t stream_id, void *data)
 {
        struct lttng_live_stream_iterator *lttng_live_stream = data;
 
        if (!lttng_live_stream->stream) {
-               int64_t stream_class_id =
-                       bt_stream_class_get_id(stream_class);
+               uint64_t stream_class_id = bt_stream_class_get_id(stream_class);
 
-               BT_LOGD("Creating stream %s (ID: %" PRIu64 ") out of stream class %" PRId64,
-                       lttng_live_stream->name, stream_id, stream_class_id);
+               BT_LOGD("Creating stream %s (ID: %" PRIu64 ") out of stream "
+                       "class %" PRId64, lttng_live_stream->name->str,
+                       stream_id, stream_class_id);
 
-               if (stream_id == -1ULL) {
-                       /* No stream ID */
-                       lttng_live_stream->stream = bt_stream_create(
-                               stream_class, lttng_live_stream->name);
+               if (stream_id < 0) {
+                       /*
+                        * No stream instance ID in the stream. It's possible
+                        * to encounter this situation with older version of
+                        * LTTng. In these cases, use the viewer_stream_id that
+                        * is unique for a live viewer session.
+                        */
+                       lttng_live_stream->stream = bt_stream_create_with_id(
+                               stream_class, lttng_live_stream->trace->trace,
+                               lttng_live_stream->viewer_stream_id);
                } else {
-                       lttng_live_stream->stream =
-                               bt_stream_create_with_id(stream_class,
-                                       lttng_live_stream->name, stream_id);
+                       lttng_live_stream->stream = bt_stream_create_with_id(
+                               stream_class, lttng_live_stream->trace->trace,
+                               (uint64_t) stream_id);
                }
 
                if (!lttng_live_stream->stream) {
-                       BT_LOGE("Cannot create stream %s (stream class %" PRId64 ", stream ID %" PRIu64 ")",
-                                       lttng_live_stream->name,
-                                       stream_class_id, stream_id);
+                       BT_LOGE("Cannot create stream %s (stream class ID "
+                               "%" PRId64 ", stream ID %" PRIu64 ")",
+                               lttng_live_stream->name->str,
+                               stream_class_id, stream_id);
                }
+               bt_stream_set_name(lttng_live_stream->stream,
+                       lttng_live_stream->name->str);
        }
 
        return lttng_live_stream->stream;
@@ -107,31 +117,43 @@ const bt_stream *medop_get_stream(
 
 static struct bt_msg_iter_medium_ops medops = {
        .request_bytes = medop_request_bytes,
-       .get_stream = medop_get_stream,
+       .seek = NULL,
+       .borrow_stream = medop_borrow_stream,
 };
 
 BT_HIDDEN
-bt_lttng_live_iterator_status lttng_live_lazy_msg_init(
+enum lttng_live_iterator_status lttng_live_lazy_msg_init(
                struct lttng_live_session *session)
 {
-       struct lttng_live_component *lttng_live = session->lttng_live;
-       struct lttng_live_trace *trace;
+       struct lttng_live_component *lttng_live =
+               session->lttng_live_msg_iter->lttng_live_comp;
+       uint64_t trace_idx, stream_iter_idx;
 
        if (!session->lazy_stream_msg_init) {
-               return BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+               return LTTNG_LIVE_ITERATOR_STATUS_OK;
        }
 
-       bt_list_for_each_entry(trace, &session->traces, node) {
-               struct lttng_live_stream_iterator *stream;
+       for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
+               struct lttng_live_trace *trace =
+                       g_ptr_array_index(session->traces, trace_idx);
+
+               for (stream_iter_idx = 0;
+                               stream_iter_idx < trace->stream_iterators->len;
+                               stream_iter_idx++) {
+                       struct ctf_trace_class *ctf_tc;
+                       struct lttng_live_stream_iterator *stream_iter =
+                               g_ptr_array_index(trace->stream_iterators,
+                                               stream_iter_idx);
 
-               bt_list_for_each_entry(stream, &trace->streams, node) {
-                       if (stream->msg_iter) {
+                       if (stream_iter->msg_iter) {
                                continue;
                        }
-                       stream->msg_iter = bt_msg_iter_create(trace->trace,
+                       ctf_tc = ctf_metadata_decoder_borrow_ctf_trace_class(
+                                               trace->metadata->decoder);
+                       stream_iter->msg_iter = bt_msg_iter_create(ctf_tc,
                                        lttng_live->max_query_size, medops,
-                                       stream);
-                       if (!stream->msg_iter) {
+                                       stream_iter);
+                       if (!stream_iter->msg_iter) {
                                goto error;
                        }
                }
@@ -139,10 +161,10 @@ bt_lttng_live_iterator_status lttng_live_lazy_msg_init(
 
        session->lazy_stream_msg_init = false;
 
-       return BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+       return LTTNG_LIVE_ITERATOR_STATUS_OK;
 
 error:
-       return BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+       return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
 }
 
 BT_HIDDEN
@@ -151,78 +173,101 @@ struct lttng_live_stream_iterator *lttng_live_stream_iterator_create(
                uint64_t ctf_trace_id,
                uint64_t stream_id)
 {
-       struct lttng_live_component *lttng_live = session->lttng_live;
-       struct lttng_live_stream_iterator *stream =
-                       g_new0(struct lttng_live_stream_iterator, 1);
+       struct lttng_live_stream_iterator *stream_iter;
+       struct lttng_live_component *lttng_live;
        struct lttng_live_trace *trace;
-       int ret;
 
-       trace = lttng_live_ref_trace(session, ctf_trace_id);
+       BT_ASSERT(session);
+       BT_ASSERT(session->lttng_live_msg_iter);
+       BT_ASSERT(session->lttng_live_msg_iter->lttng_live_comp);
+
+       lttng_live = session->lttng_live_msg_iter->lttng_live_comp;
+
+       stream_iter = g_new0(struct lttng_live_stream_iterator, 1);
+       if (!stream_iter) {
+               goto error;
+       }
+
+       trace = lttng_live_borrow_trace(session, ctf_trace_id);
        if (!trace) {
                goto error;
        }
 
-       stream->p.type = LIVE_STREAM_TYPE_STREAM;
-       stream->trace = trace;
-       stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
-       stream->viewer_stream_id = stream_id;
-       stream->ctf_stream_class_id = -1ULL;
-       stream->last_returned_inactivity_timestamp = INT64_MIN;
+       stream_iter->trace = trace;
+       stream_iter->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
+       stream_iter->viewer_stream_id = stream_id;
+       stream_iter->ctf_stream_class_id = -1ULL;
+       stream_iter->last_inactivity_ts = INT64_MIN;
 
        if (trace->trace) {
-               stream->msg_iter = bt_msg_iter_create(trace->trace,
+               struct ctf_trace_class *ctf_tc =
+                       ctf_metadata_decoder_borrow_ctf_trace_class(
+                                       trace->metadata->decoder);
+               BT_ASSERT(!stream_iter->msg_iter);
+               stream_iter->msg_iter = bt_msg_iter_create(ctf_tc,
                                lttng_live->max_query_size, medops,
-                               stream);
-               if (!stream->msg_iter) {
+                               stream_iter);
+               if (!stream_iter->msg_iter) {
                        goto error;
                }
        }
-       stream->buf = g_new0(uint8_t, session->lttng_live->max_query_size);
-       stream->buflen = session->lttng_live->max_query_size;
+       stream_iter->buf = g_new0(uint8_t, lttng_live->max_query_size);
+       if (!stream_iter->buf) {
+               goto error;
+       }
 
-       ret = lttng_live_add_port(lttng_live, stream);
-       BT_ASSERT(!ret);
+       stream_iter->buflen = lttng_live->max_query_size;
+       stream_iter->name = g_string_new(NULL);
+       if (!stream_iter->name) {
+               goto error;
+       }
+
+       g_string_printf(stream_iter->name, STREAM_NAME_PREFIX "%" PRIu64,
+                       stream_iter->viewer_stream_id);
+       g_ptr_array_add(trace->stream_iterators, stream_iter);
 
-       bt_list_add(&stream->node, &trace->streams);
+       /* Track the number of active stream iterator. */
+       session->lttng_live_msg_iter->active_stream_iter++;
 
        goto end;
 error:
-       /* Do not touch "borrowed" file. */
-       lttng_live_stream_iterator_destroy(stream);
-       stream = NULL;
+       lttng_live_stream_iterator_destroy(stream_iter);
+       stream_iter = NULL;
 end:
-       return stream;
+       return stream_iter;
 }
 
 BT_HIDDEN
-void lttng_live_stream_iterator_destroy(struct lttng_live_stream_iterator *stream)
+void lttng_live_stream_iterator_destroy(
+               struct lttng_live_stream_iterator *stream_iter)
 {
-       struct lttng_live_component *lttng_live;
-       int ret;
-
-       if (!stream) {
+       if (!stream_iter) {
                return;
        }
 
-       lttng_live = stream->trace->session->lttng_live;
-       ret = lttng_live_remove_port(lttng_live, stream->port);
-       BT_ASSERT(!ret);
-
-       if (stream->stream) {
-               BT_OBJECT_PUT_REF_AND_RESET(stream->stream);
+       if (stream_iter->stream) {
+               BT_STREAM_PUT_REF_AND_RESET(stream_iter->stream);
        }
 
-       if (stream->msg_iter) {
-               bt_msg_iter_destroy(stream->msg_iter);
+       if (stream_iter->msg_iter) {
+               bt_msg_iter_destroy(stream_iter->msg_iter);
        }
-       g_free(stream->buf);
-       BT_OBJECT_PUT_REF_AND_RESET(stream->packet_end_msg_queue);
-       bt_list_del(&stream->node);
+       if (stream_iter->buf) {
+               g_free(stream_iter->buf);
+       }
+       if (stream_iter->name) {
+               g_string_free(stream_iter->name, TRUE);
+       }
+
+       bt_message_put_ref(stream_iter->current_msg);
+
+       /* Track the number of active stream iterator. */
+       stream_iter->trace->session->lttng_live_msg_iter->active_stream_iter--;
+
        /*
         * Ensure we poke the trace metadata in the future, which is
         * required to release the metadata reference on the trace.
         */
-       stream->trace->new_metadata_needed = true;
-       lttng_live_unref_trace(stream->trace);
-       g_free(stream);
+       stream_iter->trace->new_metadata_needed = true;
+       g_free(stream_iter);
 }
index 5c219c32d8a8bb393cf79e2a6468a56482999aca..4f60cf12f16bf5714655202e5807b6cdc919cb61 100644 (file)
 #include <babeltrace/babeltrace-internal.h>
 #include <babeltrace/babeltrace.h>
 
-#include "lttng-live-internal.h"
+#include "lttng-live.h"
 #include "../common/msg-iter/msg-iter.h"
 
-bt_lttng_live_iterator_status lttng_live_lazy_msg_init(
+enum lttng_live_iterator_status lttng_live_lazy_msg_init(
                struct lttng_live_session *session);
 
 struct lttng_live_stream_iterator *lttng_live_stream_iterator_create(
@@ -39,6 +39,7 @@ struct lttng_live_stream_iterator *lttng_live_stream_iterator_create(
                uint64_t ctf_trace_id,
                uint64_t stream_id);
 
-void lttng_live_stream_iterator_destroy(struct lttng_live_stream_iterator *stream);
+void lttng_live_stream_iterator_destroy(
+               struct lttng_live_stream_iterator *stream);
 
 #endif /* LTTNG_LIVE_DATA_STREAM_H */
diff --git a/plugins/ctf/lttng-live/lttng-live-internal.h b/plugins/ctf/lttng-live/lttng-live-internal.h
deleted file mode 100644 (file)
index fe35314..0000000
+++ /dev/null
@@ -1,270 +0,0 @@
-#ifndef BABELTRACE_PLUGIN_CTF_LTTNG_LIVE_INTERNAL_H
-#define BABELTRACE_PLUGIN_CTF_LTTNG_LIVE_INTERNAL_H
-
-/*
- * BabelTrace - LTTng-live client Component
- *
- * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
- * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- *
- * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-#include <stdbool.h>
-
-#include <babeltrace/babeltrace-internal.h>
-#include <babeltrace/babeltrace.h>
-#include "viewer-connection.h"
-
-//TODO: this should not be used by plugins. Should copy code into plugin
-//instead.
-#include "babeltrace/object-internal.h"
-#include "babeltrace/list-internal.h"
-#include "../common/metadata/decoder.h"
-
-#define STREAM_NAME_PREFIX     "stream-"
-/* Account for u64 max string length. */
-#define U64_STR_MAX_LEN                20
-#define STREAM_NAME_MAX_LEN    (sizeof(STREAM_NAME_PREFIX) + U64_STR_MAX_LEN)
-
-struct lttng_live_component;
-struct lttng_live_session;
-
-enum lttng_live_stream_state {
-       LTTNG_LIVE_STREAM_ACTIVE_NO_DATA,
-       LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA,
-       LTTNG_LIVE_STREAM_QUIESCENT,
-       LTTNG_LIVE_STREAM_ACTIVE_DATA,
-       LTTNG_LIVE_STREAM_EOF,
-};
-
-enum live_stream_type {
-       LIVE_STREAM_TYPE_NO_STREAM,
-       LIVE_STREAM_TYPE_STREAM,
-};
-
-struct lttng_live_stream_iterator_generic {
-       enum live_stream_type type;
-};
-
-/* Iterator over a live stream. */
-struct lttng_live_stream_iterator {
-       struct lttng_live_stream_iterator_generic p;
-
-       const bt_stream *stream;
-       struct lttng_live_trace *trace;
-       struct bt_private_port *port;   /* weak ref. */
-
-       /* Node of stream list within the trace. */
-       struct bt_list_head node;
-
-       /*
-        * Since only a single iterator per viewer connection, we have
-        * only a single message iterator per stream.
-        */
-       struct bt_msg_iter *msg_iter;
-
-       uint64_t viewer_stream_id;
-
-       uint64_t ctf_stream_class_id;
-       uint64_t base_offset;           /* base offset in current index. */
-       uint64_t len;                   /* len to read in current index. */
-       uint64_t offset;                /* offset in current index. */
-
-       int64_t last_returned_inactivity_timestamp;
-       int64_t current_inactivity_timestamp;
-
-       enum lttng_live_stream_state state;
-
-       uint64_t current_packet_end_timestamp;
-       const bt_message *packet_end_msg_queue;
-
-       uint8_t *buf;
-       size_t buflen;
-
-       char name[STREAM_NAME_MAX_LEN];
-};
-
-struct lttng_live_no_stream_iterator {
-       struct lttng_live_stream_iterator_generic p;
-
-       struct lttng_live_component *lttng_live;
-       struct bt_private_port *port;   /* weak ref. */
-};
-
-struct lttng_live_component_options {
-       bool opt_dummy : 1;
-};
-
-struct lttng_live_metadata {
-       struct lttng_live_trace *trace;
-       uint64_t stream_id;
-       uint8_t uuid[16];
-       bool is_uuid_set;
-       int bo;
-       char *text;
-
-       struct ctf_metadata_decoder *decoder;
-
-       bool closed;
-};
-
-struct lttng_live_trace {
-       bt_object obj;
-
-       /* Node of trace list within the session. */
-       struct bt_list_head node;
-
-       /* Back reference to session. */
-       struct lttng_live_session *session;
-
-       uint64_t id;    /* ctf trace ID within the session. */
-
-       const bt_trace *trace;
-
-       struct lttng_live_metadata *metadata;
-       bt_clock_class_priority_map *cc_prio_map;
-
-       /* List of struct lttng_live_stream_iterator */
-       struct bt_list_head streams;
-
-       bool new_metadata_needed;
-};
-
-struct lttng_live_session {
-       /* Node of session list within the component. */
-       struct bt_list_head node;
-
-       struct lttng_live_component *lttng_live;
-
-       GString *hostname;
-       GString *session_name;
-
-       uint64_t id;
-
-       /* List of struct lttng_live_trace */
-       struct bt_list_head traces;
-
-       bool attached;
-       bool new_streams_needed;
-       bool lazy_stream_msg_init;
-       bool closed;
-};
-
-/*
- * A component instance is an iterator on a single session.
- */
-struct lttng_live_component {
-       bt_object obj;
-       bt_self_component *private_component;   /* weak */
-       struct bt_live_viewer_connection *viewer_connection;
-
-       /* List of struct lttng_live_session */
-       struct bt_list_head sessions;
-
-       GString *url;
-       size_t max_query_size;
-       struct lttng_live_component_options options;
-
-       struct bt_private_port *no_stream_port; /* weak */
-       struct lttng_live_no_stream_iterator *no_stream_iter;
-
-       bt_component *downstream_component;
-};
-
-bt_lttng_live_iterator_status {
-       /** Iterator state has progressed. Continue iteration immediately. */
-       BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE = 3,
-       /** No message available for now. Try again later. */
-       BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN = 2,
-       /** No more CTF_LTTNG_LIVEs to be delivered. */
-       BT_LTTNG_LIVE_ITERATOR_STATUS_END = 1,
-       /** No error, okay. */
-       BT_LTTNG_LIVE_ITERATOR_STATUS_OK = 0,
-       /** Invalid arguments. */
-       BT_LTTNG_LIVE_ITERATOR_STATUS_INVAL = -1,
-       /** General error. */
-       BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR = -2,
-       /** Out of memory. */
-       BT_LTTNG_LIVE_ITERATOR_STATUS_NOMEM = -3,
-       /** Unsupported iterator feature. */
-       BT_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED = -4,
-};
-
-bt_component_status lttng_live_component_init(bt_self_component *source,
-               bt_value *params, void *init_method_data);
-
-bt_component_class_query_method_return lttng_live_query(
-               const bt_component_class *comp_class,
-               const bt_query_executor *query_exec,
-               const char *object, bt_value *params);
-
-void lttng_live_component_finalize(bt_self_component *component);
-
-bt_message_iterator_next_method_return lttng_live_iterator_next(
-        bt_self_message_iterator *iterator);
-
-bt_component_status lttng_live_accept_port_connection(
-               bt_self_component *private_component,
-               struct bt_private_port *self_private_port,
-               const bt_port *other_port);
-
-bt_message_iterator_status lttng_live_iterator_init(
-               bt_self_message_iterator *it,
-               struct bt_private_port *port);
-
-void lttng_live_iterator_finalize(bt_self_message_iterator *it);
-
-int lttng_live_create_viewer_session(struct lttng_live_component *lttng_live);
-int lttng_live_attach_session(struct lttng_live_session *session);
-int lttng_live_detach_session(struct lttng_live_session *session);
-bt_lttng_live_iterator_status lttng_live_get_new_streams(
-               struct lttng_live_session *session);
-
-int lttng_live_add_session(struct lttng_live_component *lttng_live,
-               uint64_t session_id,
-               const char *hostname,
-               const char *session_name);
-
-ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace,
-               FILE *fp);
-bt_lttng_live_iterator_status lttng_live_get_next_index(
-               struct lttng_live_component *lttng_live,
-               struct lttng_live_stream_iterator *stream,
-               struct packet_index *index);
-enum bt_msg_iter_medium_status lttng_live_get_stream_bytes(
-               struct lttng_live_component *lttng_live,
-               struct lttng_live_stream_iterator *stream, uint8_t *buf, uint64_t offset,
-               uint64_t req_len, uint64_t *recv_len);
-
-int lttng_live_add_port(struct lttng_live_component *lttng_live,
-               struct lttng_live_stream_iterator *stream_iter);
-int lttng_live_remove_port(struct lttng_live_component *lttng_live,
-               struct bt_private_port *port);
-
-struct lttng_live_trace *lttng_live_ref_trace(
-               struct lttng_live_session *session, uint64_t trace_id);
-void lttng_live_unref_trace(struct lttng_live_trace *trace);
-void lttng_live_need_new_streams(struct lttng_live_component *lttng_live);
-
-bt_bool lttng_live_is_canceled(struct lttng_live_component *lttng_live);
-
-#endif /* BABELTRACE_PLUGIN_CTF_LTTNG_LIVE_INTERNAL_H */
index 9fd35b2a21d15b9a410e741fc2633dc019953794..40a145607216b9bce6b856807abad69f272c677a 100644 (file)
@@ -3,6 +3,7 @@
  *
  * Babeltrace CTF LTTng-live Client Component
  *
+ * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
  * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
  * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
  *
 #define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC"
 #include "logging.h"
 
+#include <glib.h>
+#include <inttypes.h>
+#include <unistd.h>
+
+#include <babeltrace/assert-internal.h>
 #include <babeltrace/babeltrace.h>
 #include <babeltrace/compiler-internal.h>
 #include <babeltrace/types.h>
-#include <inttypes.h>
-#include <glib.h>
-#include <babeltrace/assert-internal.h>
-#include <unistd.h>
 #include <plugins-common.h>
 
 #include "data-stream.h"
 #include "metadata.h"
-#include "lttng-live-internal.h"
+#include "lttng-live.h"
 
-#define MAX_QUERY_SIZE         (256*1024)
+#define MAX_QUERY_SIZE                     (256*1024)
+#define URL_PARAM                          "url"
+#define SESS_NOT_FOUND_ACTION_PARAM        "session-not-found-action"
+#define SESS_NOT_FOUND_ACTION_CONTINUE_STR  "continue"
+#define SESS_NOT_FOUND_ACTION_FAIL_STR     "fail"
+#define SESS_NOT_FOUND_ACTION_END_STR      "end"
 
 #define print_dbg(fmt, ...)    BT_LOGD(fmt, ## __VA_ARGS__)
 
-static const char *print_state(struct lttng_live_stream_iterator *s)
+static
+const char *print_live_iterator_status(enum lttng_live_iterator_status status)
+{
+       switch (status) {
+       case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
+               return "LTTNG_LIVE_ITERATOR_STATUS_CONTINUE";
+       case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
+               return "LTTNG_LIVE_ITERATOR_STATUS_AGAIN";
+       case LTTNG_LIVE_ITERATOR_STATUS_END:
+               return "LTTNG_LIVE_ITERATOR_STATUS_END";
+       case LTTNG_LIVE_ITERATOR_STATUS_OK:
+               return "LTTNG_LIVE_ITERATOR_STATUS_OK";
+       case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
+               return "LTTNG_LIVE_ITERATOR_STATUS_INVAL";
+       case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
+               return "LTTNG_LIVE_ITERATOR_STATUS_ERROR";
+       case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
+               return "LTTNG_LIVE_ITERATOR_STATUS_NOMEM";
+       case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
+               return "LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED";
+       default:
+               abort();
+       }
+}
+
+static
+const char *print_state(struct lttng_live_stream_iterator *s)
 {
        switch (s->state) {
        case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
@@ -65,159 +98,68 @@ static const char *print_state(struct lttng_live_stream_iterator *s)
        }
 }
 
-static
-void print_stream_state(struct lttng_live_stream_iterator *stream)
-{
-       const bt_port *port;
-
-       port = bt_port_from_private(stream->port);
-       print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64,
-               bt_port_get_name(port),
-               print_state(stream),
-               stream->last_returned_inactivity_timestamp,
-               stream->current_inactivity_timestamp);
-       bt_port_put_ref(port);
-}
+#define print_stream_state(live_stream_iter) \
+       do { \
+               BT_LOGD("stream state %s last_inact_ts %" PRId64  \
+                       ", curr_inact_ts %" PRId64, \
+                       print_state(live_stream_iter), \
+                       live_stream_iter->last_inactivity_ts, \
+                       live_stream_iter->current_inactivity_ts); \
+       } while (0);
 
 BT_HIDDEN
-bt_bool lttng_live_is_canceled(struct lttng_live_component *lttng_live)
+bool lttng_live_is_canceled(struct lttng_live_component *lttng_live)
 {
-       bt_component *component;
-       const bt_graph *graph;
-       bt_bool ret;
+       const bt_component *component;
+       bool ret;
 
        if (!lttng_live) {
-               return BT_FALSE;
-       }
-
-       component = bt_component_from_private(lttng_live->private_component);
-       graph = bt_component_get_graph(component);
-       ret = bt_graph_is_canceled(graph);
-       bt_graph_put_ref(graph);
-       bt_component_put_ref(component);
-       return ret;
-}
-
-BT_HIDDEN
-int lttng_live_add_port(struct lttng_live_component *lttng_live,
-               struct lttng_live_stream_iterator *stream_iter)
-{
-       int ret;
-       struct bt_private_port *private_port;
-       char name[STREAM_NAME_MAX_LEN];
-       bt_component_status status;
-
-       ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id);
-       BT_ASSERT(ret > 0);
-       strcpy(stream_iter->name, name);
-       if (lttng_live_is_canceled(lttng_live)) {
-               return 0;
-       }
-       status = bt_self_component_source_add_output_port(
-                       lttng_live->private_component, name, stream_iter,
-                       &private_port);
-       switch (status) {
-       case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
-               return 0;
-       case BT_COMPONENT_STATUS_OK:
-               break;
-       default:
-               return -1;
-       }
-       bt_object_put_ref(private_port);        /* weak */
-       BT_LOGI("Added port %s", name);
-
-       if (lttng_live->no_stream_port) {
-               bt_object_get_ref(lttng_live->no_stream_port);
-               ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
-               bt_object_put_ref(lttng_live->no_stream_port);
-               if (ret) {
-                       return -1;
-               }
-               lttng_live->no_stream_port = NULL;
-               lttng_live->no_stream_iter->port = NULL;
+               ret = false;
+               goto end;
        }
-       stream_iter->port = private_port;
-       return 0;
-}
 
-BT_HIDDEN
-int lttng_live_remove_port(struct lttng_live_component *lttng_live,
-               struct bt_private_port *port)
-{
-       bt_component *component;
-       int64_t nr_ports;
-       int ret;
-
-       component = bt_component_from_private(lttng_live->private_component);
-       nr_ports = bt_component_source_get_output_port_count(component);
-       if (nr_ports < 0) {
-               return -1;
-       }
-       BT_COMPONENT_PUT_REF_AND_RESET(component);
-       if (nr_ports == 1) {
-               bt_component_status status;
+       component = bt_component_source_as_component_const(
+               bt_self_component_source_as_component_source(
+               lttng_live->self_comp));
 
-               BT_ASSERT(!lttng_live->no_stream_port);
+       ret = bt_component_graph_is_canceled(component);
 
-               if (lttng_live_is_canceled(lttng_live)) {
-                       return 0;
-               }
-               status = bt_self_component_source_add_output_port(lttng_live->private_component,
-                               "no-stream", lttng_live->no_stream_iter,
-                               &lttng_live->no_stream_port);
-               switch (status) {
-               case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
-                       return 0;
-               case BT_COMPONENT_STATUS_OK:
-                       break;
-               default:
-                       return -1;
-               }
-               bt_object_put_ref(lttng_live->no_stream_port);  /* weak */
-               lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
-       }
-       bt_object_get_ref(port);
-       ret = bt_private_port_remove_from_component(port);
-       bt_object_put_ref(port);
-       if (ret) {
-               return -1;
-       }
-       return 0;
+end:
+       return ret;
 }
 
 static
 struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session,
                uint64_t trace_id)
 {
-       struct lttng_live_trace *trace;
+       uint64_t trace_idx;
+       struct lttng_live_trace *ret_trace = NULL;
 
-       bt_list_for_each_entry(trace, &session->traces, node) {
+       for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
+               struct lttng_live_trace *trace =
+                       g_ptr_array_index(session->traces, trace_idx);
                if (trace->id == trace_id) {
-                       return trace;
+                       ret_trace = trace;
+                       goto end;
                }
        }
-       return NULL;
+
+end:
+       return ret_trace;
 }
 
 static
-void lttng_live_destroy_trace(bt_object *obj)
+void lttng_live_destroy_trace(struct lttng_live_trace *trace)
 {
-       struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj);
+       BT_LOGD("Destroy lttng_live_trace");
 
-       BT_LOGI("Destroy trace");
-       BT_ASSERT(bt_list_empty(&trace->streams));
-       bt_list_del(&trace->node);
+       BT_ASSERT(trace->stream_iterators);
+       g_ptr_array_free(trace->stream_iterators, TRUE);
 
-       if (trace->trace) {
-               int retval;
+       BT_TRACE_PUT_REF_AND_RESET(trace->trace);
+       BT_TRACE_CLASS_PUT_REF_AND_RESET(trace->trace_class);
 
-               retval = bt_trace_set_is_static(trace->trace);
-               BT_ASSERT(!retval);
-               BT_TRACE_PUT_REF_AND_RESET(trace->trace);
-       }
        lttng_live_metadata_fini(trace);
-       BT_OBJECT_PUT_REF_AND_RESET(trace->cc_prio_map);
        g_free(trace);
 }
 
@@ -233,10 +175,14 @@ struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *sess
        }
        trace->session = session;
        trace->id = trace_id;
-       BT_INIT_LIST_HEAD(&trace->streams);
+       trace->trace_class = NULL;
+       trace->trace = NULL;
+       trace->stream_iterators = g_ptr_array_new_with_free_func(
+                       (GDestroyNotify) lttng_live_stream_iterator_destroy);
+       BT_ASSERT(trace->stream_iterators);
        trace->new_metadata_needed = true;
-       bt_list_add(&trace->node, &session->traces);
-       bt_object_init(&trace->obj, lttng_live_destroy_trace);
+       g_ptr_array_add(session->traces, trace);
+
        BT_LOGI("Create trace");
        goto end;
 error:
@@ -247,63 +193,55 @@ end:
 }
 
 BT_HIDDEN
-struct lttng_live_trace *lttng_live_ref_trace(struct lttng_live_session *session,
-               uint64_t trace_id)
+struct lttng_live_trace *lttng_live_borrow_trace(
+               struct lttng_live_session *session, uint64_t trace_id)
 {
        struct lttng_live_trace *trace;
 
        trace = lttng_live_find_trace(session, trace_id);
        if (trace) {
-               bt_object_get_ref(trace);
-               return trace;
+               goto end;
        }
-       return lttng_live_create_trace(session, trace_id);
-}
-
-BT_HIDDEN
-void lttng_live_unref_trace(struct lttng_live_trace *trace)
-{
-       bt_object_put_ref(trace);
-}
 
-static
-void lttng_live_close_trace_streams(struct lttng_live_trace *trace)
-{
-       struct lttng_live_stream_iterator *stream, *s;
+       /* The session is the owner of the newly created trace. */
+       trace = lttng_live_create_trace(session, trace_id);
 
-       bt_list_for_each_entry_safe(stream, s, &trace->streams, node) {
-               lttng_live_stream_iterator_destroy(stream);
-       }
-       lttng_live_metadata_fini(trace);
+end:
+       return trace;
 }
 
 BT_HIDDEN
-int lttng_live_add_session(struct lttng_live_component *lttng_live,
+int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
                uint64_t session_id, const char *hostname,
                const char *session_name)
 {
        int ret = 0;
-       struct lttng_live_session *s;
+       struct lttng_live_session *session;
 
-       s = g_new0(struct lttng_live_session, 1);
-       if (!s) {
+       session = g_new0(struct lttng_live_session, 1);
+       if (!session) {
                goto error;
        }
 
-       s->id = session_id;
-       BT_INIT_LIST_HEAD(&s->traces);
-       s->lttng_live = lttng_live;
-       s->new_streams_needed = true;
-       s->hostname = g_string_new(hostname);
-       s->session_name = g_string_new(session_name);
+       session->id = session_id;
+       session->traces = g_ptr_array_new_with_free_func(
+                       (GDestroyNotify) lttng_live_destroy_trace);
+       BT_ASSERT(session->traces);
+       session->lttng_live_msg_iter = lttng_live_msg_iter;
+       session->new_streams_needed = true;
+       session->hostname = g_string_new(hostname);
+       BT_ASSERT(session->hostname);
+
+       session->session_name = g_string_new(session_name);
+       BT_ASSERT(session->session_name);
 
        BT_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s",
-               s->id, hostname, session_name);
-       bt_list_add(&s->node, &lttng_live->sessions);
+               session->id, hostname, session_name);
+       g_ptr_array_add(lttng_live_msg_iter->sessions, session);
        goto end;
 error:
        BT_LOGE("Error adding session");
-       g_free(s);
+       g_free(session);
        ret = -1;
 end:
        return ret;
@@ -312,23 +250,30 @@ end:
 static
 void lttng_live_destroy_session(struct lttng_live_session *session)
 {
-       struct lttng_live_trace *trace, *t;
+       struct lttng_live_component *live_comp;
+
+       if (!session) {
+               goto end;
+       }
 
-       BT_LOGI("Destroy session");
+       BT_LOGD("Destroy lttng live session");
        if (session->id != -1ULL) {
                if (lttng_live_detach_session(session)) {
-                       if (!lttng_live_is_canceled(session->lttng_live)) {
+                       live_comp = session->lttng_live_msg_iter->lttng_live_comp;
+                       if (session->lttng_live_msg_iter &&
+                                       !lttng_live_is_canceled(live_comp)) {
                                /* Old relayd cannot detach sessions. */
-                               BT_LOGD("Unable to detach session %" PRIu64,
+                               BT_LOGD("Unable to detach lttng live session %" PRIu64,
                                        session->id);
                        }
                }
                session->id = -1ULL;
        }
-       bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
-               lttng_live_close_trace_streams(trace);
+
+       if (session->traces) {
+               g_ptr_array_free(session->traces, TRUE);
        }
-       bt_list_del(&session->node);
+
        if (session->hostname) {
                g_string_free(session->hostname, TRUE);
        }
@@ -336,34 +281,50 @@ void lttng_live_destroy_session(struct lttng_live_session *session)
                g_string_free(session->session_name, TRUE);
        }
        g_free(session);
+
+end:
+       return;
 }
 
-BT_HIDDEN
-void lttng_live_iterator_finalize(bt_self_message_iterator *it)
+static
+void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter)
 {
-       struct lttng_live_stream_iterator_generic *s =
-                       bt_self_message_iterator_get_user_data(it);
-
-       switch (s->type) {
-       case LIVE_STREAM_TYPE_NO_STREAM:
-       {
-               /* Leave no_stream_iter in place when port is removed. */
-               break;
+       if (!lttng_live_msg_iter) {
+               goto end;
        }
-       case LIVE_STREAM_TYPE_STREAM:
-       {
-               struct lttng_live_stream_iterator *stream_iter =
-                       container_of(s, struct lttng_live_stream_iterator, p);
 
-               lttng_live_stream_iterator_destroy(stream_iter);
-               break;
-       }
+       if (lttng_live_msg_iter->sessions) {
+               g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE);
        }
+
+       BT_OBJECT_PUT_REF_AND_RESET(lttng_live_msg_iter->viewer_connection);
+       BT_ASSERT(lttng_live_msg_iter->lttng_live_comp);
+       BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter);
+
+       /* All stream iterators must be destroyed at this point. */
+       BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0);
+       lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false;
+
+       g_free(lttng_live_msg_iter);
+
+end:
+       return;
+}
+
+BT_HIDDEN
+void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
+{
+       struct lttng_live_msg_iter *lttng_live_msg_iter;
+
+       BT_ASSERT(self_msg_iter);
+
+       lttng_live_msg_iter = bt_self_message_iterator_get_data(self_msg_iter);
+       BT_ASSERT(lttng_live_msg_iter);
+       lttng_live_msg_iter_destroy(lttng_live_msg_iter);
 }
 
 static
-bt_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
-               struct lttng_live_component *lttng_live,
+enum lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
                struct lttng_live_stream_iterator *lttng_live_stream)
 {
        switch (lttng_live_stream->state) {
@@ -381,7 +342,7 @@ bt_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
        case LTTNG_LIVE_STREAM_EOF:
                break;
        }
-       return BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+       return LTTNG_LIVE_ITERATOR_STATUS_OK;
 }
 
 /*
@@ -393,40 +354,43 @@ bt_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
  *   return EOF.
  */
 static
-bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
-               struct lttng_live_component *lttng_live,
+enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
+               struct lttng_live_msg_iter *lttng_live_msg_iter,
                struct lttng_live_stream_iterator *lttng_live_stream)
 {
-       bt_lttng_live_iterator_status ret =
-                       BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+       enum lttng_live_iterator_status ret =
+                       LTTNG_LIVE_ITERATOR_STATUS_OK;
        struct packet_index index;
        enum lttng_live_stream_state orig_state = lttng_live_stream->state;
 
        if (lttng_live_stream->trace->new_metadata_needed) {
-               ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+               ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
                goto end;
        }
        if (lttng_live_stream->trace->session->new_streams_needed) {
-               ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+               ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
                goto end;
        }
-       if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
-                       && lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
+       if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA &&
+                       lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
                goto end;
        }
-       ret = lttng_live_get_next_index(lttng_live, lttng_live_stream, &index);
-       if (ret != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+       ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream,
+               &index);
+       if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
                goto end;
        }
        BT_ASSERT(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
        if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
-               if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
-                               && lttng_live_stream->last_returned_inactivity_timestamp ==
-                                       lttng_live_stream->current_inactivity_timestamp) {
-                       ret = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+               uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts,
+                        curr_inact_ts = lttng_live_stream->current_inactivity_ts;
+
+               if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA &&
+                               last_inact_ts == curr_inact_ts) {
+                       ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
                        print_stream_state(lttng_live_stream);
                } else {
-                       ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+                       ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
                }
                goto end;
        }
@@ -434,109 +398,138 @@ bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream
        lttng_live_stream->offset = index.offset;
        lttng_live_stream->len = index.packet_size / CHAR_BIT;
 end:
-       if (ret == BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
-               ret = lttng_live_iterator_next_check_stream_state(
-                               lttng_live, lttng_live_stream);
+       if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
+               ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream);
        }
        return ret;
 }
 
 /*
- * Creation of the message requires the ctf trace to be created
- * beforehand, but the live protocol gives us all streams (including
- * metadata) at once. So we split it in three steps: getting streams,
- * getting metadata (which creates the ctf trace), and then creating the
- * per-stream messages.
+ * Creation of the message requires the ctf trace class to be created
+ * beforehand, but the live protocol gives us all streams (including metadata)
+ * at once. So we split it in three steps: getting streams, getting metadata
+ * (which creates the ctf trace class), and then creating the per-stream
+ * messages.
  */
 static
-bt_lttng_live_iterator_status lttng_live_get_session(
-               struct lttng_live_component *lttng_live,
+enum lttng_live_iterator_status lttng_live_get_session(
+               struct lttng_live_msg_iter *lttng_live_msg_iter,
                struct lttng_live_session *session)
 {
-       bt_lttng_live_iterator_status status;
-       struct lttng_live_trace *trace, *t;
+       enum lttng_live_iterator_status status;
+       uint64_t trace_idx;
+       int ret = 0;
 
-       if (lttng_live_attach_session(session)) {
-               if (lttng_live_is_canceled(lttng_live)) {
-                       return BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
-               } else {
-                       return BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+       if (!session->attached) {
+               ret = lttng_live_attach_session(session);
+               if (ret) {
+                       if (lttng_live_msg_iter && lttng_live_is_canceled(
+                                               lttng_live_msg_iter->lttng_live_comp)) {
+                               status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+                       } else {
+                               status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+                       }
+                       goto end;
                }
        }
+
        status = lttng_live_get_new_streams(session);
-       if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK &&
-                       status != BT_LTTNG_LIVE_ITERATOR_STATUS_END) {
-               return status;
+       if (status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
+                       status != LTTNG_LIVE_ITERATOR_STATUS_END) {
+               goto end;
        }
-       bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
+       for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
+               struct lttng_live_trace *trace =
+                       g_ptr_array_index(session->traces, trace_idx);
+
                status = lttng_live_metadata_update(trace);
-               if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK &&
-                               status != BT_LTTNG_LIVE_ITERATOR_STATUS_END) {
-                       return status;
+               if (status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
+                               status != LTTNG_LIVE_ITERATOR_STATUS_END) {
+                       goto end;
                }
        }
-       return lttng_live_lazy_msg_init(session);
+       status = lttng_live_lazy_msg_init(session);
+
+end:
+       return status;
 }
 
 BT_HIDDEN
-void lttng_live_need_new_streams(struct lttng_live_component *lttng_live)
+void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter)
 {
-       struct lttng_live_session *session;
+       uint64_t session_idx;
 
-       bt_list_for_each_entry(session, &lttng_live->sessions, node) {
+       for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
+                       session_idx++) {
+               struct lttng_live_session *session =
+                       g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
                session->new_streams_needed = true;
        }
 }
 
 static
-void lttng_live_force_new_streams_and_metadata(struct lttng_live_component *lttng_live)
+void lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
 {
-       struct lttng_live_session *session;
-
-       bt_list_for_each_entry(session, &lttng_live->sessions, node) {
-               struct lttng_live_trace *trace;
+       uint64_t session_idx, trace_idx;
 
+       for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
+                       session_idx++) {
+               struct lttng_live_session *session =
+                       g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
                session->new_streams_needed = true;
-               bt_list_for_each_entry(trace, &session->traces, node) {
+               for (trace_idx = 0; trace_idx < session->traces->len;
+                               trace_idx++) {
+                       struct lttng_live_trace *trace =
+                               g_ptr_array_index(session->traces, trace_idx);
                        trace->new_metadata_needed = true;
                }
        }
 }
 
 static
-bt_lttng_live_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
-               struct lttng_live_component *lttng_live)
+enum lttng_live_iterator_status
+lttng_live_iterator_handle_new_streams_and_metadata(
+               struct lttng_live_msg_iter *lttng_live_msg_iter)
 {
-       bt_lttng_live_iterator_status ret =
-                       BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
-       unsigned int nr_sessions_opened = 0;
-       struct lttng_live_session *session, *s;
-
-       bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
-               if (session->closed && bt_list_empty(&session->traces)) {
-                       lttng_live_destroy_session(session);
-               }
-       }
+       enum lttng_live_iterator_status ret =
+                       LTTNG_LIVE_ITERATOR_STATUS_OK;
+       uint64_t session_idx = 0, nr_sessions_opened = 0;
+       struct lttng_live_session *session;
+       enum session_not_found_action sess_not_found_act =
+               lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
+
        /*
-        * Currently, when there are no sessions, we quit immediately.
-        * We may want to add a component parameter to keep trying until
-        * we get data in the future.
-        * Also, in a remotely distant future, we could add a "new
+        * In a remotely distant future, we could add a "new
         * session" flag to the protocol, which would tell us that we
         * need to query for new sessions even though we have sessions
         * currently ongoing.
         */
-       if (bt_list_empty(&lttng_live->sessions)) {
-               ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
-               goto end;
+       if (lttng_live_msg_iter->sessions->len == 0) {
+               if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
+                       ret = LTTNG_LIVE_ITERATOR_STATUS_END;
+                       goto end;
+               } else {
+                       /*
+                        * Retry to create a viewer session for the requested
+                        * session name.
+                        */
+                       if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
+                               ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+                               goto end;
+                       }
+               }
        }
-       bt_list_for_each_entry(session, &lttng_live->sessions, node) {
-               ret = lttng_live_get_session(lttng_live, session);
+
+       for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
+                       session_idx++) {
+               session = g_ptr_array_index(lttng_live_msg_iter->sessions,
+                               session_idx);
+               ret = lttng_live_get_session(lttng_live_msg_iter, session);
                switch (ret) {
-               case BT_LTTNG_LIVE_ITERATOR_STATUS_OK:
+               case LTTNG_LIVE_ITERATOR_STATUS_OK:
                        break;
-               case BT_LTTNG_LIVE_ITERATOR_STATUS_END:
-                       ret = BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+               case LTTNG_LIVE_ITERATOR_STATUS_END:
+                       ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
                        break;
                default:
                        goto end;
@@ -546,147 +539,257 @@ bt_lttng_live_iterator_status lttng_live_iterator_next_handle_new_streams_and_me
                }
        }
 end:
-       if (ret == BT_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) {
-               ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
+       if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK &&
+                       sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE &&
+                       nr_sessions_opened == 0) {
+               ret = LTTNG_LIVE_ITERATOR_STATUS_END;
        }
        return ret;
 }
 
 static
-bt_lttng_live_iterator_status emit_inactivity_message(
-               struct lttng_live_component *lttng_live,
-               struct lttng_live_stream_iterator *lttng_live_stream,
-               const bt_message **message,
-               uint64_t timestamp)
+enum lttng_live_iterator_status emit_inactivity_message(
+               struct lttng_live_msg_iter *lttng_live_msg_iter,
+               struct lttng_live_stream_iterator *stream_iter,
+               bt_message **message, uint64_t timestamp)
 {
-       bt_lttng_live_iterator_status ret =
-                       BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
-       struct lttng_live_trace *trace;
-       const bt_clock_class *clock_class = NULL;
-       bt_clock_snapshot *clock_snapshot = NULL;
-       const bt_message *msg = NULL;
-       int retval;
+       enum lttng_live_iterator_status ret =
+                       LTTNG_LIVE_ITERATOR_STATUS_OK;
+       bt_message *msg = NULL;
 
-       trace = lttng_live_stream->trace;
-       if (!trace) {
-               goto error;
-       }
-       clock_class = bt_clock_class_priority_map_get_clock_class_by_index(trace->cc_prio_map, 0);
-       if (!clock_class) {
-               goto error;
-       }
-       clock_snapshot = bt_clock_snapshot_create(clock_class, timestamp);
-       if (!clock_snapshot) {
-               goto error;
-       }
-       msg = bt_message_inactivity_create(trace->cc_prio_map);
+       BT_ASSERT(stream_iter->trace->clock_class);
+
+       msg = bt_message_message_iterator_inactivity_create(
+                       lttng_live_msg_iter->self_msg_iter,
+                       stream_iter->trace->clock_class,
+                       timestamp);
        if (!msg) {
                goto error;
        }
-       retval = bt_message_inactivity_set_clock_snapshot(msg, clock_snapshot);
-       if (retval) {
-               goto error;
-       }
+
        *message = msg;
 end:
-       bt_object_put_ref(clock_snapshot);
-       bt_clock_class_put_ref(clock_class);
        return ret;
 
 error:
-       ret = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+       ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
        bt_message_put_ref(msg);
        goto end;
 }
 
 static
-bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
-               struct lttng_live_component *lttng_live,
+enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
+               struct lttng_live_msg_iter *lttng_live_msg_iter,
                struct lttng_live_stream_iterator *lttng_live_stream,
-               const bt_message **message)
+               bt_message **message)
 {
-       bt_lttng_live_iterator_status ret =
-                       BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
-       const bt_clock_class *clock_class = NULL;
-       bt_clock_snapshot *clock_snapshot = NULL;
+       enum lttng_live_iterator_status ret =
+                       LTTNG_LIVE_ITERATOR_STATUS_OK;
 
        if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
-               return BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+               return LTTNG_LIVE_ITERATOR_STATUS_OK;
        }
 
-       if (lttng_live_stream->current_inactivity_timestamp ==
-                       lttng_live_stream->last_returned_inactivity_timestamp) {
+       if (lttng_live_stream->current_inactivity_ts ==
+                       lttng_live_stream->last_inactivity_ts) {
                lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
-               ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+               ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+               goto end;
+       }
+
+       ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream,
+                       message, lttng_live_stream->current_inactivity_ts);
+
+       lttng_live_stream->last_inactivity_ts =
+                       lttng_live_stream->current_inactivity_ts;
+end:
+       return ret;
+}
+
+static
+int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter,
+               struct lttng_live_msg_iter *lttng_live_msg_iter,
+               const bt_message *msg, int64_t last_msg_ts_ns,
+               int64_t *ts_ns)
+{
+       const bt_clock_class *clock_class = NULL;
+       const bt_clock_snapshot *clock_snapshot = NULL;
+       int ret = 0;
+       bt_clock_snapshot_state cs_state = BT_CLOCK_SNAPSHOT_STATE_KNOWN;
+       bt_message_stream_activity_clock_snapshot_state sa_cs_state;
+
+       BT_ASSERT(msg);
+       BT_ASSERT(ts_ns);
+
+       BT_LOGV("Getting message's timestamp: iter-data-addr=%p, msg-addr=%p, "
+               "last-msg-ts=%" PRId64, lttng_live_msg_iter, msg,
+               last_msg_ts_ns);
+
+       switch (bt_message_get_type(msg)) {
+       case BT_MESSAGE_TYPE_EVENT:
+               clock_class =
+                       bt_message_event_borrow_stream_class_default_clock_class_const(
+                               msg);
+               BT_ASSERT(clock_class);
+
+               cs_state = bt_message_event_borrow_default_clock_snapshot_const(
+                       msg, &clock_snapshot);
+               break;
+       case BT_MESSAGE_TYPE_PACKET_BEGINNING:
+               clock_class =
+                       bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
+                       msg);
+               BT_ASSERT(clock_class);
+
+               cs_state = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
+                       msg, &clock_snapshot);
+               break;
+       case BT_MESSAGE_TYPE_PACKET_END:
+               clock_class =
+                       bt_message_packet_end_borrow_stream_class_default_clock_class_const(
+                       msg);
+               BT_ASSERT(clock_class);
+
+               cs_state = bt_message_packet_end_borrow_default_clock_snapshot_const(
+                       msg, &clock_snapshot);
+               break;
+       case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
+               clock_class =
+                       bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
+                       msg);
+               BT_ASSERT(clock_class);
+
+               cs_state = bt_message_discarded_events_borrow_default_beginning_clock_snapshot_const(
+                       msg, &clock_snapshot);
+               break;
+       case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
+               clock_class =
+                       bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
+                       msg);
+               BT_ASSERT(clock_class);
+
+               cs_state = bt_message_discarded_packets_borrow_default_beginning_clock_snapshot_const(
+                       msg, &clock_snapshot);
+               break;
+       case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
+               clock_class =
+                       bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const(
+                       msg);
+               BT_ASSERT(clock_class);
+
+               sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const(
+                       msg, &clock_snapshot);
+               if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
+                       goto no_clock_snapshot;
+               }
+
+               break;
+       case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
+               clock_class =
+                       bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const(
+                       msg);
+               BT_ASSERT(clock_class);
+
+               sa_cs_state = bt_message_stream_activity_end_borrow_default_clock_snapshot_const(
+                       msg, &clock_snapshot);
+               if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
+                       goto no_clock_snapshot;
+               }
+
+               break;
+       case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
+               cs_state =
+                       bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
+                               msg, &clock_snapshot);
+               break;
+       default:
+               /* All the other messages have a higher priority */
+               BT_LOGV_STR("Message has no timestamp: using the last message timestamp.");
+               *ts_ns = last_msg_ts_ns;
                goto end;
        }
 
-       ret = emit_inactivity_message(lttng_live, lttng_live_stream, message,
-                       (uint64_t) lttng_live_stream->current_inactivity_timestamp);
+       if (cs_state != BT_CLOCK_SNAPSHOT_STATE_KNOWN) {
+               BT_LOGE_STR("Unsupported unknown clock snapshot.");
+               ret = -1;
+               goto end;
+       }
+
+       clock_class = bt_clock_snapshot_borrow_clock_class_const(clock_snapshot);
+       BT_ASSERT(clock_class);
+
+       ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
+       if (ret) {
+               BT_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: "
+                       "clock-snapshot-addr=%p", clock_snapshot);
+               goto error;
+       }
+
+       goto end;
+
+no_clock_snapshot:
+       BT_LOGV_STR("Message's default clock snapshot is missing: "
+               "using the last message timestamp.");
+       *ts_ns = last_msg_ts_ns;
+       goto end;
+
+error:
+       ret = -1;
 
-       lttng_live_stream->last_returned_inactivity_timestamp =
-                       lttng_live_stream->current_inactivity_timestamp;
 end:
-       bt_object_put_ref(clock_snapshot);
-       bt_clock_class_put_ref(clock_class);
+       if (ret == 0) {
+               BT_LOGV("Found message's timestamp: "
+                       "iter-data-addr=%p, msg-addr=%p, "
+                       "last-msg-ts=%" PRId64 ", ts=%" PRId64,
+                       lttng_live_msg_iter, msg, last_msg_ts_ns, *ts_ns);
+       }
+
        return ret;
 }
 
 static
-bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
-               struct lttng_live_component *lttng_live,
+enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
+               struct lttng_live_msg_iter *lttng_live_msg_iter,
                struct lttng_live_stream_iterator *lttng_live_stream,
-               const bt_message **message)
+               bt_message **message)
 {
-       bt_lttng_live_iterator_status ret =
-                       BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+       enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
        enum bt_msg_iter_status status;
-       struct lttng_live_session *session;
+       uint64_t session_idx, trace_idx;
 
-       bt_list_for_each_entry(session, &lttng_live->sessions, node) {
-               struct lttng_live_trace *trace;
+       for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
+                       session_idx++) {
+               struct lttng_live_session *session =
+                       g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
 
                if (session->new_streams_needed) {
-                       return BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+                       ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+                       goto end;
                }
-               bt_list_for_each_entry(trace, &session->traces, node) {
+               for (trace_idx = 0; trace_idx < session->traces->len;
+                               trace_idx++) {
+                       struct lttng_live_trace *trace =
+                               g_ptr_array_index(session->traces, trace_idx);
                        if (trace->new_metadata_needed) {
-                               return BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+                               ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+                               goto end;
                        }
                }
        }
 
        if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
-               return BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
-       }
-       if (lttng_live_stream->packet_end_msg_queue) {
-               *message = lttng_live_stream->packet_end_msg_queue;
-               lttng_live_stream->packet_end_msg_queue = NULL;
-               status = BT_MSG_ITER_STATUS_OK;
-       } else {
-               status = bt_msg_iter_get_next_message(
-                               lttng_live_stream->msg_iter,
-                               lttng_live_stream->trace->cc_prio_map,
-                               message);
-               if (status == BT_MSG_ITER_STATUS_OK) {
-                       /*
-                        * Consider empty packets as inactivity.
-                        */
-                       if (bt_message_get_type(*message) == BT_MESSAGE_TYPE_PACKET_END) {
-                               lttng_live_stream->packet_end_msg_queue = *message;
-                               *message = NULL;
-                               return emit_inactivity_message(lttng_live,
-                                               lttng_live_stream, message,
-                                               lttng_live_stream->current_packet_end_timestamp);
-                       }
-               }
+               ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+               goto end;
        }
+
+       status = bt_msg_iter_get_next_message(lttng_live_stream->msg_iter,
+               lttng_live_msg_iter->self_msg_iter, message);
        switch (status) {
        case BT_MSG_ITER_STATUS_EOF:
-               ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
+               ret = LTTNG_LIVE_ITERATOR_STATUS_END;
                break;
        case BT_MSG_ITER_STATUS_OK:
-               ret = BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+               ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
                break;
        case BT_MSG_ITER_STATUS_AGAIN:
                /*
@@ -694,15 +797,19 @@ bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_st
                 * get_index may return AGAIN to delay the following
                 * attempt.
                 */
-               ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+               ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
                break;
        case BT_MSG_ITER_STATUS_INVAL:
                /* No argument provided by the user, so don't return INVAL. */
        case BT_MSG_ITER_STATUS_ERROR:
        default:
-               ret = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+               ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+               BT_LOGW("CTF msg iterator return an error or failed msg_iter=%p",
+                       lttng_live_stream->msg_iter);
                break;
        }
+
+end:
        return ret;
 }
 
@@ -755,303 +862,626 @@ bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_st
  * When disconnected from relayd: try to re-connect endlessly.
  */
 static
-bt_message_iterator_next_method_return lttng_live_iterator_next_stream(
-               bt_self_message_iterator *iterator,
-               struct lttng_live_stream_iterator *stream_iter)
+enum lttng_live_iterator_status lttng_live_iterator_next_on_stream(
+               struct lttng_live_msg_iter *lttng_live_msg_iter,
+               struct lttng_live_stream_iterator *stream_iter,
+               bt_message **curr_msg)
 {
-       bt_lttng_live_iterator_status status;
-       bt_message_iterator_next_method_return next_return;
-       struct lttng_live_component *lttng_live;
+       enum lttng_live_iterator_status live_status;
 
-       lttng_live = stream_iter->trace->session->lttng_live;
 retry:
        print_stream_state(stream_iter);
-       next_return.message = NULL;
-       status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
-       if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+       live_status = lttng_live_iterator_handle_new_streams_and_metadata(
+                       lttng_live_msg_iter);
+       if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
                goto end;
        }
-       status = lttng_live_iterator_next_handle_one_no_data_stream(
-                       lttng_live, stream_iter);
-       if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+       live_status = lttng_live_iterator_next_handle_one_no_data_stream(
+                       lttng_live_msg_iter, stream_iter);
+       if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
                goto end;
        }
-       status = lttng_live_iterator_next_handle_one_quiescent_stream(
-                       lttng_live, stream_iter, &next_return.message);
-       if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
-               BT_ASSERT(next_return.message == NULL);
+       live_status = lttng_live_iterator_next_handle_one_quiescent_stream(
+                       lttng_live_msg_iter, stream_iter, curr_msg);
+       if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
+               BT_ASSERT(*curr_msg == NULL);
                goto end;
        }
-       if (next_return.message) {
+       if (*curr_msg) {
                goto end;
        }
-       status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live,
-                       stream_iter, &next_return.message);
-       if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
-               BT_ASSERT(next_return.message == NULL);
+       live_status = lttng_live_iterator_next_handle_one_active_data_stream(
+                       lttng_live_msg_iter, stream_iter, curr_msg);
+       if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
+               BT_ASSERT(*curr_msg == NULL);
        }
 
 end:
-       switch (status) {
-       case BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
-               print_dbg("continue");
+       if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) {
                goto retry;
-       case BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
-               next_return.status = BT_MESSAGE_ITERATOR_STATUS_AGAIN;
-               print_dbg("again");
-               break;
-       case BT_LTTNG_LIVE_ITERATOR_STATUS_END:
-               next_return.status = BT_MESSAGE_ITERATOR_STATUS_END;
-               print_dbg("end");
-               break;
-       case BT_LTTNG_LIVE_ITERATOR_STATUS_OK:
-               next_return.status = BT_MESSAGE_ITERATOR_STATUS_OK;
-               print_dbg("ok");
-               break;
-       case BT_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
-               next_return.status = BT_MESSAGE_ITERATOR_STATUS_INVALID;
-               break;
-       case BT_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
-               next_return.status = BT_MESSAGE_ITERATOR_STATUS_NOMEM;
-               break;
-       case BT_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
-               next_return.status = BT_MESSAGE_ITERATOR_STATUS_UNSUPPORTED;
-               break;
-       case BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
-       default:        /* fall-through */
-               next_return.status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
-               break;
        }
-       return next_return;
+
+       return live_status;
 }
 
 static
-bt_message_iterator_next_method_return lttng_live_iterator_next_no_stream(
-               bt_self_message_iterator *iterator,
-               struct lttng_live_no_stream_iterator *no_stream_iter)
+enum lttng_live_iterator_status next_stream_iterator_for_trace(
+               struct lttng_live_msg_iter *lttng_live_msg_iter,
+               struct lttng_live_trace *live_trace,
+               struct lttng_live_stream_iterator **candidate_stream_iter)
 {
-       bt_lttng_live_iterator_status status;
-       bt_message_iterator_next_method_return next_return;
-       struct lttng_live_component *lttng_live;
+       struct lttng_live_stream_iterator *curr_candidate_stream_iter = NULL;
+       enum lttng_live_iterator_status stream_iter_status;;
+       int64_t curr_candidate_msg_ts = INT64_MAX;
+       uint64_t stream_iter_idx;
 
-       lttng_live = no_stream_iter->lttng_live;
-retry:
-       lttng_live_force_new_streams_and_metadata(lttng_live);
-       next_return.message = NULL;
-       status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
-       if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+       BT_ASSERT(live_trace);
+       BT_ASSERT(live_trace->stream_iterators);
+       /*
+        * Update the current message of every stream iterators of this trace.
+        * The current msg of every stream must have a timestamp equal or
+        * larger than the last message returned by this iterator. We must
+        * ensure monotonicity.
+        */
+       stream_iter_idx = 0;
+       while (stream_iter_idx < live_trace->stream_iterators->len) {
+               bool stream_iter_is_ended = false;
+               struct lttng_live_stream_iterator *stream_iter =
+                       g_ptr_array_index(live_trace->stream_iterators,
+                                       stream_iter_idx);
+
+               /*
+                * Find if there is are now current message for this stream
+                * iterator get it.
+                */
+               while (!stream_iter->current_msg) {
+                       bt_message *msg = NULL;
+                       int64_t curr_msg_ts_ns = INT64_MAX;
+                       stream_iter_status = lttng_live_iterator_next_on_stream(
+                                       lttng_live_msg_iter, stream_iter, &msg);
+
+                       BT_LOGD("live stream iterator returned status :%s",
+                                       print_live_iterator_status(stream_iter_status));
+                       if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
+                               stream_iter_is_ended = true;
+                               break;
+                       }
+
+                       if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
+                               goto end;
+                       }
+
+                       BT_ASSERT(msg);
+
+                       /*
+                        * Get the timestamp in nanoseconds from origin of this
+                        * messsage.
+                        */
+                       live_get_msg_ts_ns(stream_iter, lttng_live_msg_iter,
+                               msg, lttng_live_msg_iter->last_msg_ts_ns,
+                               &curr_msg_ts_ns);
+
+                       /*
+                        * Check if the message of the current live stream
+                        * iterator occured at the exact same time or after the
+                        * last message returned by this component's message
+                        * iterator. If not, we return an error.
+                        */
+                       if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
+                               stream_iter->current_msg = msg;
+                               stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
+                       } else {
+                               /*
+                                * We received a message in the past. To ensure
+                                * monotonicity, we can't send it forward.
+                                */
+                               BT_LOGE("Message's timestamp is less than "
+                                       "lttng-live's message iterator's last "
+                                       "returned timestamp: "
+                                       "lttng-live-msg-iter-addr=%p, ts=%" PRId64 ", "
+                                       "last-msg-ts=%" PRId64,
+                                       lttng_live_msg_iter, curr_msg_ts_ns,
+                                       lttng_live_msg_iter->last_msg_ts_ns);
+                               stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+                               goto end;
+                       }
+               }
+
+               if (!stream_iter_is_ended &&
+                               stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) {
+                       /*
+                        * Update the current best candidate message for the
+                        * stream iterator of thise live trace to be forwarded
+                        * downstream.
+                        */
+                       curr_candidate_msg_ts = stream_iter->current_msg_ts_ns;
+                       curr_candidate_stream_iter = stream_iter;
+               }
+
+               if (stream_iter_is_ended) {
+                       /*
+                        * The live stream iterator is ENDed. We remove that
+                        * iterator from the list and we restart the iteration
+                        * at the beginning of the live stream iterator array
+                        * to because the removal will shuffle the array.
+                        */
+                       g_ptr_array_remove_index_fast(live_trace->stream_iterators,
+                               stream_iter_idx);
+                       stream_iter_idx = 0;
+               } else {
+                       stream_iter_idx++;
+               }
+       }
+
+       if (curr_candidate_stream_iter) {
+               *candidate_stream_iter = curr_candidate_stream_iter;
+               stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
+       } else {
+               /*
+                * The only case where we don't have a candidate for this trace
+                * is if we reached the end of all the iterators.
+                */
+               BT_ASSERT(live_trace->stream_iterators->len == 0);
+               stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
+       }
+
+end:
+       return stream_iter_status;
+}
+
+static
+enum lttng_live_iterator_status next_stream_iterator_for_session(
+               struct lttng_live_msg_iter *lttng_live_msg_iter,
+               struct lttng_live_session *session,
+               struct lttng_live_stream_iterator **candidate_session_stream_iter)
+{
+       enum lttng_live_iterator_status stream_iter_status;
+       uint64_t trace_idx = 0;
+       int64_t curr_candidate_msg_ts = INT64_MAX;
+       struct lttng_live_stream_iterator *curr_candidate_stream_iter = NULL;
+
+       /*
+        * Make sure we are attached to the session and look for new streams
+        * and metadata.
+        */
+       stream_iter_status = lttng_live_get_session(lttng_live_msg_iter, session);
+       if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
+                       stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE &&
+                       stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) {
                goto end;
        }
-       if (no_stream_iter->port) {
-               status = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+
+       BT_ASSERT(session->traces);
+
+       /*
+        * Use while loops here rather then for loops so we can restart the
+        * iteration if an element is removed from the array during the
+        * looping.
+        */
+       while (trace_idx < session->traces->len) {
+               bool trace_is_ended = false;
+               struct lttng_live_stream_iterator *stream_iter;
+               struct lttng_live_trace *trace =
+                       g_ptr_array_index(session->traces, trace_idx);
+
+               stream_iter_status = next_stream_iterator_for_trace(
+                       lttng_live_msg_iter, trace, &stream_iter);
+               if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
+                       /*
+                        * All the live stream iterators for this trace are
+                        * ENDed. Remove the trace from this session.
+                        */
+                       trace_is_ended = true;
+               } else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
+                       goto end;
+               }
+
+               if (!trace_is_ended) {
+                       BT_ASSERT(stream_iter);
+
+                       if (stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) {
+                               curr_candidate_msg_ts = stream_iter->current_msg_ts_ns;
+                               curr_candidate_stream_iter = stream_iter;
+                       }
+                       trace_idx++;
+               } else {
+                       g_ptr_array_remove_index_fast(session->traces, trace_idx);
+                       trace_idx = 0;
+               }
+       }
+       if (curr_candidate_stream_iter) {
+               *candidate_session_stream_iter = curr_candidate_stream_iter;
+               stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
        } else {
-               status = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
+               /*
+                * The only cases where we don't have a candidate for this
+                * trace is:
+                *  1. if we reached the end of all the iterators of all the
+                *  traces of this session,
+                *  2. if we never had live stream iterator in the first place.
+                *
+                * In either cases, we return END.
+                */
+               BT_ASSERT(session->traces->len == 0);
+               stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
        }
 end:
-       switch (status) {
-       case BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
-               goto retry;
-       case BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
-               next_return.status = BT_MESSAGE_ITERATOR_STATUS_AGAIN;
-               break;
-       case BT_LTTNG_LIVE_ITERATOR_STATUS_END:
-               next_return.status = BT_MESSAGE_ITERATOR_STATUS_END;
-               break;
-       case BT_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
-               next_return.status = BT_MESSAGE_ITERATOR_STATUS_INVALID;
-               break;
-       case BT_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
-               next_return.status = BT_MESSAGE_ITERATOR_STATUS_NOMEM;
-               break;
-       case BT_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
-               next_return.status = BT_MESSAGE_ITERATOR_STATUS_UNSUPPORTED;
-               break;
-       case BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
-       default:        /* fall-through */
-               next_return.status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
-               break;
+       return stream_iter_status;
+}
+
+static inline
+void put_messages(bt_message_array_const msgs, uint64_t count)
+{
+       uint64_t i;
+
+       for (i = 0; i < count; i++) {
+               BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
        }
-       return next_return;
 }
 
 BT_HIDDEN
-bt_message_iterator_next_method_return lttng_live_iterator_next(
-               bt_self_message_iterator *iterator)
+bt_self_message_iterator_status lttng_live_msg_iter_next(
+               bt_self_message_iterator *self_msg_it,
+               bt_message_array_const msgs, uint64_t capacity,
+               uint64_t *count)
 {
-       struct lttng_live_stream_iterator_generic *s =
-                       bt_self_message_iterator_get_user_data(iterator);
-       bt_message_iterator_next_method_return next_return;
-
-       switch (s->type) {
-       case LIVE_STREAM_TYPE_NO_STREAM:
-               next_return = lttng_live_iterator_next_no_stream(iterator,
-                       container_of(s, struct lttng_live_no_stream_iterator, p));
+       bt_self_message_iterator_status status;
+       struct lttng_live_msg_iter *lttng_live_msg_iter =
+               bt_self_message_iterator_get_data(self_msg_it);
+       struct lttng_live_component *lttng_live =
+               lttng_live_msg_iter->lttng_live_comp;
+       enum lttng_live_iterator_status stream_iter_status;
+       uint64_t session_idx;
+
+       *count = 0;
+
+       BT_ASSERT(lttng_live_msg_iter);
+
+       /*
+        * Clear all the invalid message reference that might be left over in
+        * the output array.
+        */
+       memset(msgs, 0, capacity * sizeof(*msgs));
+
+       /*
+        * If no session are exposed on the relay found at the url provided by
+        * the user, session count will be 0. In this case, we return status
+        * end to return gracefully.
+        */
+       if (lttng_live_msg_iter->sessions->len == 0) {
+               if (lttng_live->params.sess_not_found_act !=
+                               SESSION_NOT_FOUND_ACTION_CONTINUE) {
+                       status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
+                       goto no_session;
+               } else {
+                       /*
+                        * The are no more active session for this session
+                        * name. Retry to create a viewer session for the
+                        * requested session name.
+                        */
+                       if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
+                               status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+                               goto no_session;
+                       }
+               }
+       }
+
+       if (lttng_live_msg_iter->active_stream_iter == 0) {
+               lttng_live_force_new_streams_and_metadata(lttng_live_msg_iter);
+       }
+
+       /*
+        * Here the muxing of message is done.
+        *
+        * We need to iterate over all the streams of all the traces of all the
+        * viewer sessions in order to get the message with the smallest
+        * timestamp. In this case, a session is a viewer session and there is
+        * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
+        * kernel). Each viewer session can have multiple traces, for example,
+        * 64bit UST viewer sessions could have multiple per-pid traces.
+        *
+        * We iterate over the streams of each traces to update and see what is
+        * their next message's timestamp. From those timestamps, we select the
+        * message with the smallest timestamp as the best candidate message
+        * for that trace and do the same thing across all the sessions.
+        *
+        * We then compare the timestamp of best candidate message of all the
+        * sessions to pick the message with the smallest timestamp and we
+        * return it.
+        */
+       while (*count < capacity) {
+               struct lttng_live_stream_iterator *next_stream_iter = NULL,
+                                                 *candidate_stream_iter = NULL;
+               int64_t next_msg_ts_ns = INT64_MAX;
+
+               BT_ASSERT(lttng_live_msg_iter->sessions);
+               session_idx = 0;
+               /*
+                * Use a while loop instead of a for loop so we can restart the
+                * iteration if we remove an element. We can safely call
+                * next_stream_iterator_for_session() multiple times on the
+                * same session as we only fetch a new message if there is no
+                * current next message for each live stream iterator.
+                * If all live stream iterator of that session already have a
+                * current next message, the function will simply exit return
+                * the same candidate live stream iterator every time.
+                */
+               while (session_idx < lttng_live_msg_iter->sessions->len) {
+                       struct lttng_live_session *session =
+                               g_ptr_array_index(lttng_live_msg_iter->sessions,
+                                       session_idx);
+
+                       /* Find the best candidate message to send downstream. */
+                       stream_iter_status = next_stream_iterator_for_session(
+                               lttng_live_msg_iter, session,
+                               &candidate_stream_iter);
+
+                       /* If we receive an END status, it means that either:
+                        * - Those traces never had active streams (UST with no
+                        *   data produced yet),
+                        * - All live stream iterators have ENDed.*/
+                       if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
+                               if (session->closed && session->traces->len == 0) {
+                                       /*
+                                        * Remove the session from the list and restart the
+                                        * iteration at the beginning of the array since the
+                                        * removal shuffle the elements of the array.
+                                        */
+                                       g_ptr_array_remove_index_fast(
+                                               lttng_live_msg_iter->sessions,
+                                               session_idx);
+                                       session_idx = 0;
+                               } else {
+                                       session_idx++;
+                               }
+                               continue;
+                       }
+
+                       if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
+                               goto end;
+                       }
+
+                       if (candidate_stream_iter->current_msg_ts_ns <= next_msg_ts_ns) {
+                               next_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
+                               next_stream_iter = candidate_stream_iter;
+                       }
+
+                       session_idx++;
+               }
+
+               if (!next_stream_iter) {
+                       stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+                       goto end;
+               }
+
+               BT_ASSERT(next_stream_iter->current_msg);
+               /* Ensure monotonicity. */
+               BT_ASSERT(lttng_live_msg_iter->last_msg_ts_ns <=
+                       next_stream_iter->current_msg_ts_ns);
+
+               /*
+                * Insert the next message to the message batch. This will set
+                * stream iterator current messsage to NULL so that next time
+                * we fetch the next message of that stream iterator
+                */
+               BT_MESSAGE_MOVE_REF(msgs[*count], next_stream_iter->current_msg);
+               (*count)++;
+
+               /* Update the last timestamp in nanoseconds sent downstream. */
+               lttng_live_msg_iter->last_msg_ts_ns = next_msg_ts_ns;
+               next_stream_iter->current_msg_ts_ns = INT64_MAX;
+
+               stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
+       }
+end:
+       switch (stream_iter_status) {
+       case LTTNG_LIVE_ITERATOR_STATUS_OK:
+       case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
+               if (*count > 0) {
+                       /*
+                        * We received a again status but we have some messages
+                        * to send downstream. We send them and return OK for
+                        * now. On the next call we return again if there are
+                        * still no new message to send.
+                        */
+                       status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
+               } else {
+                       status = BT_SELF_MESSAGE_ITERATOR_STATUS_AGAIN;
+               }
                break;
-       case LIVE_STREAM_TYPE_STREAM:
-               next_return = lttng_live_iterator_next_stream(iterator,
-                       container_of(s, struct lttng_live_stream_iterator, p));
+       case LTTNG_LIVE_ITERATOR_STATUS_END:
+               status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
                break;
-       default:
-               next_return.status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
+       case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
+               status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
+               break;
+       case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
+       case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
+       case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
+               status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+               /* Put all existing messages on error. */
+               put_messages(msgs, *count);
                break;
+       default:
+               abort();
        }
-       return next_return;
+
+no_session:
+       return status;
 }
 
 BT_HIDDEN
-bt_message_iterator_status lttng_live_iterator_init(
-               bt_self_message_iterator *it,
-               struct bt_private_port *port)
+bt_self_message_iterator_status lttng_live_msg_iter_init(
+               bt_self_message_iterator *self_msg_it,
+               bt_self_component_source *self_comp_src,
+               bt_self_component_port_output *self_port)
 {
-       bt_message_iterator_status ret =
-                       BT_MESSAGE_ITERATOR_STATUS_OK;
-       struct lttng_live_stream_iterator_generic *s;
-
-       BT_ASSERT(it);
-
-       s = bt_private_port_get_user_data(port);
-       BT_ASSERT(s);
-       switch (s->type) {
-       case LIVE_STREAM_TYPE_NO_STREAM:
-       {
-               struct lttng_live_no_stream_iterator *no_stream_iter =
-                       container_of(s, struct lttng_live_no_stream_iterator, p);
-               ret = bt_self_message_iterator_set_user_data(it, no_stream_iter);
-               if (ret) {
-                       goto error;
-               }
-               break;
+       bt_self_message_iterator_status ret =
+                       BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
+       bt_self_component *self_comp =
+                       bt_self_component_source_as_self_component(self_comp_src);
+       struct lttng_live_component *lttng_live;
+       struct lttng_live_msg_iter *lttng_live_msg_iter;
+
+       BT_ASSERT(self_msg_it);
+
+       lttng_live = bt_self_component_get_data(self_comp);
+
+       /* There can be only one downstream iterator at the same time. */
+       BT_ASSERT(!lttng_live->has_msg_iter);
+       lttng_live->has_msg_iter = true;
+
+       lttng_live_msg_iter = g_new0(struct lttng_live_msg_iter, 1);
+       if (!lttng_live_msg_iter) {
+               ret = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+               goto end;
        }
-       case LIVE_STREAM_TYPE_STREAM:
-       {
-               struct lttng_live_stream_iterator *stream_iter =
-                       container_of(s, struct lttng_live_stream_iterator, p);
-               ret = bt_self_message_iterator_set_user_data(it, stream_iter);
-               if (ret) {
+
+       lttng_live_msg_iter->lttng_live_comp = lttng_live;
+       lttng_live_msg_iter->self_msg_iter = self_msg_it;
+
+       lttng_live_msg_iter->active_stream_iter = 0;
+       lttng_live_msg_iter->last_msg_ts_ns = INT64_MIN;
+       lttng_live_msg_iter->sessions = g_ptr_array_new_with_free_func(
+               (GDestroyNotify) lttng_live_destroy_session);
+       BT_ASSERT(lttng_live_msg_iter->sessions);
+
+       lttng_live_msg_iter->viewer_connection =
+               live_viewer_connection_create(lttng_live->params.url->str, false,
+                       lttng_live_msg_iter);
+       if (!lttng_live_msg_iter->viewer_connection) {
+               goto error;
+       }
+
+       if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
+               goto error;
+       }
+       if (lttng_live_msg_iter->sessions->len == 0) {
+               switch (lttng_live->params.sess_not_found_act) {
+               case SESSION_NOT_FOUND_ACTION_CONTINUE:
+                       BT_LOGI("Unable to connect to the requested live viewer "
+                               "session. Keep trying to connect because of "
+                               "%s=\"%s\" component parameter: url=\"%s\"",
+                               SESS_NOT_FOUND_ACTION_PARAM,
+                               SESS_NOT_FOUND_ACTION_CONTINUE_STR,
+                               lttng_live->params.url->str);
+                       break;
+               case SESSION_NOT_FOUND_ACTION_FAIL:
+                       BT_LOGE("Unable to connect to the requested live viewer "
+                               "session. Fail the message iterator"
+                               "initialization because of %s=\"%s\" "
+                               "component parameter: url =\"%s\"",
+                               SESS_NOT_FOUND_ACTION_PARAM,
+                               SESS_NOT_FOUND_ACTION_FAIL_STR,
+                               lttng_live->params.url->str);
                        goto error;
+               case SESSION_NOT_FOUND_ACTION_END:
+                       BT_LOGI("Unable to connect to the requested live viewer "
+                               "session. End gracefully at the first _next() "
+                               "call because of %s=\"%s\" component parameter: "
+                               "url=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM,
+                               SESS_NOT_FOUND_ACTION_END_STR,
+                               lttng_live->params.url->str);
+                       break;
                }
-               break;
-       }
-       default:
-               ret = BT_MESSAGE_ITERATOR_STATUS_ERROR;
-               goto end;
        }
 
+       bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter);
+
+       goto end;
+error:
+       ret = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+       lttng_live_msg_iter_destroy(lttng_live_msg_iter);
 end:
        return ret;
-error:
-       if (bt_self_message_iterator_set_user_data(it, NULL)
-                       != BT_MESSAGE_ITERATOR_STATUS_OK) {
-               BT_LOGE("Error setting private data to NULL");
-       }
-       goto end;
 }
 
 static
-bt_component_class_query_method_return lttng_live_query_list_sessions(
-               const bt_component_class *comp_class,
-               const bt_query_executor *query_exec,
-               bt_value *params)
+bt_query_status lttng_live_query_list_sessions(const bt_value *params,
+               const bt_value **result)
 {
-       bt_component_class_query_method_return query_ret = {
-               .result = NULL,
-               .status = BT_QUERY_STATUS_OK,
-       };
-
-       bt_value *url_value = NULL;
+       bt_query_status status = BT_QUERY_STATUS_OK;
+       const bt_value *url_value = NULL;
        const char *url;
-       struct bt_live_viewer_connection *viewer_connection = NULL;
+       struct live_viewer_connection *viewer_connection = NULL;
 
-       url_value = bt_value_map_get(params, "url");
-       if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) {
-               BT_LOGW("Mandatory \"url\" parameter missing");
-               query_ret.status = BT_QUERY_STATUS_INVALID_PARAMS;
+       url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
+       if (!url_value) {
+               BT_LOGW("Mandatory `%s` parameter missing", URL_PARAM);
+               status = BT_QUERY_STATUS_INVALID_PARAMS;
                goto error;
        }
 
-       if (bt_value_string_get(url_value, &url) != BT_VALUE_STATUS_OK) {
-               BT_LOGW("\"url\" parameter is required to be a string value");
-               query_ret.status = BT_QUERY_STATUS_INVALID_PARAMS;
+       if (!bt_value_is_string(url_value)) {
+               BT_LOGW("`%s` parameter is required to be a string value",
+                       URL_PARAM);
+               status = BT_QUERY_STATUS_INVALID_PARAMS;
                goto error;
        }
 
-       viewer_connection = bt_live_viewer_connection_create(url, NULL);
+       url = bt_value_string_get(url_value);
+
+       viewer_connection = live_viewer_connection_create(url, true, NULL);
        if (!viewer_connection) {
                goto error;
        }
 
-       query_ret.result =
-               bt_live_viewer_connection_list_sessions(viewer_connection);
-       if (!query_ret.result) {
+       status = live_viewer_connection_list_sessions(viewer_connection,
+                       result);
+       if (status != BT_QUERY_STATUS_OK) {
                goto error;
        }
 
        goto end;
 
 error:
-       BT_OBJECT_PUT_REF_AND_RESET(query_ret.result);
+       BT_VALUE_PUT_REF_AND_RESET(*result);
 
-       if (query_ret.status >= 0) {
-               query_ret.status = BT_QUERY_STATUS_ERROR;
+       if (status >= 0) {
+               status = BT_QUERY_STATUS_ERROR;
        }
 
 end:
        if (viewer_connection) {
-               bt_live_viewer_connection_destroy(viewer_connection);
+               live_viewer_connection_destroy(viewer_connection);
        }
-       BT_VALUE_PUT_REF_AND_RESET(url_value);
-       return query_ret;
+       return status;
 }
 
 BT_HIDDEN
-bt_component_class_query_method_return lttng_live_query(
-               const bt_component_class *comp_class,
+bt_query_status lttng_live_query(bt_self_component_class_source *comp_class,
                const bt_query_executor *query_exec,
-               const char *object, bt_value *params)
+               const char *object, const bt_value *params,
+               const bt_value **result)
 {
-       bt_component_class_query_method_return ret = {
-               .result = NULL,
-               .status = BT_QUERY_STATUS_OK,
-       };
+       bt_query_status status = BT_QUERY_STATUS_OK;
 
        if (strcmp(object, "sessions") == 0) {
-               return lttng_live_query_list_sessions(comp_class,
-                       query_exec, params);
+               status = lttng_live_query_list_sessions(params, result);
+       } else {
+               BT_LOGW("Unknown query object `%s`", object);
+               status = BT_QUERY_STATUS_INVALID_OBJECT;
+               goto end;
        }
-       BT_LOGW("Unknown query object `%s`", object);
-       ret.status = BT_QUERY_STATUS_INVALID_OBJECT;
-       return ret;
+
+end:
+       return status;
 }
 
 static
 void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
 {
-       int ret;
-       struct lttng_live_session *session, *s;
-
-       bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
-               lttng_live_destroy_session(session);
-       }
-       BT_OBJECT_PUT_REF_AND_RESET(lttng_live->viewer_connection);
-       if (lttng_live->url) {
-               g_string_free(lttng_live->url, TRUE);
-       }
-       if (lttng_live->no_stream_port) {
-               bt_object_get_ref(lttng_live->no_stream_port);
-               ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
-               bt_object_put_ref(lttng_live->no_stream_port);
-               BT_ASSERT(!ret);
-       }
-       if (lttng_live->no_stream_iter) {
-               g_free(lttng_live->no_stream_iter);
+       if (lttng_live->params.url) {
+               g_string_free(lttng_live->params.url, TRUE);
        }
        g_free(lttng_live);
 }
 
 BT_HIDDEN
-void lttng_live_component_finalize(bt_self_component *component)
+void lttng_live_component_finalize(bt_self_component_source *component)
 {
-       void *data = bt_self_component_get_user_data(component);
+       void *data = bt_self_component_get_data(
+                       bt_self_component_source_as_self_component(component));
 
        if (!data) {
                return;
@@ -1060,41 +1490,71 @@ void lttng_live_component_finalize(bt_self_component *component)
 }
 
 static
-struct lttng_live_component *lttng_live_component_create(bt_value *params,
-               bt_self_component *private_component)
+enum session_not_found_action parse_session_not_found_action_param(
+       const bt_value *no_session_param)
+{
+       enum session_not_found_action action;
+       const char *no_session_act_str;
+       no_session_act_str = bt_value_string_get(no_session_param);
+       if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_CONTINUE_STR) == 0) {
+               action = SESSION_NOT_FOUND_ACTION_CONTINUE;
+       } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_FAIL_STR) == 0) {
+               action = SESSION_NOT_FOUND_ACTION_FAIL;
+       } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0) {
+               action = SESSION_NOT_FOUND_ACTION_END;
+       } else {
+               action = -1;
+       }
+
+       return action;
+}
+
+struct lttng_live_component *lttng_live_component_create(const bt_value *params)
 {
        struct lttng_live_component *lttng_live;
-       bt_value *value = NULL;
+       const bt_value *value = NULL;
        const char *url;
-       bt_value_status ret;
 
        lttng_live = g_new0(struct lttng_live_component, 1);
        if (!lttng_live) {
                goto end;
        }
-       /* TODO: make this an overridable parameter. */
        lttng_live->max_query_size = MAX_QUERY_SIZE;
-       BT_INIT_LIST_HEAD(&lttng_live->sessions);
-       value = bt_value_map_get(params, "url");
-       if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) {
-               BT_LOGW("Mandatory \"url\" parameter missing");
+       lttng_live->has_msg_iter = false;
+
+       value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
+       if (!value || !bt_value_is_string(value)) {
+               BT_LOGW("Mandatory `%s` parameter missing or not a string",
+                       URL_PARAM);
                goto error;
        }
        url = bt_value_string_get(value);
-       lttng_live->url = g_string_new(url);
-       if (!lttng_live->url) {
-               goto error;
-       }
-       BT_VALUE_PUT_REF_AND_RESET(value);
-       lttng_live->viewer_connection =
-               bt_live_viewer_connection_create(lttng_live->url->str, lttng_live);
-       if (!lttng_live->viewer_connection) {
+       lttng_live->params.url = g_string_new(url);
+       if (!lttng_live->params.url) {
                goto error;
        }
-       if (lttng_live_create_viewer_session(lttng_live)) {
-               goto error;
+
+       value = bt_value_map_borrow_entry_value_const(params,
+               SESS_NOT_FOUND_ACTION_PARAM);
+
+       if (value && bt_value_is_string(value)) {
+               lttng_live->params.sess_not_found_act =
+                       parse_session_not_found_action_param(value);
+               if (lttng_live->params.sess_not_found_act == -1) {
+                       BT_LOGE("Unexpected value for `%s` parameter: "
+                               "value=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM,
+                               bt_value_string_get(value));
+                       goto error;
+               }
+       } else {
+               BT_LOGW("Optional `%s` parameter is missing or "
+                       "not a string value. Defaulting to %s=\"%s\".",
+                       SESS_NOT_FOUND_ACTION_PARAM,
+                       SESS_NOT_FOUND_ACTION_PARAM,
+                       SESS_NOT_FOUND_ACTION_CONTINUE_STR);
+               lttng_live->params.sess_not_found_act =
+                       SESSION_NOT_FOUND_ACTION_CONTINUE;
        }
-       lttng_live->private_component = private_component;
 
        goto end;
 
@@ -1106,84 +1566,35 @@ end:
 }
 
 BT_HIDDEN
-bt_component_status lttng_live_component_init(
-               bt_self_component *private_component,
-               bt_value *params, void *init_method_data)
+bt_self_component_status lttng_live_component_init(
+               bt_self_component_source *self_comp,
+               const bt_value *params, UNUSED_VAR void *init_method_data)
 {
        struct lttng_live_component *lttng_live;
-       bt_component_status ret = BT_COMPONENT_STATUS_OK;
+       bt_self_component_status ret = BT_SELF_COMPONENT_STATUS_OK;
 
-       /* Passes ownership of iter ref to lttng_live_component_create. */
-       lttng_live = lttng_live_component_create(params, private_component);
+       lttng_live = lttng_live_component_create(params);
        if (!lttng_live) {
-               //TODO : we need access to the application cancel state
-               //because we are not part of a graph yet.
-               ret = BT_COMPONENT_STATUS_NOMEM;
+               ret = BT_SELF_COMPONENT_STATUS_NOMEM;
                goto end;
        }
+       lttng_live->self_comp = self_comp;
 
-       lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1);
-       lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM;
-       lttng_live->no_stream_iter->lttng_live = lttng_live;
        if (lttng_live_is_canceled(lttng_live)) {
                goto end;
        }
+
        ret = bt_self_component_source_add_output_port(
-                               lttng_live->private_component, "no-stream",
-                               lttng_live->no_stream_iter,
-                               &lttng_live->no_stream_port);
-       if (ret != BT_COMPONENT_STATUS_OK) {
+                               lttng_live->self_comp, "out",
+                               NULL, NULL);
+       if (ret != BT_SELF_COMPONENT_STATUS_OK) {
                goto end;
        }
-       bt_object_put_ref(lttng_live->no_stream_port);  /* weak */
-       lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
 
-       ret = bt_self_component_set_user_data(private_component, lttng_live);
-       if (ret != BT_COMPONENT_STATUS_OK) {
-               goto error;
-       }
+       bt_self_component_set_data(
+                       bt_self_component_source_as_self_component(self_comp),
+                       lttng_live);
 
 end:
        return ret;
-error:
-       (void) bt_self_component_set_user_data(private_component, NULL);
-       lttng_live_component_destroy_data(lttng_live);
-       return ret;
-}
-
-BT_HIDDEN
-bt_component_status lttng_live_accept_port_connection(
-               bt_self_component *private_component,
-               struct bt_private_port *self_private_port,
-               const bt_port *other_port)
-{
-       struct lttng_live_component *lttng_live =
-                       bt_self_component_get_user_data(private_component);
-       bt_component *other_component;
-       bt_component_status status = BT_COMPONENT_STATUS_OK;
-       const bt_port *self_port = bt_port_from_private(self_private_port);
-
-       other_component = bt_port_get_component(other_port);
-       bt_component_put_ref(other_component);  /* weak */
-
-       if (!lttng_live->downstream_component) {
-               lttng_live->downstream_component = other_component;
-               goto end;
-       }
-
-       /*
-        * Compare prior component to ensure we are connected to the
-        * same downstream component as prior ports.
-        */
-       if (lttng_live->downstream_component != other_component) {
-               BT_LOGW("Cannot connect ctf.lttng-live component port \"%s\" to component \"%s\": already connected to component \"%s\".",
-                       bt_port_get_name(self_port),
-                       bt_component_get_name(other_component),
-                       bt_component_get_name(lttng_live->downstream_component));
-               status = BT_COMPONENT_STATUS_REFUSE_PORT_CONNECTION;
-               goto end;
-       }
-end:
-       bt_port_put_ref(self_port);
-       return status;
 }
diff --git a/plugins/ctf/lttng-live/lttng-live.h b/plugins/ctf/lttng-live/lttng-live.h
new file mode 100644 (file)
index 0000000..6a3614b
--- /dev/null
@@ -0,0 +1,300 @@
+#ifndef BABELTRACE_PLUGIN_CTF_LTTNG_LIVE_H
+#define BABELTRACE_PLUGIN_CTF_LTTNG_LIVE_H
+
+/*
+ * BabelTrace - LTTng-live client Component
+ *
+ * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
+ * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#include <stdbool.h>
+
+#include <babeltrace/babeltrace-internal.h>
+#include <babeltrace/babeltrace.h>
+
+#include "../common/metadata/decoder.h"
+#include "../common/msg-iter/msg-iter.h"
+
+#include "viewer-connection.h"
+
+struct lttng_live_component;
+struct lttng_live_session;
+struct lttng_live_msg_iter;
+
+enum lttng_live_stream_state {
+       /* This stream won't have data until some known time in the future. */
+       LTTNG_LIVE_STREAM_QUIESCENT,
+       /*
+        * This stream won't have data until some known time in the future and
+        * the message iterator inactivity message was already sent downstream.
+        */
+       LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA, /* */
+       /* This stream has data ready to be consumed. */
+       LTTNG_LIVE_STREAM_ACTIVE_DATA,
+       /*
+        * This stream has no data left to consume. We should asked the relay
+        * for more.
+        */
+       LTTNG_LIVE_STREAM_ACTIVE_NO_DATA,
+       /* This stream won't have anymore data, ever. */
+       LTTNG_LIVE_STREAM_EOF,
+};
+
+/* Iterator over a live stream. */
+struct lttng_live_stream_iterator {
+       /* Owned by this. */
+       bt_stream *stream;
+
+       /* Weak reference. */
+       struct lttng_live_trace *trace;
+
+       /*
+        * Since only a single iterator per viewer connection, we have
+        * only a single message iterator per stream.
+        */
+       struct bt_msg_iter *msg_iter;
+
+       uint64_t viewer_stream_id;
+
+       uint64_t ctf_stream_class_id;
+
+       /* base offset in current index. */
+       uint64_t base_offset;
+       /* len to read in current index. */
+       uint64_t len;
+       /* offset in current index. */
+       uint64_t offset;
+
+       /*
+        * Clock Snapshot value of the last message iterator inactivity message
+        * sent downstream.
+        */
+       uint64_t last_inactivity_ts;
+
+       /*
+        * Clock Snapshot value of the current message iterator inactivity
+        * message we might want to send downstream.
+        */
+       uint64_t current_inactivity_ts;
+
+       enum lttng_live_stream_state state;
+
+       /*
+        * The current message produced by this live stream iterator. Owned by
+        * this.
+        */
+       bt_message *current_msg;
+
+       /* Timestamp in nanoseconds of the current message (current_msg). */
+       int64_t current_msg_ts_ns;
+
+       /* Owned by this. */
+       uint8_t *buf;
+       size_t buflen;
+
+       /* Owned by this. */
+       GString *name;
+};
+
+struct lttng_live_metadata {
+       /* Weak reference. */
+       struct lttng_live_trace *trace;
+
+       uint64_t stream_id;
+       /* Weak reference. */
+       struct ctf_metadata_decoder *decoder;
+
+       bool closed;
+};
+
+struct lttng_live_trace {
+       /* Back reference to session. */
+       struct lttng_live_session *session;
+
+       /* ctf trace ID within the session. */
+       uint64_t id;
+
+       /* Owned by this. */
+       bt_trace *trace;
+
+       /* Weak reference. */
+       bt_trace_class *trace_class;
+
+       struct lttng_live_metadata *metadata;
+
+       const bt_clock_class *clock_class;
+
+       /* Array of pointers to struct lttng_live_stream_iterator. */
+       /* Owned by this. */
+       GPtrArray *stream_iterators;
+
+       bool new_metadata_needed;
+};
+
+struct lttng_live_session {
+       /* Weak reference. */
+       struct lttng_live_msg_iter *lttng_live_msg_iter;
+
+       /* Owned by this. */
+       GString *hostname;
+
+       /* Owned by this. */
+       GString *session_name;
+
+       uint64_t id;
+
+       /* Array of pointers to struct lttng_live_trace. */
+       GPtrArray *traces;
+
+       bool attached;
+       bool new_streams_needed;
+       bool lazy_stream_msg_init;
+       bool closed;
+};
+
+enum session_not_found_action {
+       SESSION_NOT_FOUND_ACTION_CONTINUE,
+       SESSION_NOT_FOUND_ACTION_FAIL,
+       SESSION_NOT_FOUND_ACTION_END,
+};
+
+/*
+ * A component instance is an iterator on a single session.
+ */
+struct lttng_live_component {
+       /* Weak reference. */
+       bt_self_component_source *self_comp;
+
+       struct {
+               GString *url;
+               enum session_not_found_action sess_not_found_act;
+       } params;
+
+       size_t max_query_size;
+
+       /*
+        * Keeps track of whether the downstream component already has a
+        * message iterator on this component.
+        */
+       bool has_msg_iter;
+};
+
+struct lttng_live_msg_iter {
+       /* Weak reference. */
+       struct lttng_live_component *lttng_live_comp;
+
+       /* Weak reference. */
+       bt_self_message_iterator *self_msg_iter;
+
+       /* Owned by this. */
+       struct live_viewer_connection *viewer_connection;
+
+       /* Array of pointers to struct lttng_live_session. */
+       GPtrArray *sessions;
+
+       /* Number of live stream iterator this message iterator has.*/
+       uint64_t active_stream_iter;
+
+       /* Timestamp in nanosecond of the last message sent downstream. */
+       int64_t last_msg_ts_ns;
+};
+
+enum lttng_live_iterator_status {
+       /** Iterator state has progressed. Continue iteration immediately. */
+       LTTNG_LIVE_ITERATOR_STATUS_CONTINUE = 3,
+       /** No message available for now. Try again later. */
+       LTTNG_LIVE_ITERATOR_STATUS_AGAIN = 2,
+       /** No more CTF_LTTNG_LIVEs to be delivered. */
+       LTTNG_LIVE_ITERATOR_STATUS_END = 1,
+       /** No error, okay. */
+       LTTNG_LIVE_ITERATOR_STATUS_OK = 0,
+       /** Invalid arguments. */
+       LTTNG_LIVE_ITERATOR_STATUS_INVAL = -1,
+       /** General error. */
+       LTTNG_LIVE_ITERATOR_STATUS_ERROR = -2,
+       /** Out of memory. */
+       LTTNG_LIVE_ITERATOR_STATUS_NOMEM = -3,
+       /** Unsupported iterator feature. */
+       LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED = -4,
+};
+
+bt_self_component_status lttng_live_component_init(
+               bt_self_component_source *self_comp,
+               const bt_value *params, void *init_method_data);
+
+bt_query_status lttng_live_query(
+               bt_self_component_class_source *comp_class,
+               const bt_query_executor *query_exec,
+               const char *object, const bt_value *params,
+               const bt_value **result);
+
+void lttng_live_component_finalize(bt_self_component_source *component);
+
+bt_self_message_iterator_status lttng_live_msg_iter_next(
+               bt_self_message_iterator *iterator,
+               bt_message_array_const msgs, uint64_t capacity,
+               uint64_t *count);
+
+bt_self_message_iterator_status lttng_live_msg_iter_init(
+               bt_self_message_iterator *self_msg_it,
+               bt_self_component_source *self_comp,
+               bt_self_component_port_output *self_port);
+
+void lttng_live_msg_iter_finalize(bt_self_message_iterator *it);
+
+int lttng_live_create_viewer_session(struct lttng_live_msg_iter *lttng_live_msg_iter);
+int lttng_live_attach_session(struct lttng_live_session *session);
+int lttng_live_detach_session(struct lttng_live_session *session);
+enum lttng_live_iterator_status lttng_live_get_new_streams(
+               struct lttng_live_session *session);
+
+int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
+               uint64_t session_id,
+               const char *hostname,
+               const char *session_name);
+
+ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace,
+               FILE *fp);
+enum lttng_live_iterator_status lttng_live_get_next_index(
+               struct lttng_live_msg_iter *lttng_live_msg_iter,
+               struct lttng_live_stream_iterator *stream,
+               struct packet_index *index);
+
+enum bt_msg_iter_medium_status lttng_live_get_stream_bytes(
+               struct lttng_live_msg_iter *lttng_live_msg_iter,
+               struct lttng_live_stream_iterator *stream, uint8_t *buf,
+               uint64_t offset, uint64_t req_len, uint64_t *recv_len);
+void lttng_live_add_stream_iterator(struct lttng_live_msg_iter *lttng_live_msg_iter,
+               struct lttng_live_stream_iterator *stream_iter);
+void lttng_live_remove_stream_iterator(struct lttng_live_msg_iter *lttng_live_msg_iter,
+               struct lttng_live_stream_iterator *stream_iter);
+
+struct lttng_live_trace *lttng_live_borrow_trace(
+               struct lttng_live_session *session, uint64_t trace_id);
+void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter);
+
+bool lttng_live_is_canceled(struct lttng_live_component *lttng_live);
+
+#endif /* BABELTRACE_PLUGIN_CTF_LTTNG_LIVE_H */
index 42aac5d9efe23f67f2047b7dc00233e88d063e95..36ff1c6c52fabc2abc0fc3f6c3abc1a473b7a3c5 100644 (file)
@@ -1,4 +1,5 @@
 /*
+ * Copyright 2019 - Francis Deslauriers <francis.deslauriers@efficios.com>
  * Copyright 2016 - Philippe Proulx <pproulx@efficios.com>
  * Copyright 2010-2011 - EfficiOS Inc. and Linux Foundation
  *
@@ -31,7 +32,6 @@
 #include <stdlib.h>
 #include <stdbool.h>
 #include <glib.h>
-#include <babeltrace/compat/uuid-internal.h>
 #include <babeltrace/compat/memstream-internal.h>
 #include <babeltrace/babeltrace.h>
 
@@ -53,66 +53,85 @@ struct packet_header {
        uint8_t  minor;
 } __attribute__((__packed__));
 
+
 static
-bt_lttng_live_iterator_status lttng_live_update_clock_map(
-               struct lttng_live_trace *trace)
+bool stream_classes_all_have_default_clock_class(bt_trace_class *tc)
 {
-       bt_lttng_live_iterator_status status =
-                       BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
-       size_t i;
-       int count, ret;
-
-       BT_OBJECT_PUT_REF_AND_RESET(trace->cc_prio_map);
-       trace->cc_prio_map = bt_clock_class_priority_map_create();
-       if (!trace->cc_prio_map) {
-               goto error;
-       }
+       uint64_t i, sc_count;
+       const bt_clock_class *cc = NULL;
+       const bt_stream_class *sc;
+       bool ret = true;
 
-       count = bt_trace_get_clock_class_count(trace->trace);
-       BT_ASSERT(count >= 0);
+       sc_count = bt_trace_class_get_stream_class_count(tc);
+       for (i = 0; i < sc_count; i++) {
+               sc = bt_trace_class_borrow_stream_class_by_index_const(tc, i);
 
-       for (i = 0; i < count; i++) {
-               const bt_clock_class *clock_class =
-                       bt_trace_get_clock_class_by_index(trace->trace, i);
+               BT_ASSERT(sc);
 
-               BT_ASSERT(clock_class);
-               ret = bt_clock_class_priority_map_add_clock_class(
-                       trace->cc_prio_map, clock_class, 0);
-               BT_CLOCK_CLASS_PUT_REF_AND_RESET(clock_class);
-
-               if (ret) {
-                       goto error;
+               cc = bt_stream_class_borrow_default_clock_class_const(sc);
+               if (!cc) {
+                       ret = false;
+                       BT_LOGE("Stream class doesn't have a default clock class: "
+                               "sc-id=%" PRIu64 ", sc-name=\"%s\"",
+                               bt_stream_class_get_id(sc),
+                               bt_stream_class_get_name(sc));
+                       goto end;
                }
        }
 
-       goto end;
-error:
-       status = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
 end:
-       return status;
+       return ret;
+}
+/*
+ * Iterate over the stream classes and returns the first clock class
+ * encountered. This is useful to create message iterator inactivity message as
+ * we don't need a particular clock class.
+ */
+static
+const bt_clock_class *borrow_any_clock_class(bt_trace_class *tc)
+{
+       uint64_t i, sc_count;
+       const bt_clock_class *cc = NULL;
+       const bt_stream_class *sc;
+
+       sc_count = bt_trace_class_get_stream_class_count(tc);
+       for (i = 0; i < sc_count; i++) {
+               sc = bt_trace_class_borrow_stream_class_by_index_const(tc, i);
+               BT_ASSERT(sc);
+
+               cc = bt_stream_class_borrow_default_clock_class_const(sc);
+               if (cc) {
+                       goto end;
+               }
+       }
+end:
+       BT_ASSERT(cc);
+       return cc;
 }
 
 BT_HIDDEN
-bt_lttng_live_iterator_status lttng_live_metadata_update(
+enum lttng_live_iterator_status lttng_live_metadata_update(
                struct lttng_live_trace *trace)
 {
        struct lttng_live_session *session = trace->session;
        struct lttng_live_metadata *metadata = trace->metadata;
+       struct lttng_live_component *lttng_live =
+               session->lttng_live_msg_iter->lttng_live_comp;
        ssize_t ret = 0;
        size_t size, len_read = 0;
        char *metadata_buf = NULL;
        FILE *fp = NULL;
        enum ctf_metadata_decoder_status decoder_status;
-       bt_lttng_live_iterator_status status =
-               BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+       enum lttng_live_iterator_status status =
+               LTTNG_LIVE_ITERATOR_STATUS_OK;
 
        /* No metadata stream yet. */
        if (!metadata) {
                if (session->new_streams_needed) {
-                       status = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+                       status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
                } else {
                        session->new_streams_needed = true;
-                       status = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+                       status = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
                }
                goto end;
        }
@@ -159,12 +178,11 @@ bt_lttng_live_iterator_status lttng_live_metadata_update(
                         * metadata objects immediately, but only when
                         * the data streams are done.
                         */
-                       lttng_live_unref_trace(metadata->trace);
                        metadata->trace = NULL;
                }
                if (errno == EINTR) {
-                       if (lttng_live_is_canceled(session->lttng_live)) {
-                               status = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+                       if (lttng_live_is_canceled(lttng_live)) {
+                               status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
                                goto end;
                        }
                }
@@ -178,7 +196,7 @@ bt_lttng_live_iterator_status lttng_live_metadata_update(
 
        if (len_read == 0) {
                if (!trace->trace) {
-                       status = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+                       status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
                        goto end;
                }
                trace->new_metadata_needed = false;
@@ -192,19 +210,31 @@ bt_lttng_live_iterator_status lttng_live_metadata_update(
                goto error;
        }
 
+       /*
+        * The call to ctf_metadata_decoder_decode will append new metadata to
+        * our current trace class.
+        */
        decoder_status = ctf_metadata_decoder_decode(metadata->decoder, fp);
        switch (decoder_status) {
        case CTF_METADATA_DECODER_STATUS_OK:
-               BT_OBJECT_PUT_REF_AND_RESET(trace->trace);
-               trace->trace = ctf_metadata_decoder_get_trace(metadata->decoder);
-               trace->new_metadata_needed = false;
-               status = lttng_live_update_clock_map(trace);
-               if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
-                       goto end;
+               if (!trace->trace_class) {
+                       trace->trace_class =
+                               ctf_metadata_decoder_get_ir_trace_class(
+                                               metadata->decoder);
+                       trace->trace = bt_trace_create(trace->trace_class);
+                       if (!stream_classes_all_have_default_clock_class(
+                                       trace->trace_class)) {
+                               /* Error logged in function. */
+                               goto error;
+                       }
+                       trace->clock_class =
+                               borrow_any_clock_class(trace->trace_class);
                }
+               trace->new_metadata_needed = false;
+
                break;
        case CTF_METADATA_DECODER_STATUS_INCOMPLETE:
-               status = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+               status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
                break;
        case CTF_METADATA_DECODER_STATUS_ERROR:
        case CTF_METADATA_DECODER_STATUS_INVAL_VERSION:
@@ -214,7 +244,7 @@ bt_lttng_live_iterator_status lttng_live_metadata_update(
 
        goto end;
 error:
-       status = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+       status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
 end:
        if (fp) {
                int closeret;
@@ -234,6 +264,8 @@ int lttng_live_metadata_create_stream(struct lttng_live_session *session,
                uint64_t stream_id,
                const char *trace_name)
 {
+       struct lttng_live_component *lttng_live =
+               session->lttng_live_msg_iter->lttng_live_comp;
        struct lttng_live_metadata *metadata = NULL;
        struct lttng_live_trace *trace;
        const char *match;
@@ -243,17 +275,18 @@ int lttng_live_metadata_create_stream(struct lttng_live_session *session,
                return -1;
        }
        metadata->stream_id = stream_id;
-       //TODO: add clock offset option
+
        match = strstr(trace_name, session->session_name->str);
        if (!match) {
                goto error;
        }
-       metadata->decoder = ctf_metadata_decoder_create(NULL,
-               match);
+
+       metadata->decoder = ctf_metadata_decoder_create(
+                               lttng_live->self_comp, NULL);
        if (!metadata->decoder) {
                goto error;
        }
-       trace = lttng_live_ref_trace(session, ctf_trace_id);
+       trace = lttng_live_borrow_trace(session, ctf_trace_id);
        if (!trace) {
                goto error;
        }
@@ -275,13 +308,7 @@ void lttng_live_metadata_fini(struct lttng_live_trace *trace)
        if (!metadata) {
                return;
        }
-       if (metadata->text) {
-               free(metadata->text);
-       }
        ctf_metadata_decoder_destroy(metadata->decoder);
        trace->metadata = NULL;
-       if (!metadata->closed) {
-               lttng_live_unref_trace(metadata->trace);
-       }
        g_free(metadata);
 }
index f3b66ed7ebb543d408995d90ca0b00273db54186..ddd0e0f4db98e9f99a40921efe26b7546cd522eb 100644 (file)
 #include <glib.h>
 #include <babeltrace/babeltrace-internal.h>
 #include <babeltrace/babeltrace.h>
-#include "lttng-live-internal.h"
+#include "lttng-live.h"
 
 int lttng_live_metadata_create_stream(struct lttng_live_session *session,
                uint64_t ctf_trace_id, uint64_t stream_id,
                const char *trace_name);
 
-bt_lttng_live_iterator_status lttng_live_metadata_update(
+enum lttng_live_iterator_status lttng_live_metadata_update(
                struct lttng_live_trace *trace);
 
 void lttng_live_metadata_fini(struct lttng_live_trace *trace);
index d7a4a0a106be293c5731bed49b59262770d33cfe..b37db12ec46e2f9a0f464658b71e7e63a0440692 100644 (file)
@@ -1,4 +1,5 @@
 /*
+ * Copyright 2019 - Francis Deslauriers <francis.deslauriers@efficios.com>
  * Copyright 2016 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
 #include <babeltrace/common-internal.h>
 #include <babeltrace/babeltrace.h>
 
-#include "lttng-live-internal.h"
+#include "lttng-live.h"
 #include "viewer-connection.h"
 #include "lttng-viewer-abi.h"
 #include "data-stream.h"
 #include "metadata.h"
 
-static ssize_t lttng_live_recv(struct bt_live_viewer_connection *viewer_connection,
+static
+ssize_t lttng_live_recv(struct live_viewer_connection *viewer_connection,
                void *buf, size_t len)
 {
        ssize_t ret;
        size_t copied = 0, to_copy = len;
-       struct lttng_live_component *lttng_live =
-                       viewer_connection->lttng_live;
+       struct lttng_live_msg_iter *lttng_live_msg_iter =
+               viewer_connection->lttng_live_msg_iter;
        BT_SOCKET sock = viewer_connection->control_sock;
 
        do {
@@ -62,7 +64,8 @@ static ssize_t lttng_live_recv(struct bt_live_viewer_connection *viewer_connecti
                        to_copy -= ret;
                }
                if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) {
-                       if (lttng_live_is_canceled(lttng_live)) {
+                       if (!viewer_connection->in_query &&
+                                       lttng_live_is_canceled(lttng_live_msg_iter->lttng_live_comp)) {
                                break;
                        } else {
                                continue;
@@ -75,18 +78,20 @@ static ssize_t lttng_live_recv(struct bt_live_viewer_connection *viewer_connecti
        return ret;
 }
 
-static ssize_t lttng_live_send(struct bt_live_viewer_connection *viewer_connection,
+static
+ssize_t lttng_live_send(struct live_viewer_connection *viewer_connection,
                const void *buf, size_t len)
 {
-       struct lttng_live_component *lttng_live =
-                       viewer_connection->lttng_live;
+       struct lttng_live_msg_iter *lttng_live_msg_iter =
+               viewer_connection->lttng_live_msg_iter;
        BT_SOCKET sock = viewer_connection->control_sock;
        ssize_t ret;
 
        for (;;) {
                ret = bt_socket_send_nosigpipe(sock, buf, len);
                if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) {
-                       if (lttng_live_is_canceled(lttng_live)) {
+                       if (!viewer_connection->in_query &&
+                                       lttng_live_is_canceled(lttng_live_msg_iter->lttng_live_comp)) {
                                break;
                        } else {
                                continue;
@@ -98,7 +103,8 @@ static ssize_t lttng_live_send(struct bt_live_viewer_connection *viewer_connecti
        return ret;
 }
 
-static int parse_url(struct bt_live_viewer_connection *viewer_connection)
+static
+int parse_url(struct live_viewer_connection *viewer_connection)
 {
        char error_buf[256] = { 0 };
        struct bt_common_lttng_live_url_parts lttng_live_url_parts = { 0 };
@@ -153,7 +159,8 @@ end:
        return ret;
 }
 
-static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connection)
+static
+int lttng_live_handshake(struct live_viewer_connection *viewer_connection)
 {
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_connect connect;
@@ -221,7 +228,8 @@ error:
        return -1;
 }
 
-static int lttng_live_connect_viewer(struct bt_live_viewer_connection *viewer_connection)
+static
+int lttng_live_connect_viewer(struct live_viewer_connection *viewer_connection)
 {
        struct hostent *host;
        struct sockaddr_in server_addr;
@@ -271,7 +279,9 @@ error:
        return -1;
 }
 
-static void lttng_live_disconnect_viewer(struct bt_live_viewer_connection *viewer_connection)
+static
+void lttng_live_disconnect_viewer(
+               struct live_viewer_connection *viewer_connection)
 {
        if (viewer_connection->control_sock == BT_INVALID_SOCKET) {
                return;
@@ -282,20 +292,21 @@ static void lttng_live_disconnect_viewer(struct bt_live_viewer_connection *viewe
        }
 }
 
-static void connection_release(bt_object *obj)
+static
+void connection_release(bt_object *obj)
 {
-       struct bt_live_viewer_connection *conn =
-               container_of(obj, struct bt_live_viewer_connection, obj);
+       struct live_viewer_connection *conn =
+               container_of(obj, struct live_viewer_connection, obj);
 
-       bt_live_viewer_connection_destroy(conn);
+       live_viewer_connection_destroy(conn);
 }
 
 static
-bt_value_status list_update_session(bt_value *results,
+int list_update_session(bt_value *results,
                const struct lttng_viewer_session *session,
                bool *_found)
 {
-       bt_value_status ret = BT_VALUE_STATUS_OK;
+       int ret = 0;
        bt_value *map = NULL;
        bt_value *hostname = NULL;
        bt_value *session_name = NULL;
@@ -305,26 +316,30 @@ bt_value_status list_update_session(bt_value *results,
 
        len = bt_value_array_get_size(results);
        if (len < 0) {
-               ret = BT_VALUE_STATUS_ERROR;
+               BT_LOGE_STR("Error getting size of array.");
+               ret = -1;
                goto end;
        }
        for (i = 0; i < len; i++) {
                const char *hostname_str = NULL;
                const char *session_name_str = NULL;
 
-               map = bt_value_array_get(results, (size_t) i);
+               map = bt_value_array_borrow_element_by_index(results, (size_t) i);
                if (!map) {
-                       ret = BT_VALUE_STATUS_ERROR;
+                       BT_LOGE_STR("Error borrowing map.");
+                       ret = -1;
                        goto end;
                }
-               hostname = bt_value_map_get(map, "target-hostname");
+               hostname = bt_value_map_borrow_entry_value(map, "target-hostname");
                if (!hostname) {
-                       ret = BT_VALUE_STATUS_ERROR;
+                       BT_LOGE_STR("Error borrowing \"target-hostname\" entry.");
+                       ret = -1;
                        goto end;
                }
-               session_name = bt_value_map_get(map, "session-name");
+               session_name = bt_value_map_borrow_entry_value(map, "session-name");
                if (!session_name) {
-                       ret = BT_VALUE_STATUS_ERROR;
+                       BT_LOGE_STR("Error borrowing \"session-name\" entry.");
+                       ret = -1;
                        goto end;
                }
                hostname_str = bt_value_string_get(hostname);
@@ -339,58 +354,45 @@ bt_value_status list_update_session(bt_value *results,
 
                        found = true;
 
-                       btval = bt_value_map_get(map, "stream-count");
+                       btval = bt_value_map_borrow_entry_value(map, "stream-count");
                        if (!btval) {
-                               ret = BT_VALUE_STATUS_ERROR;
+                               BT_LOGE_STR("Error borrowing \"stream-count\" entry.");
+                               ret = -1;
                                goto end;
                        }
                        val = bt_value_integer_get(btval);
                        /* sum */
                        val += streams;
-                       ret = bt_private_integer_bool_set(btval, val);
-                       if (ret != BT_VALUE_STATUS_OK) {
-                               goto end;
-                       }
-                       BT_VALUE_PUT_REF_AND_RESET(btval);
+                       bt_value_integer_set(btval, val);
 
-                       btval = bt_value_map_get(map, "client-count");
+                       btval = bt_value_map_borrow_entry_value(map, "client-count");
                        if (!btval) {
-                               ret = BT_VALUE_STATUS_ERROR;
+                               BT_LOGE_STR("Error borrowing \"client-count\" entry.");
+                               ret = -1;
                                goto end;
                        }
                        val = bt_value_integer_get(btval);
                        /* max */
                        val = max_t(int64_t, clients, val);
-                       ret = bt_private_integer_bool_set(btval, val);
-                       if (ret != BT_VALUE_STATUS_OK) {
-                               goto end;
-                       }
-                       BT_VALUE_PUT_REF_AND_RESET(btval);
+                       bt_value_integer_set(btval, val);
                }
 
-               BT_VALUE_PUT_REF_AND_RESET(hostname);
-               BT_VALUE_PUT_REF_AND_RESET(session_name);
-               BT_VALUE_PUT_REF_AND_RESET(map);
-
                if (found) {
                        break;
                }
        }
 end:
-       BT_VALUE_PUT_REF_AND_RESET(btval);
-       BT_VALUE_PUT_REF_AND_RESET(hostname);
-       BT_VALUE_PUT_REF_AND_RESET(session_name);
-       BT_VALUE_PUT_REF_AND_RESET(map);
        *_found = found;
        return ret;
 }
 
 static
-bt_value_status list_append_session(bt_value *results,
+int list_append_session(bt_value *results,
                GString *base_url,
                const struct lttng_viewer_session *session)
 {
-       bt_value_status ret = BT_VALUE_STATUS_OK;
+       int ret = 0;
+       bt_value_status ret_status;
        bt_value *map = NULL;
        GString *url = NULL;
        bool found = false;
@@ -400,18 +402,20 @@ bt_value_status list_append_session(bt_value *results,
         * and do max of client counts.
         */
        ret = list_update_session(results, session, &found);
-       if (ret != BT_VALUE_STATUS_OK || found) {
+       if (ret || found) {
                goto end;
        }
 
-       map = bt_private_value_map_create();
+       map = bt_value_map_create();
        if (!map) {
-               ret = BT_VALUE_STATUS_ERROR;
+               BT_LOGE_STR("Error creating map value.");
+               ret = -1;
                goto end;
        }
 
        if (base_url->len < 1) {
-               ret = BT_VALUE_STATUS_ERROR;
+               BT_LOGE_STR("Error: base_url length smaller than 1.");
+               ret = -1;
                goto end;
        }
        /*
@@ -424,8 +428,10 @@ bt_value_status list_append_session(bt_value *results,
        g_string_append_c(url, '/');
        g_string_append(url, session->session_name);
 
-       ret = bt_private_value_map_insert_string_entry(map, "url", url->str);
-       if (ret != BT_VALUE_STATUS_OK) {
+       ret_status = bt_value_map_insert_string_entry(map, "url", url->str);
+       if (ret_status != BT_VALUE_STATUS_OK) {
+               BT_LOGE_STR("Error inserting \"url\" entry.");
+               ret = -1;
                goto end;
        }
 
@@ -433,9 +439,11 @@ bt_value_status list_append_session(bt_value *results,
         * key = "target-hostname",
         * value = <string>,
         */
-       ret = bt_private_value_map_insert_string_entry(map, "target-hostname",
+       ret_status = bt_value_map_insert_string_entry(map, "target-hostname",
                session->hostname);
-       if (ret != BT_VALUE_STATUS_OK) {
+       if (ret_status != BT_VALUE_STATUS_OK) {
+               BT_LOGE_STR("Error inserting \"target-hostname\" entry.");
+               ret = -1;
                goto end;
        }
 
@@ -443,9 +451,11 @@ bt_value_status list_append_session(bt_value *results,
         * key = "session-name",
         * value = <string>,
         */
-       ret = bt_private_value_map_insert_string_entry(map, "session-name",
+       ret_status = bt_value_map_insert_string_entry(map, "session-name",
                session->session_name);
-       if (ret != BT_VALUE_STATUS_OK) {
+       if (ret_status != BT_VALUE_STATUS_OK) {
+               BT_LOGE_STR("Error inserting \"session-name\" entry.");
+               ret = -1;
                goto end;
        }
 
@@ -456,9 +466,11 @@ bt_value_status list_append_session(bt_value *results,
        {
                uint32_t live_timer = be32toh(session->live_timer);
 
-               ret = bt_private_value_map_insert_integer_entry(map, "timer-us",
+               ret_status = bt_value_map_insert_integer_entry(map, "timer-us",
                        live_timer);
-               if (ret != BT_VALUE_STATUS_OK) {
+               if (ret_status != BT_VALUE_STATUS_OK) {
+                       BT_LOGE_STR("Error inserting \"timer-us\" entry.");
+                       ret = -1;
                        goto end;
                }
        }
@@ -470,14 +482,15 @@ bt_value_status list_append_session(bt_value *results,
        {
                uint32_t streams = be32toh(session->streams);
 
-               ret = bt_private_value_map_insert_integer_entry(map, "stream-count",
+               ret_status = bt_value_map_insert_integer_entry(map, "stream-count",
                        streams);
-               if (ret != BT_VALUE_STATUS_OK) {
+               if (ret_status != BT_VALUE_STATUS_OK) {
+                       BT_LOGE_STR("Error inserting \"stream-count\" entry.");
+                       ret = -1;
                        goto end;
                }
        }
 
-
        /*
         * key = "client-count",
         * value = <integer>,
@@ -485,17 +498,24 @@ bt_value_status list_append_session(bt_value *results,
        {
                uint32_t clients = be32toh(session->clients);
 
-               ret = bt_private_value_map_insert_integer_entry(map, "client-count",
+               ret_status = bt_value_map_insert_integer_entry(map, "client-count",
                        clients);
-               if (ret != BT_VALUE_STATUS_OK) {
+               if (ret_status != BT_VALUE_STATUS_OK) {
+                       BT_LOGE_STR("Error inserting \"client-count\" entry.");
+                       ret = -1;
                        goto end;
                }
        }
 
-       ret = bt_private_value_array_append_element(results, map);
+       ret_status = bt_value_array_append_element(results, map);
+       if (ret_status != BT_VALUE_STATUS_OK) {
+               BT_LOGE_STR("Error appending map to results.");
+               ret = -1;
+       }
+
 end:
        if (url) {
-               g_string_free(url, TRUE);
+               g_string_free(url, true);
        }
        BT_VALUE_PUT_REF_AND_RESET(map);
        return ret;
@@ -538,9 +558,12 @@ end:
  */
 
 BT_HIDDEN
-bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_connection *viewer_connection)
+bt_query_status live_viewer_connection_list_sessions(
+               struct live_viewer_connection *viewer_connection,
+               const bt_value **user_result)
 {
-       bt_value *results = NULL;
+       bt_query_status status = BT_QUERY_STATUS_OK;
+       bt_value *result = NULL;
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_list_sessions list;
        uint32_t i, sessions_count;
@@ -550,9 +573,10 @@ bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_connecti
                goto error;
        }
 
-       results = bt_private_value_array_create();
-       if (!results) {
+       result = bt_value_array_create();
+       if (!result) {
                BT_LOGE("Error creating array");
+               status = BT_QUERY_STATUS_NOMEM;
                goto error;
        }
 
@@ -563,6 +587,7 @@ bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_connecti
        ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
        if (ret_len == BT_SOCKET_ERROR) {
                BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
+               status = BT_QUERY_STATUS_ERROR;
                goto error;
        }
        BT_ASSERT(ret_len == sizeof(cmd));
@@ -570,10 +595,12 @@ bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_connecti
        ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list));
        if (ret_len == 0) {
                BT_LOGI("Remote side has closed connection");
+               status = BT_QUERY_STATUS_ERROR;
                goto error;
        }
        if (ret_len == BT_SOCKET_ERROR) {
                BT_LOGE("Error receiving session list: %s", bt_socket_errormsg());
+               status = BT_QUERY_STATUS_ERROR;
                goto error;
        }
        BT_ASSERT(ret_len == sizeof(list));
@@ -582,34 +609,38 @@ bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_connecti
        for (i = 0; i < sessions_count; i++) {
                struct lttng_viewer_session lsession;
 
-               ret_len = lttng_live_recv(viewer_connection,
-                               &lsession, sizeof(lsession));
+               ret_len = lttng_live_recv(viewer_connection, &lsession,
+                       sizeof(lsession));
                if (ret_len == 0) {
                        BT_LOGI("Remote side has closed connection");
+                       status = BT_QUERY_STATUS_ERROR;
                        goto error;
                }
                if (ret_len == BT_SOCKET_ERROR) {
                        BT_LOGE("Error receiving session: %s", bt_socket_errormsg());
+                       status = BT_QUERY_STATUS_ERROR;
                        goto error;
                }
                BT_ASSERT(ret_len == sizeof(lsession));
                lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
                lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
-               if (list_append_session(results,
-                               viewer_connection->url, &lsession)
-                               != BT_VALUE_STATUS_OK) {
+               if (list_append_session(result, viewer_connection->url,
+                               &lsession)) {
+                       status = BT_QUERY_STATUS_ERROR;
                        goto error;
                }
        }
+
+       *user_result = result;
        goto end;
 error:
-       BT_VALUE_PUT_REF_AND_RESET(results);
+       BT_VALUE_PUT_REF_AND_RESET(result);
 end:
-       return results;
+       return status;
 }
 
 static
-int lttng_live_query_session_ids(struct lttng_live_component *lttng_live)
+int lttng_live_query_session_ids(struct lttng_live_msg_iter *lttng_live_msg_iter)
 {
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_list_sessions list;
@@ -617,8 +648,8 @@ int lttng_live_query_session_ids(struct lttng_live_component *lttng_live)
        uint32_t i, sessions_count;
        ssize_t ret_len;
        uint64_t session_id;
-       struct bt_live_viewer_connection *viewer_connection =
-                       lttng_live->viewer_connection;
+       struct live_viewer_connection *viewer_connection =
+                       lttng_live_msg_iter->viewer_connection;
 
        cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
        cmd.data_size = htobe64((uint64_t) 0);
@@ -667,7 +698,7 @@ int lttng_live_query_session_ids(struct lttng_live_component *lttng_live)
                        LTTNG_VIEWER_NAME_MAX) == 0) && (strncmp(lsession.hostname,
                                viewer_connection->target_hostname->str,
                                LTTNG_VIEWER_HOST_NAME_MAX) == 0)) {
-                       if (lttng_live_add_session(lttng_live, session_id,
+                       if (lttng_live_add_session(lttng_live_msg_iter, session_id,
                                        lsession.hostname,
                                        lsession.session_name)) {
                                goto error;
@@ -683,13 +714,14 @@ error:
 }
 
 BT_HIDDEN
-int lttng_live_create_viewer_session(struct lttng_live_component *lttng_live)
+int lttng_live_create_viewer_session(
+               struct lttng_live_msg_iter *lttng_live_msg_iter)
 {
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_create_session_response resp;
        ssize_t ret_len;
-       struct bt_live_viewer_connection *viewer_connection =
-                       lttng_live->viewer_connection;
+       struct live_viewer_connection *viewer_connection =
+                       lttng_live_msg_iter->viewer_connection;
 
        cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
        cmd.data_size = htobe64((uint64_t) 0);
@@ -717,7 +749,7 @@ int lttng_live_create_viewer_session(struct lttng_live_component *lttng_live)
                BT_LOGE("Error creating viewer session");
                goto error;
        }
-       if (lttng_live_query_session_ids(lttng_live)) {
+       if (lttng_live_query_session_ids(lttng_live_msg_iter)) {
                goto error;
        }
 
@@ -733,9 +765,10 @@ int receive_streams(struct lttng_live_session *session,
 {
        ssize_t ret_len;
        uint32_t i;
-       struct lttng_live_component *lttng_live = session->lttng_live;
-       struct bt_live_viewer_connection *viewer_connection =
-                       lttng_live->viewer_connection;
+       struct lttng_live_msg_iter *lttng_live_msg_iter =
+                       session->lttng_live_msg_iter;
+       struct live_viewer_connection *viewer_connection =
+                       lttng_live_msg_iter->viewer_connection;
 
        BT_LOGD("Getting %" PRIu32 " new streams:", stream_count);
        for (i = 0; i < stream_count; i++) {
@@ -796,18 +829,14 @@ int lttng_live_attach_session(struct lttng_live_session *session)
        struct lttng_viewer_attach_session_request rq;
        struct lttng_viewer_attach_session_response rp;
        ssize_t ret_len;
-       struct lttng_live_component *lttng_live = session->lttng_live;
-       struct bt_live_viewer_connection *viewer_connection =
-                       lttng_live->viewer_connection;
+       struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
+       struct live_viewer_connection *viewer_connection =
+                       lttng_live_msg_iter->viewer_connection;
        uint64_t session_id = session->id;
        uint32_t streams_count;
        const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
        char cmd_buf[cmd_buf_len];
 
-       if (session->attached) {
-               return 0;
-       }
-
        cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
        cmd.data_size = htobe64((uint64_t) sizeof(rq));
        cmd.cmd_version = htobe32(0);
@@ -885,9 +914,9 @@ int lttng_live_detach_session(struct lttng_live_session *session)
        struct lttng_viewer_detach_session_request rq;
        struct lttng_viewer_detach_session_response rp;
        ssize_t ret_len;
-       struct lttng_live_component *lttng_live = session->lttng_live;
-       struct bt_live_viewer_connection *viewer_connection =
-                       lttng_live->viewer_connection;
+       struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
+       struct live_viewer_connection *viewer_connection =
+                       lttng_live_msg_iter->viewer_connection;
        uint64_t session_id = session->id;
        const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
        char cmd_buf[cmd_buf_len];
@@ -962,10 +991,10 @@ ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace,
        char *data = NULL;
        ssize_t ret_len;
        struct lttng_live_session *session = trace->session;
-       struct lttng_live_component *lttng_live = session->lttng_live;
+       struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
        struct lttng_live_metadata *metadata = trace->metadata;
-       struct bt_live_viewer_connection *viewer_connection =
-                       lttng_live->viewer_connection;
+       struct live_viewer_connection *viewer_connection =
+                       lttng_live_msg_iter->viewer_connection;
        const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
        char cmd_buf[cmd_buf_len];
 
@@ -1075,7 +1104,8 @@ void lttng_index_to_packet_index(struct lttng_viewer_index *lindex,
 }
 
 BT_HIDDEN
-bt_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_component *lttng_live,
+enum lttng_live_iterator_status lttng_live_get_next_index(
+               struct lttng_live_msg_iter *lttng_live_msg_iter,
                struct lttng_live_stream_iterator *stream,
                struct packet_index *index)
 {
@@ -1084,13 +1114,15 @@ bt_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_compon
        ssize_t ret_len;
        struct lttng_viewer_index rp;
        uint32_t flags, status;
-       bt_lttng_live_iterator_status retstatus =
-                       BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
-       struct bt_live_viewer_connection *viewer_connection =
-                       lttng_live->viewer_connection;
+       enum lttng_live_iterator_status retstatus =
+                       LTTNG_LIVE_ITERATOR_STATUS_OK;
+       struct live_viewer_connection *viewer_connection =
+                       lttng_live_msg_iter->viewer_connection;
        struct lttng_live_trace *trace = stream->trace;
        const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
        char cmd_buf[cmd_buf_len];
+       struct lttng_live_component *lttng_live =
+               lttng_live_msg_iter->lttng_live_comp;
 
        cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
        cmd.data_size = htobe64((uint64_t) sizeof(rq));
@@ -1109,7 +1141,8 @@ bt_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_compon
        memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
        ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
        if (ret_len == BT_SOCKET_ERROR) {
-               BT_LOGE("Error sending get_next_index request: %s", bt_socket_errormsg());
+               BT_LOGE("Error sending get_next_index request: %s",
+                               bt_socket_errormsg());
                goto error;
        }
 
@@ -1120,7 +1153,8 @@ bt_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_compon
                goto error;
        }
        if (ret_len == BT_SOCKET_ERROR) {
-               BT_LOGE("Error receiving get_next_index response: %s", bt_socket_errormsg());
+               BT_LOGE("Error receiving get_next_index response: %s",
+                               bt_socket_errormsg());
                goto error;
        }
        BT_ASSERT(ret_len == sizeof(rp));
@@ -1136,7 +1170,7 @@ bt_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_compon
                BT_LOGD("get_next_index: inactive");
                memset(index, 0, sizeof(struct packet_index));
                index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
-               stream->current_inactivity_timestamp = index->ts_cycles.timestamp_end;
+               stream->current_inactivity_ts = index->ts_cycles.timestamp_end;
                ctf_stream_class_id = be64toh(rp.stream_id);
                if (stream->ctf_stream_class_id != -1ULL) {
                        BT_ASSERT(stream->ctf_stream_class_id ==
@@ -1162,8 +1196,6 @@ bt_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_compon
                }
 
                stream->state = LTTNG_LIVE_STREAM_ACTIVE_DATA;
-               stream->current_packet_end_timestamp =
-                       index->ts_cycles.timestamp_end;
 
                if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
                        BT_LOGD("get_next_index: new metadata needed");
@@ -1171,21 +1203,21 @@ bt_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_compon
                }
                if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
                        BT_LOGD("get_next_index: new streams needed");
-                       lttng_live_need_new_streams(lttng_live);
+                       lttng_live_need_new_streams(lttng_live_msg_iter);
                }
                break;
        }
        case LTTNG_VIEWER_INDEX_RETRY:
                BT_LOGD("get_next_index: retry");
                memset(index, 0, sizeof(struct packet_index));
-               retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+               retstatus = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
                stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
                goto end;
        case LTTNG_VIEWER_INDEX_HUP:
                BT_LOGD("get_next_index: stream hung up");
                memset(index, 0, sizeof(struct packet_index));
                index->offset = EOF;
-               retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
+               retstatus = LTTNG_LIVE_ITERATOR_STATUS_END;
                stream->state = LTTNG_LIVE_STREAM_EOF;
                break;
        case LTTNG_VIEWER_INDEX_ERR:
@@ -1204,17 +1236,18 @@ end:
 
 error:
        if (lttng_live_is_canceled(lttng_live)) {
-               retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+               retstatus = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
        } else {
-               retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+               retstatus = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
        }
        return retstatus;
 }
 
 BT_HIDDEN
-enum bt_msg_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_component *lttng_live,
-               struct lttng_live_stream_iterator *stream, uint8_t *buf, uint64_t offset,
-               uint64_t req_len, uint64_t *recv_len)
+enum bt_msg_iter_medium_status lttng_live_get_stream_bytes(
+               struct lttng_live_msg_iter *lttng_live_msg_iter,
+               struct lttng_live_stream_iterator *stream, uint8_t *buf,
+               uint64_t offset, uint64_t req_len, uint64_t *recv_len)
 {
        enum bt_msg_iter_medium_status retstatus = BT_MSG_ITER_MEDIUM_STATUS_OK;
        struct lttng_viewer_cmd cmd;
@@ -1222,11 +1255,13 @@ enum bt_msg_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_com
        struct lttng_viewer_trace_packet rp;
        ssize_t ret_len;
        uint32_t flags, status;
-       struct bt_live_viewer_connection *viewer_connection =
-                       lttng_live->viewer_connection;
+       struct live_viewer_connection *viewer_connection =
+                       lttng_live_msg_iter->viewer_connection;
        struct lttng_live_trace *trace = stream->trace;
        const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
        char cmd_buf[cmd_buf_len];
+       struct lttng_live_component *lttng_live =
+               lttng_live_msg_iter->lttng_live_comp;
 
        BT_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64,
                        offset, req_len);
@@ -1289,7 +1324,7 @@ enum bt_msg_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_com
                }
                if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
                        BT_LOGD("get_data_packet: new streams needed, try again later");
-                       lttng_live_need_new_streams(lttng_live);
+                       lttng_live_need_new_streams(lttng_live_msg_iter);
                }
                if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA
                                | LTTNG_VIEWER_FLAG_NEW_STREAM)) {
@@ -1337,24 +1372,27 @@ error:
  * Request new streams for a session.
  */
 BT_HIDDEN
-bt_lttng_live_iterator_status lttng_live_get_new_streams(
+enum lttng_live_iterator_status lttng_live_get_new_streams(
                struct lttng_live_session *session)
 {
-       bt_lttng_live_iterator_status status =
-                       BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+       enum lttng_live_iterator_status status =
+               LTTNG_LIVE_ITERATOR_STATUS_OK;
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_new_streams_request rq;
        struct lttng_viewer_new_streams_response rp;
        ssize_t ret_len;
-       struct lttng_live_component *lttng_live = session->lttng_live;
-       struct bt_live_viewer_connection *viewer_connection =
-                       lttng_live->viewer_connection;
+       struct lttng_live_msg_iter *lttng_live_msg_iter =
+               session->lttng_live_msg_iter;
+       struct live_viewer_connection *viewer_connection =
+               lttng_live_msg_iter->viewer_connection;
+       struct lttng_live_component *lttng_live =
+               lttng_live_msg_iter->lttng_live_comp;
        uint32_t streams_count;
        const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
        char cmd_buf[cmd_buf_len];
 
        if (!session->new_streams_needed) {
-               return BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+               return LTTNG_LIVE_ITERATOR_STATUS_OK;
        }
 
        cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
@@ -1373,7 +1411,8 @@ bt_lttng_live_iterator_status lttng_live_get_new_streams(
        memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
        ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
        if (ret_len == BT_SOCKET_ERROR) {
-               BT_LOGE("Error sending get_new_streams request: %s", bt_socket_errormsg());
+               BT_LOGE("Error sending get_new_streams request: %s",
+                               bt_socket_errormsg());
                goto error;
        }
 
@@ -1401,7 +1440,7 @@ bt_lttng_live_iterator_status lttng_live_get_new_streams(
        case LTTNG_VIEWER_NEW_STREAMS_HUP:
                session->new_streams_needed = false;
                session->closed = true;
-               status = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
+               status = LTTNG_LIVE_ITERATOR_STATUS_END;
                goto end;
        case LTTNG_VIEWER_NEW_STREAMS_ERR:
                BT_LOGE("get_new_streams error");
@@ -1419,30 +1458,31 @@ end:
 
 error:
        if (lttng_live_is_canceled(lttng_live)) {
-               status = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+               status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
        } else {
-               status = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+               status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
        }
        return status;
 }
 
 BT_HIDDEN
-struct bt_live_viewer_connection *
-       bt_live_viewer_connection_create(const char *url,
-                       struct lttng_live_component *lttng_live)
+struct live_viewer_connection *live_viewer_connection_create(
+               const char *url, bool in_query,
+               struct lttng_live_msg_iter *lttng_live_msg_iter)
 {
-       struct bt_live_viewer_connection *viewer_connection;
+       struct live_viewer_connection *viewer_connection;
 
-       viewer_connection = g_new0(struct bt_live_viewer_connection, 1);
+       viewer_connection = g_new0(struct live_viewer_connection, 1);
 
        if (bt_socket_init() != 0) {
                goto error;
        }
 
-       bt_object_init(&viewer_connection->obj, connection_release);
+       bt_object_init_shared(&viewer_connection->obj, connection_release);
        viewer_connection->control_sock = BT_INVALID_SOCKET;
        viewer_connection->port = -1;
-       viewer_connection->lttng_live = lttng_live;
+       viewer_connection->in_query = in_query;
+       viewer_connection->lttng_live_msg_iter = lttng_live_msg_iter;
        viewer_connection->url = g_string_new(url);
        if (!viewer_connection->url) {
                goto error;
@@ -1463,19 +1503,20 @@ error:
 }
 
 BT_HIDDEN
-void bt_live_viewer_connection_destroy(struct bt_live_viewer_connection *viewer_connection)
+void live_viewer_connection_destroy(
+               struct live_viewer_connection *viewer_connection)
 {
        BT_LOGD("Closing connection to url \"%s\"", viewer_connection->url->str);
        lttng_live_disconnect_viewer(viewer_connection);
-       g_string_free(viewer_connection->url, TRUE);
+       g_string_free(viewer_connection->url, true);
        if (viewer_connection->relay_hostname) {
-               g_string_free(viewer_connection->relay_hostname, TRUE);
+               g_string_free(viewer_connection->relay_hostname, true);
        }
        if (viewer_connection->target_hostname) {
-               g_string_free(viewer_connection->target_hostname, TRUE);
+               g_string_free(viewer_connection->target_hostname, true);
        }
        if (viewer_connection->session_name) {
-               g_string_free(viewer_connection->session_name, TRUE);
+               g_string_free(viewer_connection->session_name, true);
        }
        g_free(viewer_connection);
 
index 90d2dcff608714c0cc4eaf39222a691d3b7fa053..85fd745d9ddad21cf84d70e0006bd3f7dd768b34 100644 (file)
@@ -40,7 +40,7 @@
 
 struct lttng_live_component;
 
-struct bt_live_viewer_connection {
+struct live_viewer_connection {
        bt_object obj;
 
        GString *url;
@@ -55,12 +55,13 @@ struct bt_live_viewer_connection {
        int32_t major;
        int32_t minor;
 
-       struct lttng_live_component *lttng_live;
+       bool in_query;
+       struct lttng_live_msg_iter *lttng_live_msg_iter;
 };
 
 struct packet_index_time {
-       int64_t timestamp_begin;
-       int64_t timestamp_end;
+       uint64_t timestamp_begin;
+       uint64_t timestamp_end;
 };
 
 struct packet_index {
@@ -77,11 +78,15 @@ struct packet_index {
        uint64_t packet_seq_num;        /* packet sequence number */
 };
 
-struct bt_live_viewer_connection *
-       bt_live_viewer_connection_create(const char *url, struct lttng_live_component *lttng_live);
+struct live_viewer_connection * live_viewer_connection_create(
+               const char *url, bool in_query,
+               struct lttng_live_msg_iter *lttng_live_msg_iter);
 
-void bt_live_viewer_connection_destroy(struct bt_live_viewer_connection *conn);
+void live_viewer_connection_destroy(
+               struct live_viewer_connection *conn);
 
-bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_connection *viewer_connection);
+bt_query_status live_viewer_connection_list_sessions(
+               struct live_viewer_connection *viewer_connection,
+               const bt_value **user_result);
 
 #endif /* LTTNG_LIVE_VIEWER_CONNECTION_H */
index 1e99e1cd2a16d16ed51cbc2619884029be479338..d7e073d1fa2c7f0a56758c4afcd7887fef3170a9 100644 (file)
@@ -30,6 +30,7 @@
 
 #include "fs-src/fs.h"
 #include "fs-sink/fs-sink.h"
+#include "lttng-live/lttng-live.h"
 
 #ifndef BT_BUILT_IN_PLUGINS
 BT_PLUGIN_MODULE();
@@ -63,22 +64,18 @@ BT_PLUGIN_SINK_COMPONENT_CLASS_GRAPH_IS_CONFIGURED_METHOD(fs,
        ctf_fs_sink_graph_is_configured);
 BT_PLUGIN_SINK_COMPONENT_CLASS_DESCRIPTION(fs, "Write CTF traces to the file system.");
 
-#if 0
 /* ctf.lttng-live source */
 BT_PLUGIN_SOURCE_COMPONENT_CLASS_WITH_ID(auto, lttng_live, "lttng-live",
-       lttng_live_iterator_next);
+       lttng_live_msg_iter_next);
 BT_PLUGIN_SOURCE_COMPONENT_CLASS_DESCRIPTION_WITH_ID(auto, lttng_live,
-        "Connect to an LTTng relay daemon and receive CTF streams.");
+       "Connect to an LTTng relay daemon and receive CTF streams.");
 BT_PLUGIN_SOURCE_COMPONENT_CLASS_INIT_METHOD_WITH_ID(auto, lttng_live,
        lttng_live_component_init);
 BT_PLUGIN_SOURCE_COMPONENT_CLASS_QUERY_METHOD_WITH_ID(auto, lttng_live,
        lttng_live_query);
 BT_PLUGIN_SOURCE_COMPONENT_CLASS_FINALIZE_METHOD_WITH_ID(auto, lttng_live,
        lttng_live_component_finalize);
-BT_PLUGIN_SOURCE_COMPONENT_CLASS_ACCEPT_PORT_CONNECTION_METHOD_WITH_ID(auto,
-       lttng_live, lttng_live_accept_port_connection);
-BT_PLUGIN_SOURCE_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_WITH_ID(
-       auto, lttng_live, lttng_live_iterator_init);
-BT_PLUGIN_SOURCE_COMPONENT_CLASS_MESSAGE_ITERATOR_FINALIZE_METHOD_WITH_ID(
-       auto, lttng_live, lttng_live_iterator_finalize);
-#endif
+BT_PLUGIN_SOURCE_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_WITH_ID(auto,
+       lttng_live, lttng_live_msg_iter_init);
+BT_PLUGIN_SOURCE_COMPONENT_CLASS_MESSAGE_ITERATOR_FINALIZE_METHOD_WITH_ID(auto,
+       lttng_live, lttng_live_msg_iter_finalize);
This page took 0.11418 seconds and 5 git commands to generate.