src.ctf.lttng-live: introduce lttng_live_session::UP and use it
[babeltrace.git] / src / plugins / ctf / lttng-live / lttng-live.cpp
index dfeb9ae830fef1396a5607c5b5259d2715107fcf..d1307bf837b654aae82d503793207cc32dbe66d5 100644 (file)
@@ -14,6 +14,7 @@
 #include "common/assert.h"
 #include "cpp-common/bt2c/fmt.hpp"
 #include "cpp-common/bt2c/glib-up.hpp"
+#include "cpp-common/bt2c/vector.hpp"
 #include "cpp-common/bt2s/make-unique.hpp"
 #include "cpp-common/vendor/fmt/format.h"
 
@@ -72,30 +73,13 @@ end:
 static struct lttng_live_trace *
 lttng_live_session_borrow_trace_by_id(struct lttng_live_session *session, uint64_t trace_id)
 {
-    uint64_t trace_idx;
-    struct lttng_live_trace *ret_trace = NULL;
-
-    for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
-        struct lttng_live_trace *trace =
-            (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
+    for (lttng_live_trace::UP& trace : session->traces) {
         if (trace->id == trace_id) {
-            ret_trace = trace;
-            goto end;
+            return trace.get();
         }
     }
 
-end:
-    return ret_trace;
-}
-
-static void lttng_live_destroy_trace(struct lttng_live_trace *trace)
-{
-    BT_CPPLOGD_SPEC(trace->logger, "Destroying live trace: trace-id={}", trace->id);
-
-    BT_ASSERT(trace->stream_iterators);
-    g_ptr_array_free(trace->stream_iterators, TRUE);
-
-    delete trace;
+    return nullptr;
 }
 
 static struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
@@ -104,16 +88,15 @@ static struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_sessio
     BT_CPPLOGD_SPEC(session->logger, "Creating live trace: session-id={}, trace-id={}", session->id,
                     trace_id);
 
-    lttng_live_trace *trace = new lttng_live_trace {session->logger};
+    auto trace = bt2s::make_unique<lttng_live_trace>(session->logger);
+
     trace->session = session;
     trace->id = trace_id;
-    trace->stream_iterators =
-        g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_stream_iterator_destroy);
-    BT_ASSERT(trace->stream_iterators);
     trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
-    g_ptr_array_add(session->traces, trace);
 
-    return trace;
+    const auto ret = trace.get();
+    session->traces.emplace_back(std::move(trace));
+    return ret;
 }
 
 struct lttng_live_trace *
@@ -142,61 +125,40 @@ int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, uint
                     "session-id={}, hostname=\"{}\", session-name=\"{}\"",
                     session_id, hostname, session_name);
 
-    lttng_live_session *session = new lttng_live_session {lttng_live_msg_iter->logger};
+    auto session = bt2s::make_unique<lttng_live_session>(lttng_live_msg_iter->logger);
+
     session->self_comp = lttng_live_msg_iter->self_comp;
     session->id = session_id;
-    session->traces = g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_trace);
-    BT_ASSERT(session->traces);
     session->lttng_live_msg_iter = lttng_live_msg_iter;
     session->new_streams_needed = true;
-    session->hostname = g_string_new(hostname);
-    BT_ASSERT(session->hostname);
+    session->hostname = hostname;
+    session->session_name = session_name;
 
-    session->session_name = g_string_new(session_name);
-    BT_ASSERT(session->session_name);
-
-    g_ptr_array_add(lttng_live_msg_iter->sessions, session);
+    g_ptr_array_add(lttng_live_msg_iter->sessions, session.release());
 
     return 0;
 }
 
-static void lttng_live_destroy_session(struct lttng_live_session *session)
+lttng_live_session::~lttng_live_session()
 {
-    if (!session) {
-        goto end;
-    }
+    BT_CPPLOGD_SPEC(this->logger, "Destroying live session: session-id={}, session-name=\"{}\"",
+                    this->id, this->session_name);
 
-    BT_CPPLOGD_SPEC(session->logger,
-                    "Destroying live session: "
-                    "session-id={}, session-name=\"{}\"",
-                    session->id, session->session_name->str);
-    if (session->id != -1ULL) {
-        if (lttng_live_session_detach(session)) {
-            if (!lttng_live_graph_is_canceled(session->lttng_live_msg_iter)) {
+    if (this->id != -1ULL) {
+        if (lttng_live_session_detach(this)) {
+            if (!lttng_live_graph_is_canceled(this->lttng_live_msg_iter)) {
                 /* Old relayd cannot detach sessions. */
-                BT_CPPLOGD_SPEC(session->logger, "Unable to detach lttng live session {}",
-                                session->id);
+                BT_CPPLOGD_SPEC(this->logger, "Unable to detach lttng live session {}", this->id);
             }
         }
-        session->id = -1ULL;
-    }
-
-    if (session->traces) {
-        g_ptr_array_free(session->traces, TRUE);
-    }
 
-    if (session->hostname) {
-        g_string_free(session->hostname, TRUE);
-    }
-
-    if (session->session_name) {
-        g_string_free(session->session_name, TRUE);
+        this->id = -1ULL;
     }
+}
 
+static void lttng_live_destroy_session(struct lttng_live_session *session)
+{
     delete session;
-
-end:
-    return;
 }
 
 static void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter)
@@ -354,7 +316,6 @@ lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
                        struct lttng_live_session *session)
 {
     enum lttng_live_iterator_status status;
-    uint64_t trace_idx;
 
     if (!session->attached) {
         BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Attach to session: session-id={}",
@@ -381,9 +342,8 @@ lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
     }
 
     BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
-                    "Updating all data streams: "
-                    "session-id={}, session-name=\"{}\"",
-                    session->id, session->session_name->str);
+                    "Updating all data streams: session-id={}, session-name=\"{}\"", session->id,
+                    session->session_name);
 
     status = lttng_live_session_get_new_streams(session, lttng_live_msg_iter->self_msg_iter);
     switch (status) {
@@ -407,7 +367,7 @@ lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
             lttng_live_msg_iter->logger,
             "Updating streams returned _END status. Override status to _OK in order fetch any remaining metadata:"
             "session-id={}, session-name=\"{}\"",
-            session->id, session->session_name->str);
+            session->id, session->session_name);
         status = LTTNG_LIVE_ITERATOR_STATUS_OK;
         break;
     default:
@@ -415,20 +375,14 @@ lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
     }
 
     BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
-                    "Updating metadata stream for session: "
-                    "session-id={}, session-name=\"{}\"",
-                    session->id, session->session_name->str);
-
-    trace_idx = 0;
-    while (trace_idx < session->traces->len) {
-        struct lttng_live_trace *trace =
-            (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
+                    "Updating metadata stream for session: session-id={}, session-name=\"{}\"",
+                    session->id, session->session_name);
 
-        status = lttng_live_metadata_update(trace);
+    for (lttng_live_trace::UP& trace : session->traces) {
+        status = lttng_live_metadata_update(trace.get());
         switch (status) {
         case LTTNG_LIVE_ITERATOR_STATUS_END:
         case LTTNG_LIVE_ITERATOR_STATUS_OK:
-            trace_idx++;
             break;
         case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
         case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
@@ -455,7 +409,7 @@ end:
 static void
 lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
 {
-    uint64_t session_idx, trace_idx;
+    uint64_t session_idx;
 
     for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
         struct lttng_live_session *session =
@@ -465,9 +419,7 @@ lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live
                         "session-id={}",
                         session->id);
         session->new_streams_needed = true;
-        for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
-            struct lttng_live_trace *trace =
-                (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
+        for (lttng_live_trace::UP& trace : session->traces) {
             BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
                             "Force marking trace metadata state as needing an update: "
                             "session-id={}, trace-id={}",
@@ -564,41 +516,32 @@ end:
 
 static enum lttng_live_iterator_status
 emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
-                        struct lttng_live_stream_iterator *stream_iter, const bt_message **message,
-                        uint64_t timestamp)
+                        struct lttng_live_stream_iterator *stream_iter,
+                        bt2::ConstMessage::Shared& message, uint64_t timestamp)
 {
-    enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
-    bt_message *msg = NULL;
-
     BT_ASSERT(stream_iter->trace->clock_class);
-
     BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
                     "Emitting inactivity message for stream: ctf-stream-id={}, "
                     "viewer-stream-id={}, timestamp={}",
                     stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id,
                     timestamp);
 
-    msg = bt_message_message_iterator_inactivity_create(lttng_live_msg_iter->self_msg_iter,
-                                                        stream_iter->trace->clock_class, timestamp);
+    const auto msg = bt_message_message_iterator_inactivity_create(
+        lttng_live_msg_iter->self_msg_iter, stream_iter->trace->clock_class, timestamp);
+
     if (!msg) {
         BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
                                      "Error emitting message iterator inactivity message");
-        goto error;
+        return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
     }
 
-    *message = msg;
-end:
-    return ret;
-
-error:
-    ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
-    bt_message_put_ref(msg);
-    goto end;
+    message = bt2::ConstMessage::Shared::createWithoutRef(msg);
+    return LTTNG_LIVE_ITERATOR_STATUS_OK;
 }
 
 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
     struct lttng_live_msg_iter *lttng_live_msg_iter,
-    struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
+    struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message)
 {
     enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
 
@@ -698,11 +641,11 @@ end:
 
 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
     struct lttng_live_msg_iter *lttng_live_msg_iter,
-    struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
+    struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message)
 {
     enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
     enum ctf_msg_iter_status status;
-    uint64_t session_idx, trace_idx;
+    uint64_t session_idx;
 
     for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
         struct lttng_live_session *session =
@@ -716,9 +659,7 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_activ
             ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
             goto end;
         }
-        for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
-            struct lttng_live_trace *trace =
-                (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
+        for (lttng_live_trace::UP& trace : session->traces) {
             if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
                 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
                                 "Need an update for metadata stream: "
@@ -739,12 +680,14 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_activ
         goto end;
     }
 
-    status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), message);
+    const bt_message *msg;
+    status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), &msg);
     switch (status) {
     case CTF_MSG_ITER_STATUS_EOF:
         ret = LTTNG_LIVE_ITERATOR_STATUS_END;
         break;
     case CTF_MSG_ITER_STATUS_OK:
+        message = bt2::ConstMessage::Shared::createWithoutRef(msg);
         ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
         break;
     case CTF_MSG_ITER_STATUS_AGAIN:
@@ -772,7 +715,7 @@ end:
 static enum lttng_live_iterator_status
 lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
                                  struct lttng_live_stream_iterator *stream_iter,
-                                 const bt_message **curr_msg)
+                                 bt2::ConstMessage::Shared& curr_msg)
 {
     enum lttng_live_iterator_status live_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
 
@@ -786,8 +729,9 @@ lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter
      * `ctf_msg_iter` should simply realize that it needs to close the
      * stream properly by emitting the necessary stream end message.
      */
+    const bt_message *msg;
     enum ctf_msg_iter_status status =
-        ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), curr_msg);
+        ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), &msg);
 
     if (status == CTF_MSG_ITER_STATUS_ERROR) {
         BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
@@ -803,6 +747,8 @@ lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter
 
     BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
 
+    curr_msg = bt2::ConstMessage::Shared::createWithoutRef(msg);
+
 end:
     return live_status;
 }
@@ -858,7 +804,7 @@ end:
 static enum lttng_live_iterator_status
 lttng_live_iterator_next_msg_on_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
                                        struct lttng_live_stream_iterator *stream_iter,
-                                       const bt_message **curr_msg)
+                                       bt2::ConstMessage::Shared& curr_msg)
 {
     enum lttng_live_iterator_status live_status;
 
@@ -907,16 +853,16 @@ retry:
     live_status = lttng_live_iterator_next_handle_one_quiescent_stream(lttng_live_msg_iter,
                                                                        stream_iter, curr_msg);
     if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
-        BT_ASSERT(!*curr_msg);
+        BT_ASSERT(!curr_msg);
         goto end;
     }
-    if (*curr_msg) {
+    if (curr_msg) {
         goto end;
     }
     live_status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live_msg_iter,
                                                                          stream_iter, curr_msg);
     if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
-        BT_ASSERT(!*curr_msg);
+        BT_ASSERT(!curr_msg);
     }
 
 end:
@@ -935,20 +881,17 @@ end:
     return live_status;
 }
 
-static bool is_discarded_packet_or_event_message(const bt_message *msg)
+static bool is_discarded_packet_or_event_message(const bt2::ConstMessage msg)
 {
-    const enum bt_message_type msg_type = bt_message_get_type(msg);
-
-    return msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS ||
-           msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS;
+    return msg.type() == bt2::MessageType::DiscardedEvents ||
+           msg.type() == bt2::MessageType::DiscardedPackets;
 }
 
 static enum lttng_live_iterator_status
 adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream *stream,
-                                 const bt_message *msg_in, bt_message **msg_out,
+                                 const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out,
                                  uint64_t new_begin_ts)
 {
-    enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
     enum bt_property_availability availability;
     const bt_clock_snapshot *clock_snapshot;
     uint64_t end_ts;
@@ -960,24 +903,23 @@ adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream
     availability = bt_message_discarded_packets_get_count(msg_in, &count);
     BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
 
-    *msg_out = bt_message_discarded_packets_create_with_default_clock_snapshots(
+    const auto msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
         iter, stream, new_begin_ts, end_ts);
-    if (!*msg_out) {
-        status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
-        goto end;
+
+    if (!msg) {
+        return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
     }
 
-    bt_message_discarded_packets_set_count(*msg_out, count);
-end:
-    return status;
+    bt_message_discarded_packets_set_count(msg, count);
+    msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
+    return LTTNG_LIVE_ITERATOR_STATUS_OK;
 }
 
 static enum lttng_live_iterator_status
 adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream *stream,
-                                const bt_message *msg_in, bt_message **msg_out,
+                                const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out,
                                 uint64_t new_begin_ts)
 {
-    enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
     enum bt_property_availability availability;
     const bt_clock_snapshot *clock_snapshot;
     uint64_t end_ts;
@@ -989,22 +931,22 @@ adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream
     availability = bt_message_discarded_events_get_count(msg_in, &count);
     BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
 
-    *msg_out = bt_message_discarded_events_create_with_default_clock_snapshots(
+    const auto msg = bt_message_discarded_events_create_with_default_clock_snapshots(
         iter, stream, new_begin_ts, end_ts);
-    if (!*msg_out) {
-        status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
-        goto end;
+
+    if (!msg) {
+        return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
     }
 
-    bt_message_discarded_events_set_count(*msg_out, count);
-end:
-    return status;
+    bt_message_discarded_events_set_count(msg, count);
+    msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
+    return LTTNG_LIVE_ITERATOR_STATUS_OK;
 }
 
 static enum lttng_live_iterator_status
 handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
                     struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns,
-                    const bt_message *late_msg)
+                    const bt2::ConstMessage& late_msg)
 {
     const bt_clock_class *clock_class;
     const bt_stream_class *stream_class;
@@ -1012,7 +954,7 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
     int64_t last_inactivity_ts_ns;
     enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
     enum lttng_live_iterator_status adjust_status;
-    bt_message *adjusted_message;
+    bt2::ConstMessage::Shared adjusted_message;
 
     /*
      * The timestamp of the current message is before the last message sent
@@ -1062,7 +1004,7 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
                                      "Invalid live stream state: "
                                      "have a late message that is not a packet discarded or "
                                      "event discarded message: late-msg-type={}",
-                                     static_cast<bt2::MessageType>(bt_message_get_type(late_msg)));
+                                     late_msg.type());
         stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
         goto end;
     }
@@ -1098,18 +1040,17 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
     BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
                     "Adjusting the timestamp of late message: late-msg-type={}, "
                     "msg-new-ts-ns={}",
-                    static_cast<bt2::MessageType>(bt_message_get_type(late_msg)),
-                    stream_iter->last_inactivity_ts.value);
-    switch (bt_message_get_type(late_msg)) {
-    case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
+                    late_msg.type(), stream_iter->last_inactivity_ts.value);
+    switch (late_msg.type()) {
+    case bt2::MessageType::DiscardedEvents:
         adjust_status = adjust_discarded_events_message(
-            lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(), late_msg,
-            &adjusted_message, stream_iter->last_inactivity_ts.value);
+            lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(),
+            late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
         break;
-    case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
+    case bt2::MessageType::DiscardedPackets:
         adjust_status = adjust_discarded_packets_message(
-            lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(), late_msg,
-            &adjusted_message, stream_iter->last_inactivity_ts.value);
+            lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(),
+            late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
         break;
     default:
         bt_common_abort();
@@ -1123,7 +1064,6 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
     BT_ASSERT_DBG(adjusted_message);
     stream_iter->current_msg = adjusted_message;
     stream_iter->current_msg_ts_ns = last_inactivity_ts_ns;
-    bt_message_put_ref(late_msg);
 
 end:
     return stream_iter_status;
@@ -1140,7 +1080,6 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
     uint64_t stream_iter_idx;
 
     BT_ASSERT_DBG(live_trace);
-    BT_ASSERT_DBG(live_trace->stream_iterators);
 
     BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
                     "Finding the next stream iterator for trace: "
@@ -1153,22 +1092,21 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
      * ensure monotonicity.
      */
     stream_iter_idx = 0;
-    while (stream_iter_idx < live_trace->stream_iterators->len) {
+    while (stream_iter_idx < live_trace->stream_iterators.size()) {
         bool stream_iter_is_ended = false;
-        struct lttng_live_stream_iterator *stream_iter =
-            (lttng_live_stream_iterator *) g_ptr_array_index(live_trace->stream_iterators,
-                                                             stream_iter_idx);
+        lttng_live_stream_iterator *stream_iter =
+            live_trace->stream_iterators[stream_iter_idx].get();
 
         /*
          * If there is no current message for this stream, go fetch
          * one.
          */
         while (!stream_iter->current_msg) {
-            const bt_message *msg = NULL;
+            bt2::ConstMessage::Shared msg;
             int64_t curr_msg_ts_ns = INT64_MAX;
 
             stream_iter_status =
-                lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, &msg);
+                lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, msg);
 
             if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
                 stream_iter_is_ended = true;
@@ -1184,15 +1122,14 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
             BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
                             "Live stream iterator returned message: msg-type={}, "
                             "stream-name=\"{}\", viewer-stream-id={}",
-                            static_cast<bt2::MessageType>(bt_message_get_type(msg)),
-                            stream_iter->name, stream_iter->viewer_stream_id);
+                            msg->type(), stream_iter->name, stream_iter->viewer_stream_id);
 
             /*
              * Get the timestamp in nanoseconds from origin of this
              * message.
              */
-            live_get_msg_ts_ns(lttng_live_msg_iter, msg, lttng_live_msg_iter->last_msg_ts_ns,
-                               &curr_msg_ts_ns);
+            live_get_msg_ts_ns(lttng_live_msg_iter, msg->libObjPtr(),
+                               lttng_live_msg_iter->last_msg_ts_ns, &curr_msg_ts_ns);
 
             /*
              * Check if the message of the current live stream
@@ -1201,7 +1138,7 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
              * iterator. If not, we need to handle it with care.
              */
             if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
-                stream_iter->current_msg = msg;
+                stream_iter->current_msg = std::move(msg);
                 stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
             } else {
                 /*
@@ -1209,7 +1146,7 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
                  * may be fixable but it can also be an error.
                  */
                 stream_iter_status =
-                    handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, msg);
+                    handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, *msg);
                 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
                     BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
                                                  "Late message could not be handled correctly: "
@@ -1244,7 +1181,8 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
                  */
                 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
                 int ret = common_muxing_compare_messages(
-                    stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
+                    stream_iter->current_msg->libObjPtr(),
+                    youngest_candidate_stream_iter->current_msg->libObjPtr());
                 if (ret < 0) {
                     /*
                      * The `youngest_candidate_stream_iter->current_msg`
@@ -1278,7 +1216,7 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
              * removed element with the array's last
              * element.
              */
-            g_ptr_array_remove_index_fast(live_trace->stream_iterators, stream_iter_idx);
+            bt2c::vectorFastRemove(live_trace->stream_iterators, stream_iter_idx);
         }
     }
 
@@ -1290,7 +1228,7 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
          * The only case where we don't have a candidate for this trace
          * is if we reached the end of all the iterators.
          */
-        BT_ASSERT(live_trace->stream_iterators->len == 0);
+        BT_ASSERT(live_trace->stream_iterators.empty());
         stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
     }
 
@@ -1323,13 +1261,10 @@ next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter
         goto end;
     }
 
-    BT_ASSERT_DBG(session->traces);
-
-    while (trace_idx < session->traces->len) {
+    while (trace_idx < session->traces.size()) {
         bool trace_is_ended = false;
         struct lttng_live_stream_iterator *stream_iter;
-        struct lttng_live_trace *trace =
-            (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
+        lttng_live_trace *trace = session->traces[trace_idx].get();
 
         stream_iter_status =
             next_stream_iterator_for_trace(lttng_live_msg_iter, trace, &stream_iter);
@@ -1356,7 +1291,8 @@ next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter
                  * deterministic way.
                  */
                 int ret = common_muxing_compare_messages(
-                    stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
+                    stream_iter->current_msg->libObjPtr(),
+                    youngest_candidate_stream_iter->current_msg->libObjPtr());
                 if (ret < 0) {
                     /*
                      * The `youngest_candidate_stream_iter->current_msg`
@@ -1379,10 +1315,10 @@ next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter
         } else {
             /*
              * trace_idx is not incremented since
-             * g_ptr_array_remove_index_fast replaces the
+             * vectorFastRemove replaces the
              * element at trace_idx with the array's last element.
              */
-            g_ptr_array_remove_index_fast(session->traces, trace_idx);
+            bt2c::vectorFastRemove(session->traces, trace_idx);
         }
     }
     if (youngest_candidate_stream_iter) {
@@ -1398,7 +1334,7 @@ next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter
          *
          * In either cases, we return END.
          */
-        BT_ASSERT(session->traces->len == 0);
+        BT_ASSERT(session->traces.empty());
         stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
     }
 end:
@@ -1528,7 +1464,7 @@ lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array
                  * - All live stream iterators have ENDed.
                  */
                 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
-                    if (session->closed && session->traces->len == 0) {
+                    if (session->closed && session->traces.empty()) {
                         /*
                          * Remove the session from the list.
                          * session_idx is not modified since
@@ -1565,8 +1501,9 @@ lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array
                      * Order the messages in an arbitrary but
                      * deterministic way.
                      */
-                    int ret = common_muxing_compare_messages(candidate_stream_iter->current_msg,
-                                                             youngest_stream_iter->current_msg);
+                    int ret = common_muxing_compare_messages(
+                        candidate_stream_iter->current_msg->libObjPtr(),
+                        youngest_stream_iter->current_msg->libObjPtr());
                     if (ret < 0) {
                         /*
                          * The `candidate_stream_iter->current_msg`
@@ -1604,7 +1541,7 @@ lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array
              * stream iterator current message to NULL so that next time
              * we fetch the next message of that stream iterator
              */
-            BT_MESSAGE_MOVE_REF(msgs[*count], youngest_stream_iter->current_msg);
+            msgs[*count] = youngest_stream_iter->current_msg.release().libObjPtr();
             (*count)++;
 
             /* Update the last timestamp in nanoseconds sent downstream. */
This page took 0.03267 seconds and 4 git commands to generate.