From 35e432d77cf9a4cf5c21d0cbfcc7f7dfd9e63073 Mon Sep 17 00:00:00 2001 From: Simon Marchi Date: Wed, 10 Apr 2024 10:58:05 -0400 Subject: [PATCH] src.ctf.lttng-live: make lttng_live_stream_iterator::current_msg a bt2::Message::Shared Adjust a bunch of related functions involved in creating the message that ends up there. Signed-off-by: Simon Marchi Change-Id: I2dabd3b3e6d0d17ee2a23774863b160505a0b830 Reviewed-on: https://review.lttng.org/c/babeltrace/+/8458 Reviewed-by: Philippe Proulx Reviewed-on: https://review.lttng.org/c/babeltrace/+/12374 Tested-by: jenkins --- src/plugins/ctf/lttng-live/data-stream.cpp | 2 - src/plugins/ctf/lttng-live/lttng-live.cpp | 142 ++++++++++----------- src/plugins/ctf/lttng-live/lttng-live.hpp | 8 +- 3 files changed, 70 insertions(+), 82 deletions(-) diff --git a/src/plugins/ctf/lttng-live/data-stream.cpp b/src/plugins/ctf/lttng-live/data-stream.cpp index 4cfc7e0e..cf9771ae 100644 --- a/src/plugins/ctf/lttng-live/data-stream.cpp +++ b/src/plugins/ctf/lttng-live/data-stream.cpp @@ -232,8 +232,6 @@ void lttng_live_stream_iterator_destroy(struct lttng_live_stream_iterator *strea return; } - bt_message_put_ref(stream_iter->current_msg); - /* Track the number of active stream iterator. */ stream_iter->trace->session->lttng_live_msg_iter->active_stream_iter--; diff --git a/src/plugins/ctf/lttng-live/lttng-live.cpp b/src/plugins/ctf/lttng-live/lttng-live.cpp index dfeb9ae8..3f74c408 100644 --- a/src/plugins/ctf/lttng-live/lttng-live.cpp +++ b/src/plugins/ctf/lttng-live/lttng-live.cpp @@ -564,41 +564,32 @@ end: static enum lttng_live_iterator_status emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter, - struct lttng_live_stream_iterator *stream_iter, const bt_message **message, - uint64_t timestamp) + struct lttng_live_stream_iterator *stream_iter, + bt2::ConstMessage::Shared& message, uint64_t timestamp) { - enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK; - bt_message *msg = NULL; - BT_ASSERT(stream_iter->trace->clock_class); - BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Emitting inactivity message for stream: ctf-stream-id={}, " "viewer-stream-id={}, timestamp={}", stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id, timestamp); - msg = bt_message_message_iterator_inactivity_create(lttng_live_msg_iter->self_msg_iter, - stream_iter->trace->clock_class, timestamp); + const auto msg = bt_message_message_iterator_inactivity_create( + lttng_live_msg_iter->self_msg_iter, stream_iter->trace->clock_class, timestamp); + if (!msg) { BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, "Error emitting message iterator inactivity message"); - goto error; + return LTTNG_LIVE_ITERATOR_STATUS_ERROR; } - *message = msg; -end: - return ret; - -error: - ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - bt_message_put_ref(msg); - goto end; + message = bt2::ConstMessage::Shared::createWithoutRef(msg); + return LTTNG_LIVE_ITERATOR_STATUS_OK; } static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream( struct lttng_live_msg_iter *lttng_live_msg_iter, - struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message) + struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message) { enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK; @@ -698,7 +689,7 @@ end: static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream( struct lttng_live_msg_iter *lttng_live_msg_iter, - struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message) + struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message) { enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK; enum ctf_msg_iter_status status; @@ -739,12 +730,14 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_activ goto end; } - status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), message); + const bt_message *msg; + status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), &msg); switch (status) { case CTF_MSG_ITER_STATUS_EOF: ret = LTTNG_LIVE_ITERATOR_STATUS_END; break; case CTF_MSG_ITER_STATUS_OK: + message = bt2::ConstMessage::Shared::createWithoutRef(msg); ret = LTTNG_LIVE_ITERATOR_STATUS_OK; break; case CTF_MSG_ITER_STATUS_AGAIN: @@ -772,7 +765,7 @@ end: static enum lttng_live_iterator_status lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_stream_iterator *stream_iter, - const bt_message **curr_msg) + bt2::ConstMessage::Shared& curr_msg) { enum lttng_live_iterator_status live_status = LTTNG_LIVE_ITERATOR_STATUS_OK; @@ -786,8 +779,9 @@ lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter * `ctf_msg_iter` should simply realize that it needs to close the * stream properly by emitting the necessary stream end message. */ + const bt_message *msg; enum ctf_msg_iter_status status = - ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), curr_msg); + ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), &msg); if (status == CTF_MSG_ITER_STATUS_ERROR) { BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, @@ -803,6 +797,8 @@ lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK); + curr_msg = bt2::ConstMessage::Shared::createWithoutRef(msg); + end: return live_status; } @@ -858,7 +854,7 @@ end: static enum lttng_live_iterator_status lttng_live_iterator_next_msg_on_stream(struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_stream_iterator *stream_iter, - const bt_message **curr_msg) + bt2::ConstMessage::Shared& curr_msg) { enum lttng_live_iterator_status live_status; @@ -907,16 +903,16 @@ retry: live_status = lttng_live_iterator_next_handle_one_quiescent_stream(lttng_live_msg_iter, stream_iter, curr_msg); if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { - BT_ASSERT(!*curr_msg); + BT_ASSERT(!curr_msg); goto end; } - if (*curr_msg) { + if (curr_msg) { goto end; } live_status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live_msg_iter, stream_iter, curr_msg); if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { - BT_ASSERT(!*curr_msg); + BT_ASSERT(!curr_msg); } end: @@ -935,20 +931,17 @@ end: return live_status; } -static bool is_discarded_packet_or_event_message(const bt_message *msg) +static bool is_discarded_packet_or_event_message(const bt2::ConstMessage msg) { - const enum bt_message_type msg_type = bt_message_get_type(msg); - - return msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS || - msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS; + return msg.type() == bt2::MessageType::DiscardedEvents || + msg.type() == bt2::MessageType::DiscardedPackets; } static enum lttng_live_iterator_status adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream *stream, - const bt_message *msg_in, bt_message **msg_out, + const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out, uint64_t new_begin_ts) { - enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK; enum bt_property_availability availability; const bt_clock_snapshot *clock_snapshot; uint64_t end_ts; @@ -960,24 +953,23 @@ adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream availability = bt_message_discarded_packets_get_count(msg_in, &count); BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE); - *msg_out = bt_message_discarded_packets_create_with_default_clock_snapshots( + const auto msg = bt_message_discarded_packets_create_with_default_clock_snapshots( iter, stream, new_begin_ts, end_ts); - if (!*msg_out) { - status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM; - goto end; + + if (!msg) { + return LTTNG_LIVE_ITERATOR_STATUS_NOMEM; } - bt_message_discarded_packets_set_count(*msg_out, count); -end: - return status; + bt_message_discarded_packets_set_count(msg, count); + msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg); + return LTTNG_LIVE_ITERATOR_STATUS_OK; } static enum lttng_live_iterator_status adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream *stream, - const bt_message *msg_in, bt_message **msg_out, + const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out, uint64_t new_begin_ts) { - enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK; enum bt_property_availability availability; const bt_clock_snapshot *clock_snapshot; uint64_t end_ts; @@ -989,22 +981,22 @@ adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream availability = bt_message_discarded_events_get_count(msg_in, &count); BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE); - *msg_out = bt_message_discarded_events_create_with_default_clock_snapshots( + const auto msg = bt_message_discarded_events_create_with_default_clock_snapshots( iter, stream, new_begin_ts, end_ts); - if (!*msg_out) { - status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM; - goto end; + + if (!msg) { + return LTTNG_LIVE_ITERATOR_STATUS_NOMEM; } - bt_message_discarded_events_set_count(*msg_out, count); -end: - return status; + bt_message_discarded_events_set_count(msg, count); + msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg); + return LTTNG_LIVE_ITERATOR_STATUS_OK; } static enum lttng_live_iterator_status handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns, - const bt_message *late_msg) + const bt2::ConstMessage& late_msg) { const bt_clock_class *clock_class; const bt_stream_class *stream_class; @@ -1012,7 +1004,7 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter, int64_t last_inactivity_ts_ns; enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK; enum lttng_live_iterator_status adjust_status; - bt_message *adjusted_message; + bt2::ConstMessage::Shared adjusted_message; /* * The timestamp of the current message is before the last message sent @@ -1062,7 +1054,7 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter, "Invalid live stream state: " "have a late message that is not a packet discarded or " "event discarded message: late-msg-type={}", - static_cast(bt_message_get_type(late_msg))); + late_msg.type()); stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; goto end; } @@ -1098,18 +1090,17 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter, BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Adjusting the timestamp of late message: late-msg-type={}, " "msg-new-ts-ns={}", - static_cast(bt_message_get_type(late_msg)), - stream_iter->last_inactivity_ts.value); - switch (bt_message_get_type(late_msg)) { - case BT_MESSAGE_TYPE_DISCARDED_EVENTS: + late_msg.type(), stream_iter->last_inactivity_ts.value); + switch (late_msg.type()) { + case bt2::MessageType::DiscardedEvents: adjust_status = adjust_discarded_events_message( - lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(), late_msg, - &adjusted_message, stream_iter->last_inactivity_ts.value); + lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(), + late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value); break; - case BT_MESSAGE_TYPE_DISCARDED_PACKETS: + case bt2::MessageType::DiscardedPackets: adjust_status = adjust_discarded_packets_message( - lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(), late_msg, - &adjusted_message, stream_iter->last_inactivity_ts.value); + lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(), + late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value); break; default: bt_common_abort(); @@ -1123,7 +1114,6 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter, BT_ASSERT_DBG(adjusted_message); stream_iter->current_msg = adjusted_message; stream_iter->current_msg_ts_ns = last_inactivity_ts_ns; - bt_message_put_ref(late_msg); end: return stream_iter_status; @@ -1164,11 +1154,11 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter, * one. */ while (!stream_iter->current_msg) { - const bt_message *msg = NULL; + bt2::ConstMessage::Shared msg; int64_t curr_msg_ts_ns = INT64_MAX; stream_iter_status = - lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, &msg); + lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, msg); if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) { stream_iter_is_ended = true; @@ -1184,15 +1174,14 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter, BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Live stream iterator returned message: msg-type={}, " "stream-name=\"{}\", viewer-stream-id={}", - static_cast(bt_message_get_type(msg)), - stream_iter->name, stream_iter->viewer_stream_id); + msg->type(), stream_iter->name, stream_iter->viewer_stream_id); /* * Get the timestamp in nanoseconds from origin of this * message. */ - live_get_msg_ts_ns(lttng_live_msg_iter, msg, lttng_live_msg_iter->last_msg_ts_ns, - &curr_msg_ts_ns); + live_get_msg_ts_ns(lttng_live_msg_iter, msg->libObjPtr(), + lttng_live_msg_iter->last_msg_ts_ns, &curr_msg_ts_ns); /* * Check if the message of the current live stream @@ -1201,7 +1190,7 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter, * iterator. If not, we need to handle it with care. */ if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) { - stream_iter->current_msg = msg; + stream_iter->current_msg = std::move(msg); stream_iter->current_msg_ts_ns = curr_msg_ts_ns; } else { /* @@ -1209,7 +1198,7 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter, * may be fixable but it can also be an error. */ stream_iter_status = - handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, msg); + handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, *msg); if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, "Late message could not be handled correctly: " @@ -1244,7 +1233,8 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter, */ BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter); int ret = common_muxing_compare_messages( - stream_iter->current_msg, youngest_candidate_stream_iter->current_msg); + stream_iter->current_msg->libObjPtr(), + youngest_candidate_stream_iter->current_msg->libObjPtr()); if (ret < 0) { /* * The `youngest_candidate_stream_iter->current_msg` @@ -1356,7 +1346,8 @@ next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter * deterministic way. */ int ret = common_muxing_compare_messages( - stream_iter->current_msg, youngest_candidate_stream_iter->current_msg); + stream_iter->current_msg->libObjPtr(), + youngest_candidate_stream_iter->current_msg->libObjPtr()); if (ret < 0) { /* * The `youngest_candidate_stream_iter->current_msg` @@ -1565,8 +1556,9 @@ lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array * Order the messages in an arbitrary but * deterministic way. */ - int ret = common_muxing_compare_messages(candidate_stream_iter->current_msg, - youngest_stream_iter->current_msg); + int ret = common_muxing_compare_messages( + candidate_stream_iter->current_msg->libObjPtr(), + youngest_stream_iter->current_msg->libObjPtr()); if (ret < 0) { /* * The `candidate_stream_iter->current_msg` @@ -1604,7 +1596,7 @@ lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array * stream iterator current message to NULL so that next time * we fetch the next message of that stream iterator */ - BT_MESSAGE_MOVE_REF(msgs[*count], youngest_stream_iter->current_msg); + msgs[*count] = youngest_stream_iter->current_msg.release().libObjPtr(); (*count)++; /* Update the last timestamp in nanoseconds sent downstream. */ diff --git a/src/plugins/ctf/lttng-live/lttng-live.hpp b/src/plugins/ctf/lttng-live/lttng-live.hpp index a4974f34..56c86d39 100644 --- a/src/plugins/ctf/lttng-live/lttng-live.hpp +++ b/src/plugins/ctf/lttng-live/lttng-live.hpp @@ -16,6 +16,7 @@ #include +#include "cpp-common/bt2/message.hpp" #include "cpp-common/vendor/fmt/format.h" /* IWYU pragma: keep */ #include "../common/src/metadata/tsdl/decoder.hpp" @@ -118,11 +119,8 @@ struct lttng_live_stream_iterator enum lttng_live_stream_state state = LTTNG_LIVE_STREAM_QUIESCENT; - /* - * The current message produced by this live stream iterator. Owned by - * this. - */ - const bt_message *current_msg = nullptr; + /* The current message produced by this live stream iterator. */ + bt2::ConstMessage::Shared current_msg; /* Timestamp in nanoseconds of the current message (current_msg). */ int64_t current_msg_ts_ns = 0; -- 2.34.1