src.ctf.lttng-live: add live_viewer_connection destructor
[babeltrace.git] / src / plugins / ctf / lttng-live / lttng-live.cpp
index 4d4dd162de0e62738d2ac2425b93cc61349fafae..cdf324c21298c2268a9d54753c8e3c424583b2e2 100644 (file)
@@ -14,6 +14,7 @@
 #include "common/assert.h"
 #include "cpp-common/bt2c/fmt.hpp"
 #include "cpp-common/bt2c/glib-up.hpp"
+#include "cpp-common/bt2c/vector.hpp"
 #include "cpp-common/bt2s/make-unique.hpp"
 #include "cpp-common/vendor/fmt/format.h"
 
@@ -72,30 +73,13 @@ end:
 static struct lttng_live_trace *
 lttng_live_session_borrow_trace_by_id(struct lttng_live_session *session, uint64_t trace_id)
 {
-    uint64_t trace_idx;
-    struct lttng_live_trace *ret_trace = NULL;
-
-    for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
-        struct lttng_live_trace *trace =
-            (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
+    for (lttng_live_trace::UP& trace : session->traces) {
         if (trace->id == trace_id) {
-            ret_trace = trace;
-            goto end;
+            return trace.get();
         }
     }
 
-end:
-    return ret_trace;
-}
-
-static void lttng_live_destroy_trace(struct lttng_live_trace *trace)
-{
-    BT_CPPLOGD_SPEC(trace->logger, "Destroying live trace: trace-id={}", trace->id);
-
-    BT_ASSERT(trace->stream_iterators);
-    g_ptr_array_free(trace->stream_iterators, TRUE);
-
-    delete trace;
+    return nullptr;
 }
 
 static struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
@@ -104,16 +88,15 @@ static struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_sessio
     BT_CPPLOGD_SPEC(session->logger, "Creating live trace: session-id={}, trace-id={}", session->id,
                     trace_id);
 
-    lttng_live_trace *trace = new lttng_live_trace {session->logger};
+    auto trace = bt2s::make_unique<lttng_live_trace>(session->logger);
+
     trace->session = session;
     trace->id = trace_id;
-    trace->stream_iterators =
-        g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_stream_iterator_destroy);
-    BT_ASSERT(trace->stream_iterators);
     trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
-    g_ptr_array_add(session->traces, trace);
 
-    return trace;
+    const auto ret = trace.get();
+    session->traces.emplace_back(std::move(trace));
+    return ret;
 }
 
 struct lttng_live_trace *
@@ -142,61 +125,35 @@ int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, uint
                     "session-id={}, hostname=\"{}\", session-name=\"{}\"",
                     session_id, hostname, session_name);
 
-    lttng_live_session *session = new lttng_live_session {lttng_live_msg_iter->logger};
+    auto session = bt2s::make_unique<lttng_live_session>(lttng_live_msg_iter->logger);
+
     session->self_comp = lttng_live_msg_iter->self_comp;
     session->id = session_id;
-    session->traces = g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_trace);
-    BT_ASSERT(session->traces);
     session->lttng_live_msg_iter = lttng_live_msg_iter;
     session->new_streams_needed = true;
-    session->hostname = g_string_new(hostname);
-    BT_ASSERT(session->hostname);
-
-    session->session_name = g_string_new(session_name);
-    BT_ASSERT(session->session_name);
+    session->hostname = hostname;
+    session->session_name = session_name;
 
-    g_ptr_array_add(lttng_live_msg_iter->sessions, session);
+    lttng_live_msg_iter->sessions.emplace_back(std::move(session));
 
     return 0;
 }
 
-static void lttng_live_destroy_session(struct lttng_live_session *session)
+lttng_live_session::~lttng_live_session()
 {
-    if (!session) {
-        goto end;
-    }
+    BT_CPPLOGD_SPEC(this->logger, "Destroying live session: session-id={}, session-name=\"{}\"",
+                    this->id, this->session_name);
 
-    BT_CPPLOGD_SPEC(session->logger,
-                    "Destroying live session: "
-                    "session-id={}, session-name=\"{}\"",
-                    session->id, session->session_name->str);
-    if (session->id != -1ULL) {
-        if (lttng_live_session_detach(session)) {
-            if (!lttng_live_graph_is_canceled(session->lttng_live_msg_iter)) {
+    if (this->id != -1ULL) {
+        if (lttng_live_session_detach(this)) {
+            if (!lttng_live_graph_is_canceled(this->lttng_live_msg_iter)) {
                 /* Old relayd cannot detach sessions. */
-                BT_CPPLOGD_SPEC(session->logger, "Unable to detach lttng live session {}",
-                                session->id);
+                BT_CPPLOGD_SPEC(this->logger, "Unable to detach lttng live session {}", this->id);
             }
         }
-        session->id = -1ULL;
-    }
-
-    if (session->traces) {
-        g_ptr_array_free(session->traces, TRUE);
-    }
-
-    if (session->hostname) {
-        g_string_free(session->hostname, TRUE);
-    }
 
-    if (session->session_name) {
-        g_string_free(session->session_name, TRUE);
+        this->id = -1ULL;
     }
-
-    delete session;
-
-end:
-    return;
 }
 
 static void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter)
@@ -205,12 +162,8 @@ static void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_m
         goto end;
     }
 
-    if (lttng_live_msg_iter->sessions) {
-        g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE);
-    }
-
     if (lttng_live_msg_iter->viewer_connection) {
-        live_viewer_connection_destroy(lttng_live_msg_iter->viewer_connection);
+        delete lttng_live_msg_iter->viewer_connection;
     }
     BT_ASSERT(lttng_live_msg_iter->lttng_live_comp);
     BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter);
@@ -276,21 +229,19 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_da
 
     if (lttng_live_stream->trace->metadata_stream_state ==
         LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
-        BT_CPPLOGD_SPEC(
-            lttng_live_msg_iter->logger,
-            "Need to get an update for the metadata stream before proceeding further with this stream: "
-            "stream-name=\"{}\"",
-            lttng_live_stream->name->str);
+        BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
+                        "Need to get an update for the metadata stream before proceeding "
+                        "further with this stream: stream-name=\"{}\"",
+                        lttng_live_stream->name);
         ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
         goto end;
     }
 
     if (lttng_live_stream->trace->session->new_streams_needed) {
-        BT_CPPLOGD_SPEC(
-            lttng_live_msg_iter->logger,
-            "Need to get an update of all streams before proceeding further with this stream: "
-            "stream-name=\"{}\"",
-            lttng_live_stream->name->str);
+        BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
+                        "Need to get an update of all streams before proceeding further "
+                        "with this stream: stream-name=\"{}\"",
+                        lttng_live_stream->name);
         ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
         goto end;
     }
@@ -333,7 +284,7 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_da
     BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
                     "Setting live stream reading info: stream-name=\"{}\", "
                     "viewer-stream-id={}, stream-base-offset={}, stream-offset={}, stream-len={}",
-                    lttng_live_stream->name->str, lttng_live_stream->viewer_stream_id,
+                    lttng_live_stream->name, lttng_live_stream->viewer_stream_id,
                     lttng_live_stream->base_offset, lttng_live_stream->offset,
                     lttng_live_stream->len);
 
@@ -356,7 +307,6 @@ lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
                        struct lttng_live_session *session)
 {
     enum lttng_live_iterator_status status;
-    uint64_t trace_idx;
 
     if (!session->attached) {
         BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Attach to session: session-id={}",
@@ -383,9 +333,8 @@ lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
     }
 
     BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
-                    "Updating all data streams: "
-                    "session-id={}, session-name=\"{}\"",
-                    session->id, session->session_name->str);
+                    "Updating all data streams: session-id={}, session-name=\"{}\"", session->id,
+                    session->session_name);
 
     status = lttng_live_session_get_new_streams(session, lttng_live_msg_iter->self_msg_iter);
     switch (status) {
@@ -393,23 +342,23 @@ lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
         break;
     case LTTNG_LIVE_ITERATOR_STATUS_END:
         /*
-                * We received a `_END` from the `_get_new_streams()` function,
-                * which means no more data will ever be received from the data
-                * streams of this session. But it's possible that the metadata
-                * is incomplete.
-                * The live protocol guarantees that we receive all the
-                * metadata needed before we receive data streams needing it.
-                * But it's possible to receive metadata NOT needed by
-                * data streams after the session was closed. For example, this
-                * could happen if a new event is registered and the session is
-                * stopped before any tracepoint for that event is actually
-                * fired.
-                */
+         * We received a `_END` from the `_get_new_streams()` function,
+         * which means no more data will ever be received from the data
+         * streams of this session. But it's possible that the metadata
+         * is incomplete.
+         * The live protocol guarantees that we receive all the
+         * metadata needed before we receive data streams needing it.
+         * But it's possible to receive metadata NOT needed by
+         * data streams after the session was closed. For example, this
+         * could happen if a new event is registered and the session is
+         * stopped before any tracepoint for that event is actually
+         * fired.
+         */
         BT_CPPLOGD_SPEC(
             lttng_live_msg_iter->logger,
             "Updating streams returned _END status. Override status to _OK in order fetch any remaining metadata:"
             "session-id={}, session-name=\"{}\"",
-            session->id, session->session_name->str);
+            session->id, session->session_name);
         status = LTTNG_LIVE_ITERATOR_STATUS_OK;
         break;
     default:
@@ -417,20 +366,14 @@ lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
     }
 
     BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
-                    "Updating metadata stream for session: "
-                    "session-id={}, session-name=\"{}\"",
-                    session->id, session->session_name->str);
+                    "Updating metadata stream for session: session-id={}, session-name=\"{}\"",
+                    session->id, session->session_name);
 
-    trace_idx = 0;
-    while (trace_idx < session->traces->len) {
-        struct lttng_live_trace *trace =
-            (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
-
-        status = lttng_live_metadata_update(trace);
+    for (lttng_live_trace::UP& trace : session->traces) {
+        status = lttng_live_metadata_update(trace.get());
         switch (status) {
         case LTTNG_LIVE_ITERATOR_STATUS_END:
         case LTTNG_LIVE_ITERATOR_STATUS_OK:
-            trace_idx++;
             break;
         case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
         case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
@@ -457,19 +400,13 @@ end:
 static void
 lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
 {
-    uint64_t session_idx, trace_idx;
-
-    for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
-        struct lttng_live_session *session =
-            (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
+    for (const auto& session : lttng_live_msg_iter->sessions) {
         BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
                         "Force marking session as needing new streams: "
                         "session-id={}",
                         session->id);
         session->new_streams_needed = true;
-        for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
-            struct lttng_live_trace *trace =
-                (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
+        for (lttng_live_trace::UP& trace : session->traces) {
             BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
                             "Force marking trace metadata state as needing an update: "
                             "session-id={}, trace-id={}",
@@ -487,8 +424,7 @@ lttng_live_iterator_handle_new_streams_and_metadata(struct lttng_live_msg_iter *
 {
     enum lttng_live_iterator_status status;
     enum lttng_live_viewer_status viewer_status;
-    uint64_t session_idx = 0, nr_sessions_opened = 0;
-    struct lttng_live_session *session;
+    uint64_t nr_sessions_opened = 0;
     enum session_not_found_action sess_not_found_act =
         lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
 
@@ -502,7 +438,7 @@ lttng_live_iterator_handle_new_streams_and_metadata(struct lttng_live_msg_iter *
      * need to query for new sessions even though we have sessions
      * currently ongoing.
      */
-    if (lttng_live_msg_iter->sessions->len == 0) {
+    if (lttng_live_msg_iter->sessions.empty()) {
         if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
             BT_CPPLOGD_SPEC(
                 lttng_live_msg_iter->logger,
@@ -533,10 +469,8 @@ lttng_live_iterator_handle_new_streams_and_metadata(struct lttng_live_msg_iter *
         }
     }
 
-    for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
-        session =
-            (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
-        status = lttng_live_get_session(lttng_live_msg_iter, session);
+    for (const auto& session : lttng_live_msg_iter->sessions) {
+        status = lttng_live_get_session(lttng_live_msg_iter, session.get());
         switch (status) {
         case LTTNG_LIVE_ITERATOR_STATUS_OK:
         case LTTNG_LIVE_ITERATOR_STATUS_END:
@@ -566,41 +500,32 @@ end:
 
 static enum lttng_live_iterator_status
 emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
-                        struct lttng_live_stream_iterator *stream_iter, const bt_message **message,
-                        uint64_t timestamp)
+                        struct lttng_live_stream_iterator *stream_iter,
+                        bt2::ConstMessage::Shared& message, uint64_t timestamp)
 {
-    enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
-    bt_message *msg = NULL;
-
     BT_ASSERT(stream_iter->trace->clock_class);
-
     BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
                     "Emitting inactivity message for stream: ctf-stream-id={}, "
                     "viewer-stream-id={}, timestamp={}",
                     stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id,
                     timestamp);
 
-    msg = bt_message_message_iterator_inactivity_create(lttng_live_msg_iter->self_msg_iter,
-                                                        stream_iter->trace->clock_class, timestamp);
+    const auto msg = bt_message_message_iterator_inactivity_create(
+        lttng_live_msg_iter->self_msg_iter, stream_iter->trace->clock_class, timestamp);
+
     if (!msg) {
         BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
                                      "Error emitting message iterator inactivity message");
-        goto error;
+        return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
     }
 
-    *message = msg;
-end:
-    return ret;
-
-error:
-    ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
-    bt_message_put_ref(msg);
-    goto end;
+    message = bt2::ConstMessage::Shared::createWithoutRef(msg);
+    return LTTNG_LIVE_ITERATOR_STATUS_OK;
 }
 
 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
     struct lttng_live_msg_iter *lttng_live_msg_iter,
-    struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
+    struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message)
 {
     enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
 
@@ -700,16 +625,12 @@ end:
 
 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
     struct lttng_live_msg_iter *lttng_live_msg_iter,
-    struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
+    struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message)
 {
     enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
     enum ctf_msg_iter_status status;
-    uint64_t session_idx, trace_idx;
-
-    for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
-        struct lttng_live_session *session =
-            (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
 
+    for (const auto& session : lttng_live_msg_iter->sessions) {
         if (session->new_streams_needed) {
             BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
                             "Need an update for streams: "
@@ -718,9 +639,7 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_activ
             ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
             goto end;
         }
-        for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
-            struct lttng_live_trace *trace =
-                (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
+        for (lttng_live_trace::UP& trace : session->traces) {
             if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
                 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
                                 "Need an update for metadata stream: "
@@ -741,12 +660,14 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_activ
         goto end;
     }
 
-    status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter, message);
+    const bt_message *msg;
+    status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), &msg);
     switch (status) {
     case CTF_MSG_ITER_STATUS_EOF:
         ret = LTTNG_LIVE_ITERATOR_STATUS_END;
         break;
     case CTF_MSG_ITER_STATUS_OK:
+        message = bt2::ConstMessage::Shared::createWithoutRef(msg);
         ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
         break;
     case CTF_MSG_ITER_STATUS_AGAIN:
@@ -774,22 +695,23 @@ end:
 static enum lttng_live_iterator_status
 lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
                                  struct lttng_live_stream_iterator *stream_iter,
-                                 const bt_message **curr_msg)
+                                 bt2::ConstMessage::Shared& curr_msg)
 {
     enum lttng_live_iterator_status live_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
 
     BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
                     "Closing live stream iterator: stream-name=\"{}\", "
                     "viewer-stream-id={}",
-                    stream_iter->name->str, stream_iter->viewer_stream_id);
+                    stream_iter->name, stream_iter->viewer_stream_id);
 
     /*
      * The viewer has hung up on us so we are closing the stream. The
      * `ctf_msg_iter` should simply realize that it needs to close the
      * stream properly by emitting the necessary stream end message.
      */
+    const bt_message *msg;
     enum ctf_msg_iter_status status =
-        ctf_msg_iter_get_next_message(stream_iter->msg_iter, curr_msg);
+        ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), &msg);
 
     if (status == CTF_MSG_ITER_STATUS_ERROR) {
         BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
@@ -805,6 +727,8 @@ lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter
 
     BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
 
+    curr_msg = bt2::ConstMessage::Shared::createWithoutRef(msg);
+
 end:
     return live_status;
 }
@@ -860,14 +784,14 @@ end:
 static enum lttng_live_iterator_status
 lttng_live_iterator_next_msg_on_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
                                        struct lttng_live_stream_iterator *stream_iter,
-                                       const bt_message **curr_msg)
+                                       bt2::ConstMessage::Shared& curr_msg)
 {
     enum lttng_live_iterator_status live_status;
 
     BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
                     "Advancing live stream iterator until next message if possible: "
                     "stream-name=\"{}\", viewer-stream-id={}",
-                    stream_iter->name->str, stream_iter->viewer_stream_id);
+                    stream_iter->name, stream_iter->viewer_stream_id);
 
     if (stream_iter->has_stream_hung_up) {
         /*
@@ -909,16 +833,16 @@ retry:
     live_status = lttng_live_iterator_next_handle_one_quiescent_stream(lttng_live_msg_iter,
                                                                        stream_iter, curr_msg);
     if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
-        BT_ASSERT(!*curr_msg);
+        BT_ASSERT(!curr_msg);
         goto end;
     }
-    if (*curr_msg) {
+    if (curr_msg) {
         goto end;
     }
     live_status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live_msg_iter,
                                                                          stream_iter, curr_msg);
     if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
-        BT_ASSERT(!*curr_msg);
+        BT_ASSERT(!curr_msg);
     }
 
 end:
@@ -932,25 +856,22 @@ end:
     BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
                     "Returning from advancing live stream iterator: status={}, "
                     "stream-name=\"{}\", viewer-stream-id={}",
-                    live_status, stream_iter->name->str, stream_iter->viewer_stream_id);
+                    live_status, stream_iter->name, stream_iter->viewer_stream_id);
 
     return live_status;
 }
 
-static bool is_discarded_packet_or_event_message(const bt_message *msg)
+static bool is_discarded_packet_or_event_message(const bt2::ConstMessage msg)
 {
-    const enum bt_message_type msg_type = bt_message_get_type(msg);
-
-    return msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS ||
-           msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS;
+    return msg.type() == bt2::MessageType::DiscardedEvents ||
+           msg.type() == bt2::MessageType::DiscardedPackets;
 }
 
 static enum lttng_live_iterator_status
 adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream *stream,
-                                 const bt_message *msg_in, bt_message **msg_out,
+                                 const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out,
                                  uint64_t new_begin_ts)
 {
-    enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
     enum bt_property_availability availability;
     const bt_clock_snapshot *clock_snapshot;
     uint64_t end_ts;
@@ -962,24 +883,23 @@ adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream
     availability = bt_message_discarded_packets_get_count(msg_in, &count);
     BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
 
-    *msg_out = bt_message_discarded_packets_create_with_default_clock_snapshots(
+    const auto msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
         iter, stream, new_begin_ts, end_ts);
-    if (!*msg_out) {
-        status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
-        goto end;
+
+    if (!msg) {
+        return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
     }
 
-    bt_message_discarded_packets_set_count(*msg_out, count);
-end:
-    return status;
+    bt_message_discarded_packets_set_count(msg, count);
+    msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
+    return LTTNG_LIVE_ITERATOR_STATUS_OK;
 }
 
 static enum lttng_live_iterator_status
 adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream *stream,
-                                const bt_message *msg_in, bt_message **msg_out,
+                                const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out,
                                 uint64_t new_begin_ts)
 {
-    enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
     enum bt_property_availability availability;
     const bt_clock_snapshot *clock_snapshot;
     uint64_t end_ts;
@@ -991,22 +911,22 @@ adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream
     availability = bt_message_discarded_events_get_count(msg_in, &count);
     BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
 
-    *msg_out = bt_message_discarded_events_create_with_default_clock_snapshots(
+    const auto msg = bt_message_discarded_events_create_with_default_clock_snapshots(
         iter, stream, new_begin_ts, end_ts);
-    if (!*msg_out) {
-        status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
-        goto end;
+
+    if (!msg) {
+        return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
     }
 
-    bt_message_discarded_events_set_count(*msg_out, count);
-end:
-    return status;
+    bt_message_discarded_events_set_count(msg, count);
+    msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
+    return LTTNG_LIVE_ITERATOR_STATUS_OK;
 }
 
 static enum lttng_live_iterator_status
 handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
                     struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns,
-                    const bt_message *late_msg)
+                    const bt2::ConstMessage& late_msg)
 {
     const bt_clock_class *clock_class;
     const bt_stream_class *stream_class;
@@ -1014,7 +934,7 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
     int64_t last_inactivity_ts_ns;
     enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
     enum lttng_live_iterator_status adjust_status;
-    bt_message *adjusted_message;
+    bt2::ConstMessage::Shared adjusted_message;
 
     /*
      * The timestamp of the current message is before the last message sent
@@ -1039,16 +959,16 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
      * In short, the only scenario in which it's okay and fixable to
      * received a late message is when:
      *  1. the late message is a discarded packets or discarded events
-     * message,
+     *     message,
      *  2. this stream produced an inactivity message downstream, and
      *  3. the timestamp of the late message is within the inactivity
-     * timespan we sent downstream through the inactivity message.
+     *     timespan we sent downstream through the inactivity message.
      */
 
     BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
                     "Handling late message on live stream iterator: "
                     "stream-name=\"{}\", viewer-stream-id={}",
-                    stream_iter->name->str, stream_iter->viewer_stream_id);
+                    stream_iter->name, stream_iter->viewer_stream_id);
 
     if (!stream_iter->last_inactivity_ts.is_set) {
         BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
@@ -1064,12 +984,12 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
                                      "Invalid live stream state: "
                                      "have a late message that is not a packet discarded or "
                                      "event discarded message: late-msg-type={}",
-                                     static_cast<bt2::MessageType>(bt_message_get_type(late_msg)));
+                                     late_msg.type());
         stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
         goto end;
     }
 
-    stream_class = bt_stream_borrow_class_const(stream_iter->stream);
+    stream_class = bt_stream_borrow_class_const(stream_iter->stream->libObjPtr());
     clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class);
 
     ts_ns_status = bt_clock_class_cycles_to_ns_from_origin(
@@ -1100,18 +1020,17 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
     BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
                     "Adjusting the timestamp of late message: late-msg-type={}, "
                     "msg-new-ts-ns={}",
-                    static_cast<bt2::MessageType>(bt_message_get_type(late_msg)),
-                    stream_iter->last_inactivity_ts.value);
-    switch (bt_message_get_type(late_msg)) {
-    case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
+                    late_msg.type(), stream_iter->last_inactivity_ts.value);
+    switch (late_msg.type()) {
+    case bt2::MessageType::DiscardedEvents:
         adjust_status = adjust_discarded_events_message(
-            lttng_live_msg_iter->self_msg_iter, stream_iter->stream, late_msg, &adjusted_message,
-            stream_iter->last_inactivity_ts.value);
+            lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(),
+            late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
         break;
-    case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
+    case bt2::MessageType::DiscardedPackets:
         adjust_status = adjust_discarded_packets_message(
-            lttng_live_msg_iter->self_msg_iter, stream_iter->stream, late_msg, &adjusted_message,
-            stream_iter->last_inactivity_ts.value);
+            lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(),
+            late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
         break;
     default:
         bt_common_abort();
@@ -1125,7 +1044,6 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
     BT_ASSERT_DBG(adjusted_message);
     stream_iter->current_msg = adjusted_message;
     stream_iter->current_msg_ts_ns = last_inactivity_ts_ns;
-    bt_message_put_ref(late_msg);
 
 end:
     return stream_iter_status;
@@ -1142,7 +1060,6 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
     uint64_t stream_iter_idx;
 
     BT_ASSERT_DBG(live_trace);
-    BT_ASSERT_DBG(live_trace->stream_iterators);
 
     BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
                     "Finding the next stream iterator for trace: "
@@ -1155,22 +1072,21 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
      * ensure monotonicity.
      */
     stream_iter_idx = 0;
-    while (stream_iter_idx < live_trace->stream_iterators->len) {
+    while (stream_iter_idx < live_trace->stream_iterators.size()) {
         bool stream_iter_is_ended = false;
-        struct lttng_live_stream_iterator *stream_iter =
-            (lttng_live_stream_iterator *) g_ptr_array_index(live_trace->stream_iterators,
-                                                             stream_iter_idx);
+        lttng_live_stream_iterator *stream_iter =
+            live_trace->stream_iterators[stream_iter_idx].get();
 
         /*
          * If there is no current message for this stream, go fetch
          * one.
          */
         while (!stream_iter->current_msg) {
-            const bt_message *msg = NULL;
+            bt2::ConstMessage::Shared msg;
             int64_t curr_msg_ts_ns = INT64_MAX;
 
             stream_iter_status =
-                lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, &msg);
+                lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, msg);
 
             if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
                 stream_iter_is_ended = true;
@@ -1186,15 +1102,14 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
             BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
                             "Live stream iterator returned message: msg-type={}, "
                             "stream-name=\"{}\", viewer-stream-id={}",
-                            static_cast<bt2::MessageType>(bt_message_get_type(msg)),
-                            stream_iter->name->str, stream_iter->viewer_stream_id);
+                            msg->type(), stream_iter->name, stream_iter->viewer_stream_id);
 
             /*
              * Get the timestamp in nanoseconds from origin of this
              * message.
              */
-            live_get_msg_ts_ns(lttng_live_msg_iter, msg, lttng_live_msg_iter->last_msg_ts_ns,
-                               &curr_msg_ts_ns);
+            live_get_msg_ts_ns(lttng_live_msg_iter, msg->libObjPtr(),
+                               lttng_live_msg_iter->last_msg_ts_ns, &curr_msg_ts_ns);
 
             /*
              * Check if the message of the current live stream
@@ -1203,7 +1118,7 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
              * iterator. If not, we need to handle it with care.
              */
             if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
-                stream_iter->current_msg = msg;
+                stream_iter->current_msg = std::move(msg);
                 stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
             } else {
                 /*
@@ -1211,15 +1126,15 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
                  * may be fixable but it can also be an error.
                  */
                 stream_iter_status =
-                    handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, msg);
+                    handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, *msg);
                 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
                     BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
                                                  "Late message could not be handled correctly: "
                                                  "lttng-live-msg-iter-addr={}, "
                                                  "stream-name=\"{}\", "
                                                  "curr-msg-ts={}, last-msg-ts={}",
-                                                 fmt::ptr(lttng_live_msg_iter),
-                                                 stream_iter->name->str, curr_msg_ts_ns,
+                                                 fmt::ptr(lttng_live_msg_iter), stream_iter->name,
+                                                 curr_msg_ts_ns,
                                                  lttng_live_msg_iter->last_msg_ts_ns);
                     stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
                     goto end;
@@ -1246,7 +1161,8 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
                  */
                 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
                 int ret = common_muxing_compare_messages(
-                    stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
+                    stream_iter->current_msg->libObjPtr(),
+                    youngest_candidate_stream_iter->current_msg->libObjPtr());
                 if (ret < 0) {
                     /*
                      * The `youngest_candidate_stream_iter->current_msg`
@@ -1280,7 +1196,7 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
              * removed element with the array's last
              * element.
              */
-            g_ptr_array_remove_index_fast(live_trace->stream_iterators, stream_iter_idx);
+            bt2c::vectorFastRemove(live_trace->stream_iterators, stream_iter_idx);
         }
     }
 
@@ -1292,7 +1208,7 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
          * The only case where we don't have a candidate for this trace
          * is if we reached the end of all the iterators.
          */
-        BT_ASSERT(live_trace->stream_iterators->len == 0);
+        BT_ASSERT(live_trace->stream_iterators.empty());
         stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
     }
 
@@ -1325,13 +1241,10 @@ next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter
         goto end;
     }
 
-    BT_ASSERT_DBG(session->traces);
-
-    while (trace_idx < session->traces->len) {
+    while (trace_idx < session->traces.size()) {
         bool trace_is_ended = false;
         struct lttng_live_stream_iterator *stream_iter;
-        struct lttng_live_trace *trace =
-            (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
+        lttng_live_trace *trace = session->traces[trace_idx].get();
 
         stream_iter_status =
             next_stream_iterator_for_trace(lttng_live_msg_iter, trace, &stream_iter);
@@ -1358,7 +1271,8 @@ next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter
                  * deterministic way.
                  */
                 int ret = common_muxing_compare_messages(
-                    stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
+                    stream_iter->current_msg->libObjPtr(),
+                    youngest_candidate_stream_iter->current_msg->libObjPtr());
                 if (ret < 0) {
                     /*
                      * The `youngest_candidate_stream_iter->current_msg`
@@ -1381,10 +1295,10 @@ next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter
         } else {
             /*
              * trace_idx is not incremented since
-             * g_ptr_array_remove_index_fast replaces the
+             * vectorFastRemove replaces the
              * element at trace_idx with the array's last element.
              */
-            g_ptr_array_remove_index_fast(session->traces, trace_idx);
+            bt2c::vectorFastRemove(session->traces, trace_idx);
         }
     }
     if (youngest_candidate_stream_iter) {
@@ -1400,7 +1314,7 @@ next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter
          *
          * In either cases, we return END.
          */
-        BT_ASSERT(session->traces->len == 0);
+        BT_ASSERT(session->traces.empty());
         stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
     }
 end:
@@ -1427,7 +1341,6 @@ lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array
             (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_it);
         struct lttng_live_component *lttng_live = lttng_live_msg_iter->lttng_live_comp;
         enum lttng_live_iterator_status stream_iter_status;
-        uint64_t session_idx;
 
         *count = 0;
 
@@ -1435,13 +1348,13 @@ lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array
 
         if (G_UNLIKELY(lttng_live_msg_iter->was_interrupted)) {
             /*
-         * The iterator was interrupted in a previous call to the
-         * `_next()` method. We currently do not support generating
-         * messages after such event. The babeltrace2 CLI should never
-         * be running the graph after being interrupted. So this check
-         * is to prevent other graph users from using this live
-         * iterator in an messed up internal state.
-         */
+             * The iterator was interrupted in a previous call to the
+             * `_next()` method. We currently do not support generating
+             * messages after such event. The babeltrace2 CLI should never
+             * be running the graph after being interrupted. So this check
+             * is to prevent other graph users from using this live
+             * iterator in an messed up internal state.
+             */
             status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
             BT_CPPLOGE_APPEND_CAUSE_SPEC(
                 lttng_live_msg_iter->logger,
@@ -1450,26 +1363,26 @@ lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array
         }
 
         /*
-     * Clear all the invalid message reference that might be left over in
-     * the output array.
-     */
+         * Clear all the invalid message reference that might be left over in
+         * the output array.
+         */
         memset(msgs, 0, capacity * sizeof(*msgs));
 
         /*
-     * If no session are exposed on the relay found at the url provided by
-     * the user, session count will be 0. In this case, we return status
-     * end to return gracefully.
-     */
-        if (lttng_live_msg_iter->sessions->len == 0) {
+         * If no session are exposed on the relay found at the url provided by
+         * the user, session count will be 0. In this case, we return status
+         * end to return gracefully.
+         */
+        if (lttng_live_msg_iter->sessions.empty()) {
             if (lttng_live->params.sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
                 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
                 goto end;
             } else {
                 /*
-             * The are no more active session for this session
-             * name. Retry to create a viewer session for the
-             * requested session name.
-             */
+                 * The are no more active session for this session
+                 * name. Retry to create a viewer session for the
+                 * requested session name.
+                 */
                 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
                 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
                     if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
@@ -1491,53 +1404,52 @@ lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array
         }
 
         /*
-     * Here the muxing of message is done.
-     *
-     * We need to iterate over all the streams of all the traces of all the
-     * viewer sessions in order to get the message with the smallest
-     * timestamp. In this case, a session is a viewer session and there is
-     * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
-     * kernel). Each viewer session can have multiple traces, for example,
-     * 64bit UST viewer sessions could have multiple per-pid traces.
-     *
-     * We iterate over the streams of each traces to update and see what is
-     * their next message's timestamp. From those timestamps, we select the
-     * message with the smallest timestamp as the best candidate message
-     * for that trace and do the same thing across all the sessions.
-     *
-     * We then compare the timestamp of best candidate message of all the
-     * sessions to pick the message with the smallest timestamp and we
-     * return it.
-     */
+         * Here the muxing of message is done.
+         *
+         * We need to iterate over all the streams of all the traces of all the
+         * viewer sessions in order to get the message with the smallest
+         * timestamp. In this case, a session is a viewer session and there is
+         * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
+         * kernel). Each viewer session can have multiple traces, for example,
+         * 64bit UST viewer sessions could have multiple per-pid traces.
+         *
+         * We iterate over the streams of each traces to update and see what is
+         * their next message's timestamp. From those timestamps, we select the
+         * message with the smallest timestamp as the best candidate message
+         * for that trace and do the same thing across all the sessions.
+         *
+         * We then compare the timestamp of best candidate message of all the
+         * sessions to pick the message with the smallest timestamp and we
+         * return it.
+         */
         while (*count < capacity) {
             struct lttng_live_stream_iterator *youngest_stream_iter = NULL,
                                               *candidate_stream_iter = NULL;
             int64_t youngest_msg_ts_ns = INT64_MAX;
 
-            BT_ASSERT_DBG(lttng_live_msg_iter->sessions);
-            session_idx = 0;
-            while (session_idx < lttng_live_msg_iter->sessions->len) {
-                struct lttng_live_session *session = (lttng_live_session *) g_ptr_array_index(
-                    lttng_live_msg_iter->sessions, session_idx);
+            uint64_t session_idx = 0;
+            while (session_idx < lttng_live_msg_iter->sessions.size()) {
+                lttng_live_session *session = lttng_live_msg_iter->sessions[session_idx].get();
 
                 /* Find the best candidate message to send downstream. */
                 stream_iter_status = next_stream_iterator_for_session(lttng_live_msg_iter, session,
                                                                       &candidate_stream_iter);
 
                 /* If we receive an END status, it means that either:
-             * - Those traces never had active streams (UST with no
-             *   data produced yet),
-             * - All live stream iterators have ENDed.*/
+                 * - Those traces never had active streams (UST with no
+                 *   data produced yet),
+                 * - All live stream iterators have ENDed.
+                 */
                 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
-                    if (session->closed && session->traces->len == 0) {
+                    if (session->closed && session->traces.empty()) {
                         /*
-                     * Remove the session from the list.
-                     * session_idx is not modified since
-                     * g_ptr_array_remove_index_fast
-                     * replaces the the removed element with
-                     * the array's last element.
-                     */
-                        g_ptr_array_remove_index_fast(lttng_live_msg_iter->sessions, session_idx);
+                         * Remove the session from the list.
+                         * session_idx is not modified since
+                         * g_ptr_array_remove_index_fast
+                         * replaces the the removed element with
+                         * the array's last element.
+                         */
+                        bt2c::vectorFastRemove(lttng_live_msg_iter->sessions, session_idx);
                     } else {
                         session_idx++;
                     }
@@ -1554,26 +1466,27 @@ lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array
                     youngest_stream_iter = candidate_stream_iter;
                 } else if (candidate_stream_iter->current_msg_ts_ns == youngest_msg_ts_ns) {
                     /*
-                 * The currently selected message to be sent
-                 * downstream next has the exact same timestamp
-                 * that of the current candidate message. We
-                 * must break the tie in a predictable manner.
-                 */
+                     * The currently selected message to be sent
+                     * downstream next has the exact same timestamp
+                     * that of the current candidate message. We
+                     * must break the tie in a predictable manner.
+                     */
                     BT_CPPLOGD_STR_SPEC(
                         lttng_live_msg_iter->logger,
                         "Two of the next message candidates have the same timestamps, pick one deterministically.");
                     /*
-                 * Order the messages in an arbitrary but
-                 * deterministic way.
-                 */
-                    int ret = common_muxing_compare_messages(candidate_stream_iter->current_msg,
-                                                             youngest_stream_iter->current_msg);
+                     * Order the messages in an arbitrary but
+                     * deterministic way.
+                     */
+                    int ret = common_muxing_compare_messages(
+                        candidate_stream_iter->current_msg->libObjPtr(),
+                        youngest_stream_iter->current_msg->libObjPtr());
                     if (ret < 0) {
                         /*
-                     * The `candidate_stream_iter->current_msg`
-                     * should go first. Update the next
-                     * iterator and the current timestamp.
-                     */
+                         * The `candidate_stream_iter->current_msg`
+                         * should go first. Update the next
+                         * iterator and the current timestamp.
+                         */
                         youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
                         youngest_stream_iter = candidate_stream_iter;
                     } else if (ret == 0) {
@@ -1601,11 +1514,11 @@ lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array
                           youngest_stream_iter->current_msg_ts_ns);
 
             /*
-         * Insert the next message to the message batch. This will set
-         * stream iterator current message to NULL so that next time
-         * we fetch the next message of that stream iterator
-         */
-            BT_MESSAGE_MOVE_REF(msgs[*count], youngest_stream_iter->current_msg);
+             * Insert the next message to the message batch. This will set
+             * stream iterator current message to NULL so that next time
+             * we fetch the next message of that stream iterator
+             */
+            msgs[*count] = youngest_stream_iter->current_msg.release().libObjPtr();
             (*count)++;
 
             /* Update the last timestamp in nanoseconds sent downstream. */
@@ -1620,13 +1533,13 @@ return_status:
         case LTTNG_LIVE_ITERATOR_STATUS_OK:
         case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
             /*
-         * If we gathered messages, return _OK even if the graph was
-         * interrupted. This allows for the components downstream to at
-         * least get the those messages. If the graph was indeed
-         * interrupted there should not be another _next() call as the
-         * application will tear down the graph. This component class
-         * doesn't support restarting after an interruption.
-         */
+             * If we gathered messages, return _OK even if the graph was
+             * interrupted. This allows for the components downstream to at
+             * least get the those messages. If the graph was indeed
+             * interrupted there should not be another _next() call as the
+             * application will tear down the graph. This component class
+             * doesn't support restarting after an interruption.
+             */
             if (*count > 0) {
                 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
             } else {
@@ -1681,10 +1594,6 @@ lttng_live_msg_iter_create(struct lttng_live_component *lttng_live_comp,
     msg_iter->last_msg_ts_ns = INT64_MIN;
     msg_iter->was_interrupted = false;
 
-    msg_iter->sessions =
-        g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_session);
-    BT_ASSERT(msg_iter->sessions);
-
     return msg_iter;
 }
 
@@ -1722,9 +1631,9 @@ lttng_live_msg_iter_init(bt_self_message_iterator *self_msg_it,
                                              "Failed to create viewer connection");
             } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
                 /*
-             * Interruption in the _iter_init() method is not
-             * supported. Return an error.
-             */
+                 * Interruption in the _iter_init() method is not
+                 * supported. Return an error.
+                 */
                 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
                                              "Interrupted while creating viewer connection");
             }
@@ -1740,9 +1649,9 @@ lttng_live_msg_iter_init(bt_self_message_iterator *self_msg_it,
                                              "Failed to create viewer session");
             } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
                 /*
-             * Interruption in the _iter_init() method is not
-             * supported. Return an error.
-             */
+                 * Interruption in the _iter_init() method is not
+                 * supported. Return an error.
+                 */
                 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
                                              "Interrupted when creating viewer session");
             }
@@ -1751,7 +1660,7 @@ lttng_live_msg_iter_init(bt_self_message_iterator *self_msg_it,
             goto error;
         }
 
-        if (lttng_live_msg_iter->sessions->len == 0) {
+        if (lttng_live_msg_iter->sessions.empty()) {
             switch (lttng_live->params.sess_not_found_act) {
             case SESSION_NOT_FOUND_ACTION_CONTINUE:
                 BT_CPPLOGI_SPEC(
@@ -1858,9 +1767,7 @@ error:
     }
 
 end:
-    if (viewer_connection) {
-        live_viewer_connection_destroy(viewer_connection);
-    }
+    delete viewer_connection;
 
     g_free(validate_error);
 
This page took 0.040776 seconds and 4 git commands to generate.