src.ctf.lttng-live: make lttng_live_stream_iterator::current_msg a bt2::Message:...
authorSimon Marchi <simon.marchi@efficios.com>
Wed, 10 Apr 2024 14:58:05 +0000 (10:58 -0400)
committerSimon Marchi <simon.marchi@efficios.com>
Wed, 17 Apr 2024 17:57:53 +0000 (13:57 -0400)
Adjust a bunch of related functions involved in creating the message
that ends up there.

Signed-off-by: Simon Marchi <simon.marchi@efficios.com>
Change-Id: I2dabd3b3e6d0d17ee2a23774863b160505a0b830
Reviewed-on: https://review.lttng.org/c/babeltrace/+/8458
Reviewed-by: Philippe Proulx <eeppeliteloop@gmail.com>
Reviewed-on: https://review.lttng.org/c/babeltrace/+/12374
Tested-by: jenkins <jenkins@lttng.org>
src/plugins/ctf/lttng-live/data-stream.cpp
src/plugins/ctf/lttng-live/lttng-live.cpp
src/plugins/ctf/lttng-live/lttng-live.hpp

index 4cfc7e0e4f3f91a5226f3b2efc36d0798c7c82dd..cf9771ae30c75fd8932d56766af81b144942336b 100644 (file)
@@ -232,8 +232,6 @@ void lttng_live_stream_iterator_destroy(struct lttng_live_stream_iterator *strea
         return;
     }
 
-    bt_message_put_ref(stream_iter->current_msg);
-
     /* Track the number of active stream iterator. */
     stream_iter->trace->session->lttng_live_msg_iter->active_stream_iter--;
 
index dfeb9ae830fef1396a5607c5b5259d2715107fcf..3f74c408decf1d0bdbc6e85b42dc36a33ca83198 100644 (file)
@@ -564,41 +564,32 @@ end:
 
 static enum lttng_live_iterator_status
 emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
-                        struct lttng_live_stream_iterator *stream_iter, const bt_message **message,
-                        uint64_t timestamp)
+                        struct lttng_live_stream_iterator *stream_iter,
+                        bt2::ConstMessage::Shared& message, uint64_t timestamp)
 {
-    enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
-    bt_message *msg = NULL;
-
     BT_ASSERT(stream_iter->trace->clock_class);
-
     BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
                     "Emitting inactivity message for stream: ctf-stream-id={}, "
                     "viewer-stream-id={}, timestamp={}",
                     stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id,
                     timestamp);
 
-    msg = bt_message_message_iterator_inactivity_create(lttng_live_msg_iter->self_msg_iter,
-                                                        stream_iter->trace->clock_class, timestamp);
+    const auto msg = bt_message_message_iterator_inactivity_create(
+        lttng_live_msg_iter->self_msg_iter, stream_iter->trace->clock_class, timestamp);
+
     if (!msg) {
         BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
                                      "Error emitting message iterator inactivity message");
-        goto error;
+        return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
     }
 
-    *message = msg;
-end:
-    return ret;
-
-error:
-    ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
-    bt_message_put_ref(msg);
-    goto end;
+    message = bt2::ConstMessage::Shared::createWithoutRef(msg);
+    return LTTNG_LIVE_ITERATOR_STATUS_OK;
 }
 
 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
     struct lttng_live_msg_iter *lttng_live_msg_iter,
-    struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
+    struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message)
 {
     enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
 
@@ -698,7 +689,7 @@ end:
 
 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
     struct lttng_live_msg_iter *lttng_live_msg_iter,
-    struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
+    struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message)
 {
     enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
     enum ctf_msg_iter_status status;
@@ -739,12 +730,14 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_activ
         goto end;
     }
 
-    status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), message);
+    const bt_message *msg;
+    status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), &msg);
     switch (status) {
     case CTF_MSG_ITER_STATUS_EOF:
         ret = LTTNG_LIVE_ITERATOR_STATUS_END;
         break;
     case CTF_MSG_ITER_STATUS_OK:
+        message = bt2::ConstMessage::Shared::createWithoutRef(msg);
         ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
         break;
     case CTF_MSG_ITER_STATUS_AGAIN:
@@ -772,7 +765,7 @@ end:
 static enum lttng_live_iterator_status
 lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
                                  struct lttng_live_stream_iterator *stream_iter,
-                                 const bt_message **curr_msg)
+                                 bt2::ConstMessage::Shared& curr_msg)
 {
     enum lttng_live_iterator_status live_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
 
@@ -786,8 +779,9 @@ lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter
      * `ctf_msg_iter` should simply realize that it needs to close the
      * stream properly by emitting the necessary stream end message.
      */
+    const bt_message *msg;
     enum ctf_msg_iter_status status =
-        ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), curr_msg);
+        ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), &msg);
 
     if (status == CTF_MSG_ITER_STATUS_ERROR) {
         BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
@@ -803,6 +797,8 @@ lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter
 
     BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
 
+    curr_msg = bt2::ConstMessage::Shared::createWithoutRef(msg);
+
 end:
     return live_status;
 }
@@ -858,7 +854,7 @@ end:
 static enum lttng_live_iterator_status
 lttng_live_iterator_next_msg_on_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
                                        struct lttng_live_stream_iterator *stream_iter,
-                                       const bt_message **curr_msg)
+                                       bt2::ConstMessage::Shared& curr_msg)
 {
     enum lttng_live_iterator_status live_status;
 
@@ -907,16 +903,16 @@ retry:
     live_status = lttng_live_iterator_next_handle_one_quiescent_stream(lttng_live_msg_iter,
                                                                        stream_iter, curr_msg);
     if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
-        BT_ASSERT(!*curr_msg);
+        BT_ASSERT(!curr_msg);
         goto end;
     }
-    if (*curr_msg) {
+    if (curr_msg) {
         goto end;
     }
     live_status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live_msg_iter,
                                                                          stream_iter, curr_msg);
     if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
-        BT_ASSERT(!*curr_msg);
+        BT_ASSERT(!curr_msg);
     }
 
 end:
@@ -935,20 +931,17 @@ end:
     return live_status;
 }
 
-static bool is_discarded_packet_or_event_message(const bt_message *msg)
+static bool is_discarded_packet_or_event_message(const bt2::ConstMessage msg)
 {
-    const enum bt_message_type msg_type = bt_message_get_type(msg);
-
-    return msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS ||
-           msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS;
+    return msg.type() == bt2::MessageType::DiscardedEvents ||
+           msg.type() == bt2::MessageType::DiscardedPackets;
 }
 
 static enum lttng_live_iterator_status
 adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream *stream,
-                                 const bt_message *msg_in, bt_message **msg_out,
+                                 const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out,
                                  uint64_t new_begin_ts)
 {
-    enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
     enum bt_property_availability availability;
     const bt_clock_snapshot *clock_snapshot;
     uint64_t end_ts;
@@ -960,24 +953,23 @@ adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream
     availability = bt_message_discarded_packets_get_count(msg_in, &count);
     BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
 
-    *msg_out = bt_message_discarded_packets_create_with_default_clock_snapshots(
+    const auto msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
         iter, stream, new_begin_ts, end_ts);
-    if (!*msg_out) {
-        status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
-        goto end;
+
+    if (!msg) {
+        return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
     }
 
-    bt_message_discarded_packets_set_count(*msg_out, count);
-end:
-    return status;
+    bt_message_discarded_packets_set_count(msg, count);
+    msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
+    return LTTNG_LIVE_ITERATOR_STATUS_OK;
 }
 
 static enum lttng_live_iterator_status
 adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream *stream,
-                                const bt_message *msg_in, bt_message **msg_out,
+                                const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out,
                                 uint64_t new_begin_ts)
 {
-    enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
     enum bt_property_availability availability;
     const bt_clock_snapshot *clock_snapshot;
     uint64_t end_ts;
@@ -989,22 +981,22 @@ adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream
     availability = bt_message_discarded_events_get_count(msg_in, &count);
     BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
 
-    *msg_out = bt_message_discarded_events_create_with_default_clock_snapshots(
+    const auto msg = bt_message_discarded_events_create_with_default_clock_snapshots(
         iter, stream, new_begin_ts, end_ts);
-    if (!*msg_out) {
-        status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
-        goto end;
+
+    if (!msg) {
+        return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
     }
 
-    bt_message_discarded_events_set_count(*msg_out, count);
-end:
-    return status;
+    bt_message_discarded_events_set_count(msg, count);
+    msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
+    return LTTNG_LIVE_ITERATOR_STATUS_OK;
 }
 
 static enum lttng_live_iterator_status
 handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
                     struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns,
-                    const bt_message *late_msg)
+                    const bt2::ConstMessage& late_msg)
 {
     const bt_clock_class *clock_class;
     const bt_stream_class *stream_class;
@@ -1012,7 +1004,7 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
     int64_t last_inactivity_ts_ns;
     enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
     enum lttng_live_iterator_status adjust_status;
-    bt_message *adjusted_message;
+    bt2::ConstMessage::Shared adjusted_message;
 
     /*
      * The timestamp of the current message is before the last message sent
@@ -1062,7 +1054,7 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
                                      "Invalid live stream state: "
                                      "have a late message that is not a packet discarded or "
                                      "event discarded message: late-msg-type={}",
-                                     static_cast<bt2::MessageType>(bt_message_get_type(late_msg)));
+                                     late_msg.type());
         stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
         goto end;
     }
@@ -1098,18 +1090,17 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
     BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
                     "Adjusting the timestamp of late message: late-msg-type={}, "
                     "msg-new-ts-ns={}",
-                    static_cast<bt2::MessageType>(bt_message_get_type(late_msg)),
-                    stream_iter->last_inactivity_ts.value);
-    switch (bt_message_get_type(late_msg)) {
-    case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
+                    late_msg.type(), stream_iter->last_inactivity_ts.value);
+    switch (late_msg.type()) {
+    case bt2::MessageType::DiscardedEvents:
         adjust_status = adjust_discarded_events_message(
-            lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(), late_msg,
-            &adjusted_message, stream_iter->last_inactivity_ts.value);
+            lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(),
+            late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
         break;
-    case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
+    case bt2::MessageType::DiscardedPackets:
         adjust_status = adjust_discarded_packets_message(
-            lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(), late_msg,
-            &adjusted_message, stream_iter->last_inactivity_ts.value);
+            lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(),
+            late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
         break;
     default:
         bt_common_abort();
@@ -1123,7 +1114,6 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
     BT_ASSERT_DBG(adjusted_message);
     stream_iter->current_msg = adjusted_message;
     stream_iter->current_msg_ts_ns = last_inactivity_ts_ns;
-    bt_message_put_ref(late_msg);
 
 end:
     return stream_iter_status;
@@ -1164,11 +1154,11 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
          * one.
          */
         while (!stream_iter->current_msg) {
-            const bt_message *msg = NULL;
+            bt2::ConstMessage::Shared msg;
             int64_t curr_msg_ts_ns = INT64_MAX;
 
             stream_iter_status =
-                lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, &msg);
+                lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, msg);
 
             if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
                 stream_iter_is_ended = true;
@@ -1184,15 +1174,14 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
             BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
                             "Live stream iterator returned message: msg-type={}, "
                             "stream-name=\"{}\", viewer-stream-id={}",
-                            static_cast<bt2::MessageType>(bt_message_get_type(msg)),
-                            stream_iter->name, stream_iter->viewer_stream_id);
+                            msg->type(), stream_iter->name, stream_iter->viewer_stream_id);
 
             /*
              * Get the timestamp in nanoseconds from origin of this
              * message.
              */
-            live_get_msg_ts_ns(lttng_live_msg_iter, msg, lttng_live_msg_iter->last_msg_ts_ns,
-                               &curr_msg_ts_ns);
+            live_get_msg_ts_ns(lttng_live_msg_iter, msg->libObjPtr(),
+                               lttng_live_msg_iter->last_msg_ts_ns, &curr_msg_ts_ns);
 
             /*
              * Check if the message of the current live stream
@@ -1201,7 +1190,7 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
              * iterator. If not, we need to handle it with care.
              */
             if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
-                stream_iter->current_msg = msg;
+                stream_iter->current_msg = std::move(msg);
                 stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
             } else {
                 /*
@@ -1209,7 +1198,7 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
                  * may be fixable but it can also be an error.
                  */
                 stream_iter_status =
-                    handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, msg);
+                    handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, *msg);
                 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
                     BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
                                                  "Late message could not be handled correctly: "
@@ -1244,7 +1233,8 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
                  */
                 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
                 int ret = common_muxing_compare_messages(
-                    stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
+                    stream_iter->current_msg->libObjPtr(),
+                    youngest_candidate_stream_iter->current_msg->libObjPtr());
                 if (ret < 0) {
                     /*
                      * The `youngest_candidate_stream_iter->current_msg`
@@ -1356,7 +1346,8 @@ next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter
                  * deterministic way.
                  */
                 int ret = common_muxing_compare_messages(
-                    stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
+                    stream_iter->current_msg->libObjPtr(),
+                    youngest_candidate_stream_iter->current_msg->libObjPtr());
                 if (ret < 0) {
                     /*
                      * The `youngest_candidate_stream_iter->current_msg`
@@ -1565,8 +1556,9 @@ lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array
                      * Order the messages in an arbitrary but
                      * deterministic way.
                      */
-                    int ret = common_muxing_compare_messages(candidate_stream_iter->current_msg,
-                                                             youngest_stream_iter->current_msg);
+                    int ret = common_muxing_compare_messages(
+                        candidate_stream_iter->current_msg->libObjPtr(),
+                        youngest_stream_iter->current_msg->libObjPtr());
                     if (ret < 0) {
                         /*
                          * The `candidate_stream_iter->current_msg`
@@ -1604,7 +1596,7 @@ lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array
              * stream iterator current message to NULL so that next time
              * we fetch the next message of that stream iterator
              */
-            BT_MESSAGE_MOVE_REF(msgs[*count], youngest_stream_iter->current_msg);
+            msgs[*count] = youngest_stream_iter->current_msg.release().libObjPtr();
             (*count)++;
 
             /* Update the last timestamp in nanoseconds sent downstream. */
index a4974f34642ce464a09ed921c38bb3a5a4738f74..56c86d39a679cadbe995e585800588c5ea1efa0e 100644 (file)
@@ -16,6 +16,7 @@
 
 #include <babeltrace2/babeltrace.h>
 
+#include "cpp-common/bt2/message.hpp"
 #include "cpp-common/vendor/fmt/format.h" /* IWYU pragma: keep */
 
 #include "../common/src/metadata/tsdl/decoder.hpp"
@@ -118,11 +119,8 @@ struct lttng_live_stream_iterator
 
     enum lttng_live_stream_state state = LTTNG_LIVE_STREAM_QUIESCENT;
 
-    /*
-     * The current message produced by this live stream iterator. Owned by
-     * this.
-     */
-    const bt_message *current_msg = nullptr;
+    /* The current message produced by this live stream iterator. */
+    bt2::ConstMessage::Shared current_msg;
 
     /* Timestamp in nanoseconds of the current message (current_msg). */
     int64_t current_msg_ts_ns = 0;
This page took 0.031443 seconds and 4 git commands to generate.