src.ctf.lttng-live: make lttng_live_stream_iterator::current_msg a bt2::Message:...
authorSimon Marchi <simon.marchi@efficios.com>
Tue, 26 Jul 2022 21:39:18 +0000 (17:39 -0400)
committerSimon Marchi <simon.marchi@efficios.com>
Tue, 23 Aug 2022 16:06:16 +0000 (12:06 -0400)
Wrapped in an optional, because it is not set at construction, and its
value changes over time.  Adjust a bunch of related functions involved
in creating the message that ends up there.

Signed-off-by: Simon Marchi <simon.marchi@efficios.com>
Change-Id: I2dabd3b3e6d0d17ee2a23774863b160505a0b830
Reviewed-on: https://review.lttng.org/c/babeltrace/+/8458
Reviewed-by: Philippe Proulx <eeppeliteloop@gmail.com>
src/plugins/ctf/lttng-live/data-stream.cpp
src/plugins/ctf/lttng-live/lttng-live.cpp
src/plugins/ctf/lttng-live/lttng-live.hpp

index 4ad282942355009eecee57c5383db538cc90f687..ab0de6047723f8a7e8bed368ad371c44f6120268 100644 (file)
@@ -238,8 +238,6 @@ void lttng_live_stream_iterator_destroy(struct lttng_live_stream_iterator *strea
         return;
     }
 
-    bt_message_put_ref(stream_iter->current_msg);
-
     /* Track the number of active stream iterator. */
     stream_iter->trace->session->lttng_live_msg_iter->active_stream_iter--;
 
index 615e4c2e4e2467fc2b825b1890bcbbf05e5b7f55..98451576fac0e259b1d39d0ca67dfa8221996397 100644 (file)
@@ -621,8 +621,8 @@ end:
 
 static enum lttng_live_iterator_status
 emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
-                        struct lttng_live_stream_iterator *stream_iter, const bt_message **message,
-                        uint64_t timestamp)
+                        struct lttng_live_stream_iterator *stream_iter,
+                        nonstd::optional<bt2::ConstMessage::Shared>& message, uint64_t timestamp)
 {
     enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
     const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
@@ -641,19 +641,20 @@ emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
         goto error;
     }
 
-    *message = msg;
+    message = bt2::ConstMessage::Shared::createWithoutRef(msg);
+
 end:
     return ret;
 
 error:
     ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
-    bt_message_put_ref(msg);
     goto end;
 }
 
 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
     struct lttng_live_msg_iter *lttng_live_msg_iter,
-    struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
+    struct lttng_live_stream_iterator *lttng_live_stream,
+    nonstd::optional<bt2::ConstMessage::Shared>& message)
 {
     enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
 
@@ -752,7 +753,8 @@ end:
 
 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
     struct lttng_live_msg_iter *lttng_live_msg_iter,
-    struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
+    struct lttng_live_stream_iterator *lttng_live_stream,
+    nonstd::optional<bt2::ConstMessage::Shared>& message)
 {
     enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
     const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
@@ -791,12 +793,14 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_activ
         goto end;
     }
 
-    status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), message);
+    const bt_message *msg;
+    status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), &msg);
     switch (status) {
     case CTF_MSG_ITER_STATUS_EOF:
         ret = LTTNG_LIVE_ITERATOR_STATUS_END;
         break;
     case CTF_MSG_ITER_STATUS_OK:
+        message = bt2::ConstMessage::Shared::createWithoutRef(msg);
         ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
         break;
     case CTF_MSG_ITER_STATUS_AGAIN:
@@ -824,7 +828,7 @@ end:
 static enum lttng_live_iterator_status
 lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
                                  struct lttng_live_stream_iterator *stream_iter,
-                                 const bt_message **curr_msg)
+                                 nonstd::optional<bt2::ConstMessage::Shared>& curr_msg)
 {
     enum lttng_live_iterator_status live_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
     const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
@@ -838,8 +842,9 @@ lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter
      * `ctf_msg_iter` should simply realize that it needs to close the
      * stream properly by emitting the necessary stream end message.
      */
+    const bt_message *msg;
     enum ctf_msg_iter_status status =
-        ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), curr_msg);
+        ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), &msg);
 
     if (status == CTF_MSG_ITER_STATUS_ERROR) {
         BT_CLOGE_APPEND_CAUSE("Error getting the next message from CTF message iterator");
@@ -853,6 +858,8 @@ lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter
 
     BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
 
+    curr_msg = bt2::ConstMessage::Shared::createWithoutRef(msg);
+
 end:
     return live_status;
 }
@@ -908,7 +915,7 @@ end:
 static enum lttng_live_iterator_status
 lttng_live_iterator_next_msg_on_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
                                        struct lttng_live_stream_iterator *stream_iter,
-                                       const bt_message **curr_msg)
+                                       nonstd::optional<bt2::ConstMessage::Shared>& curr_msg)
 {
     const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
     enum lttng_live_iterator_status live_status;
@@ -957,16 +964,16 @@ retry:
     live_status = lttng_live_iterator_next_handle_one_quiescent_stream(lttng_live_msg_iter,
                                                                        stream_iter, curr_msg);
     if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
-        BT_ASSERT(!*curr_msg);
+        BT_ASSERT(!curr_msg);
         goto end;
     }
-    if (*curr_msg) {
+    if (curr_msg) {
         goto end;
     }
     live_status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live_msg_iter,
                                                                          stream_iter, curr_msg);
     if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
-        BT_ASSERT(!*curr_msg);
+        BT_ASSERT(!curr_msg);
     }
 
 end:
@@ -983,18 +990,17 @@ end:
     return live_status;
 }
 
-static bool is_discarded_packet_or_event_message(const bt_message *msg)
+static bool is_discarded_packet_or_event_message(bt2::ConstMessage msg)
 {
-    const enum bt_message_type msg_type = bt_message_get_type(msg);
+    const bt2::MessageType type = msg.type();
 
-    return msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS ||
-           msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS;
+    return type == bt2::MessageType::DISCARDED_EVENTS ||
+           type == bt2::MessageType::DISCARDED_PACKETS;
 }
 
-static enum lttng_live_iterator_status
-adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream *stream,
-                                 const bt_message *msg_in, bt_message **msg_out,
-                                 uint64_t new_begin_ts)
+static enum lttng_live_iterator_status adjust_discarded_packets_message(
+    bt_self_message_iterator *iter, const bt_stream *stream, const bt_message *msg_in,
+    nonstd::optional<bt2::ConstMessage::Shared>& msg_out, uint64_t new_begin_ts)
 {
     enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
     enum bt_property_availability availability;
@@ -1008,22 +1014,23 @@ adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream
     availability = bt_message_discarded_packets_get_count(msg_in, &count);
     BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
 
-    *msg_out = bt_message_discarded_packets_create_with_default_clock_snapshots(
+    bt_message *msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
         iter, stream, new_begin_ts, end_ts);
-    if (!*msg_out) {
+    if (!msg) {
         status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
         goto end;
     }
 
-    bt_message_discarded_packets_set_count(*msg_out, count);
+    bt_message_discarded_packets_set_count(msg, count);
+    msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
+
 end:
     return status;
 }
 
-static enum lttng_live_iterator_status
-adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream *stream,
-                                const bt_message *msg_in, bt_message **msg_out,
-                                uint64_t new_begin_ts)
+static enum lttng_live_iterator_status adjust_discarded_events_message(
+    bt_self_message_iterator *iter, const bt_stream *stream, const bt_message *msg_in,
+    nonstd::optional<bt2::ConstMessage::Shared>& msg_out, uint64_t new_begin_ts)
 {
     enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
     enum bt_property_availability availability;
@@ -1037,14 +1044,16 @@ adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream
     availability = bt_message_discarded_events_get_count(msg_in, &count);
     BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
 
-    *msg_out = bt_message_discarded_events_create_with_default_clock_snapshots(
+    bt_message *msg = bt_message_discarded_events_create_with_default_clock_snapshots(
         iter, stream, new_begin_ts, end_ts);
-    if (!*msg_out) {
+    if (!msg) {
         status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
         goto end;
     }
 
-    bt_message_discarded_events_set_count(*msg_out, count);
+    bt_message_discarded_events_set_count(msg, count);
+    msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
+
 end:
     return status;
 }
@@ -1052,7 +1061,7 @@ end:
 static enum lttng_live_iterator_status
 handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
                     struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns,
-                    const bt_message *late_msg)
+                    bt2::ConstMessage::Shared late_msg)
 {
     const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
     const bt_clock_class *clock_class;
@@ -1061,7 +1070,7 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
     int64_t last_inactivity_ts_ns;
     enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
     enum lttng_live_iterator_status adjust_status;
-    bt_message *adjusted_message;
+    nonstd::optional<bt2::ConstMessage::Shared> adjusted_message;
 
     /*
      * The timestamp of the current message is before the last message sent
@@ -1104,11 +1113,12 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
         goto end;
     }
 
-    if (!is_discarded_packet_or_event_message(late_msg)) {
-        BT_CLOGE_APPEND_CAUSE("Invalid live stream state: "
-                              "have a late message that is not a packet discarded or "
-                              "event discarded message: late-msg-type=%s",
-                              bt_common_message_type_string(bt_message_get_type(late_msg)));
+    if (!is_discarded_packet_or_event_message(*late_msg)) {
+        BT_CLOGE_APPEND_CAUSE(
+            "Invalid live stream state: "
+            "have a late message that is not a packet discarded or "
+            "event discarded message: late-msg-type=%s",
+            bt_common_message_type_string(bt_message_get_type(late_msg->libObjPtr())));
         stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
         goto end;
     }
@@ -1141,18 +1151,18 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
      */
     BT_CLOGD("Adjusting the timestamp of late message: late-msg-type=%s, "
              "msg-new-ts-ns=%" PRIu64,
-             bt_common_message_type_string(bt_message_get_type(late_msg)),
+             bt_common_message_type_string(bt_message_get_type(late_msg->libObjPtr())),
              stream_iter->last_inactivity_ts.value);
-    switch (bt_message_get_type(late_msg)) {
-    case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
+    switch (late_msg->type()) {
+    case bt2::MessageType::DISCARDED_EVENTS:
         adjust_status = adjust_discarded_events_message(
-            lttng_live_msg_iter->self_msg_iter, (*stream_iter->stream)->libObjPtr(), late_msg,
-            &adjusted_message, stream_iter->last_inactivity_ts.value);
+            lttng_live_msg_iter->self_msg_iter, (*stream_iter->stream)->libObjPtr(),
+            late_msg->libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
         break;
-    case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
+    case bt2::MessageType::DISCARDED_PACKETS:
         adjust_status = adjust_discarded_packets_message(
-            lttng_live_msg_iter->self_msg_iter, (*stream_iter->stream)->libObjPtr(), late_msg,
-            &adjusted_message, stream_iter->last_inactivity_ts.value);
+            lttng_live_msg_iter->self_msg_iter, (*stream_iter->stream)->libObjPtr(),
+            late_msg->libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
         break;
     default:
         bt_common_abort();
@@ -1166,7 +1176,6 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
     BT_ASSERT_DBG(adjusted_message);
     stream_iter->current_msg = adjusted_message;
     stream_iter->current_msg_ts_ns = last_inactivity_ts_ns;
-    bt_message_put_ref(late_msg);
 
 end:
     return stream_iter_status;
@@ -1208,11 +1217,11 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
          * one.
          */
         while (!stream_iter->current_msg) {
-            const bt_message *msg = NULL;
+            nonstd::optional<bt2::ConstMessage::Shared> msg;
             int64_t curr_msg_ts_ns = INT64_MAX;
 
             stream_iter_status =
-                lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, &msg);
+                lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, msg);
 
             if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
                 stream_iter_is_ended = true;
@@ -1227,14 +1236,14 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
 
             BT_CLOGD("Live stream iterator returned message: msg-type=%s, "
                      "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
-                     bt_common_message_type_string(bt_message_get_type(msg)),
+                     bt_common_message_type_string(bt_message_get_type((*msg)->libObjPtr())),
                      stream_iter->name.c_str(), stream_iter->viewer_stream_id);
 
             /*
              * Get the timestamp in nanoseconds from origin of this
              * messsage.
              */
-            live_get_msg_ts_ns(stream_iter, lttng_live_msg_iter, msg,
+            live_get_msg_ts_ns(stream_iter, lttng_live_msg_iter, (*msg)->libObjPtr(),
                                lttng_live_msg_iter->last_msg_ts_ns, &curr_msg_ts_ns);
 
             /*
@@ -1244,15 +1253,15 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
              * iterator. If not, we need to handle it with care.
              */
             if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
-                stream_iter->current_msg = msg;
+                stream_iter->current_msg = std::move(*msg);
                 stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
             } else {
                 /*
                  * We received a message from the past. This
                  * may be fixable but it can also be an error.
                  */
-                stream_iter_status =
-                    handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, msg);
+                stream_iter_status = handle_late_message(lttng_live_msg_iter, stream_iter,
+                                                         curr_msg_ts_ns, std::move(*msg));
                 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
                     BT_CLOGE_APPEND_CAUSE("Late message could not be handled correctly: "
                                           "lttng-live-msg-iter-addr=%p, "
@@ -1285,7 +1294,8 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
                  */
                 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
                 int ret = common_muxing_compare_messages(
-                    stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
+                    (*stream_iter->current_msg)->libObjPtr(),
+                    (*youngest_candidate_stream_iter->current_msg)->libObjPtr());
                 if (ret < 0) {
                     /*
                      * The `youngest_candidate_stream_iter->current_msg`
@@ -1396,7 +1406,8 @@ next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter
                  * deterministic way.
                  */
                 int ret = common_muxing_compare_messages(
-                    stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
+                    (*stream_iter->current_msg)->libObjPtr(),
+                    (*youngest_candidate_stream_iter->current_msg)->libObjPtr());
                 if (ret < 0) {
                     /*
                      * The `youngest_candidate_stream_iter->current_msg`
@@ -1606,8 +1617,9 @@ lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array
                      * Order the messages in an arbitrary but
                      * deterministic way.
                      */
-                    int ret = common_muxing_compare_messages(candidate_stream_iter->current_msg,
-                                                             youngest_stream_iter->current_msg);
+                    int ret = common_muxing_compare_messages(
+                        (*candidate_stream_iter->current_msg)->libObjPtr(),
+                        (*youngest_stream_iter->current_msg)->libObjPtr());
                     if (ret < 0) {
                         /*
                          * The `candidate_stream_iter->current_msg`
@@ -1644,7 +1656,8 @@ lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array
              * stream iterator current messsage to NULL so that next time
              * we fetch the next message of that stream iterator
              */
-            BT_MESSAGE_MOVE_REF(msgs[*count], youngest_stream_iter->current_msg);
+            msgs[*count] = youngest_stream_iter->current_msg->release().libObjPtr();
+            youngest_stream_iter->current_msg.reset();
             (*count)++;
 
             /* Update the last timestamp in nanoseconds sent downstream. */
index 52e419a915f954049add2e85427a1db65a7db45e..928ec5b0030ef6df627893a546ec1c4ebe512b30 100644 (file)
@@ -19,6 +19,7 @@
 #include <babeltrace2/babeltrace.h>
 
 #include "common/macros.h"
+#include "cpp-common/bt2/message.hpp"
 #include "../common/src/metadata/tsdl/decoder.hpp"
 #include "../common/src/msg-iter/msg-iter.hpp"
 #include "viewer-connection.hpp"
@@ -106,7 +107,7 @@ struct lttng_live_stream_iterator
      * The current message produced by this live stream iterator. Owned by
      * this.
      */
-    const bt_message *current_msg = nullptr;
+    nonstd::optional<bt2::ConstMessage::Shared> current_msg;
 
     /* Timestamp in nanoseconds of the current message (current_msg). */
     int64_t current_msg_ts_ns = 0;
This page took 0.031679 seconds and 5 git commands to generate.