static enum lttng_live_iterator_status
emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
- struct lttng_live_stream_iterator *stream_iter, const bt_message **message,
- uint64_t timestamp)
+ struct lttng_live_stream_iterator *stream_iter,
+ bt2::ConstMessage::Shared& message, uint64_t timestamp)
{
- enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
- bt_message *msg = NULL;
-
BT_ASSERT(stream_iter->trace->clock_class);
-
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Emitting inactivity message for stream: ctf-stream-id={}, "
"viewer-stream-id={}, timestamp={}",
stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id,
timestamp);
- msg = bt_message_message_iterator_inactivity_create(lttng_live_msg_iter->self_msg_iter,
- stream_iter->trace->clock_class, timestamp);
+ const auto msg = bt_message_message_iterator_inactivity_create(
+ lttng_live_msg_iter->self_msg_iter, stream_iter->trace->clock_class, timestamp);
+
if (!msg) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
"Error emitting message iterator inactivity message");
- goto error;
+ return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
}
- *message = msg;
-end:
- return ret;
-
-error:
- ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
- bt_message_put_ref(msg);
- goto end;
+ message = bt2::ConstMessage::Shared::createWithoutRef(msg);
+ return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
struct lttng_live_msg_iter *lttng_live_msg_iter,
- struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
+ struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message)
{
enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
struct lttng_live_msg_iter *lttng_live_msg_iter,
- struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
+ struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message)
{
enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
enum ctf_msg_iter_status status;
goto end;
}
- status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), message);
+ const bt_message *msg;
+ status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), &msg);
switch (status) {
case CTF_MSG_ITER_STATUS_EOF:
ret = LTTNG_LIVE_ITERATOR_STATUS_END;
break;
case CTF_MSG_ITER_STATUS_OK:
+ message = bt2::ConstMessage::Shared::createWithoutRef(msg);
ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
break;
case CTF_MSG_ITER_STATUS_AGAIN:
static enum lttng_live_iterator_status
lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
struct lttng_live_stream_iterator *stream_iter,
- const bt_message **curr_msg)
+ bt2::ConstMessage::Shared& curr_msg)
{
enum lttng_live_iterator_status live_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
* `ctf_msg_iter` should simply realize that it needs to close the
* stream properly by emitting the necessary stream end message.
*/
+ const bt_message *msg;
enum ctf_msg_iter_status status =
- ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), curr_msg);
+ ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), &msg);
if (status == CTF_MSG_ITER_STATUS_ERROR) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
+ curr_msg = bt2::ConstMessage::Shared::createWithoutRef(msg);
+
end:
return live_status;
}
static enum lttng_live_iterator_status
lttng_live_iterator_next_msg_on_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
struct lttng_live_stream_iterator *stream_iter,
- const bt_message **curr_msg)
+ bt2::ConstMessage::Shared& curr_msg)
{
enum lttng_live_iterator_status live_status;
live_status = lttng_live_iterator_next_handle_one_quiescent_stream(lttng_live_msg_iter,
stream_iter, curr_msg);
if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
- BT_ASSERT(!*curr_msg);
+ BT_ASSERT(!curr_msg);
goto end;
}
- if (*curr_msg) {
+ if (curr_msg) {
goto end;
}
live_status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live_msg_iter,
stream_iter, curr_msg);
if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
- BT_ASSERT(!*curr_msg);
+ BT_ASSERT(!curr_msg);
}
end:
return live_status;
}
-static bool is_discarded_packet_or_event_message(const bt_message *msg)
+static bool is_discarded_packet_or_event_message(const bt2::ConstMessage msg)
{
- const enum bt_message_type msg_type = bt_message_get_type(msg);
-
- return msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS ||
- msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS;
+ return msg.type() == bt2::MessageType::DiscardedEvents ||
+ msg.type() == bt2::MessageType::DiscardedPackets;
}
static enum lttng_live_iterator_status
adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream *stream,
- const bt_message *msg_in, bt_message **msg_out,
+ const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out,
uint64_t new_begin_ts)
{
- enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
enum bt_property_availability availability;
const bt_clock_snapshot *clock_snapshot;
uint64_t end_ts;
availability = bt_message_discarded_packets_get_count(msg_in, &count);
BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
- *msg_out = bt_message_discarded_packets_create_with_default_clock_snapshots(
+ const auto msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
iter, stream, new_begin_ts, end_ts);
- if (!*msg_out) {
- status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
- goto end;
+
+ if (!msg) {
+ return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
}
- bt_message_discarded_packets_set_count(*msg_out, count);
-end:
- return status;
+ bt_message_discarded_packets_set_count(msg, count);
+ msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
+ return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
static enum lttng_live_iterator_status
adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream *stream,
- const bt_message *msg_in, bt_message **msg_out,
+ const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out,
uint64_t new_begin_ts)
{
- enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
enum bt_property_availability availability;
const bt_clock_snapshot *clock_snapshot;
uint64_t end_ts;
availability = bt_message_discarded_events_get_count(msg_in, &count);
BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
- *msg_out = bt_message_discarded_events_create_with_default_clock_snapshots(
+ const auto msg = bt_message_discarded_events_create_with_default_clock_snapshots(
iter, stream, new_begin_ts, end_ts);
- if (!*msg_out) {
- status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
- goto end;
+
+ if (!msg) {
+ return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
}
- bt_message_discarded_events_set_count(*msg_out, count);
-end:
- return status;
+ bt_message_discarded_events_set_count(msg, count);
+ msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
+ return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
static enum lttng_live_iterator_status
handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns,
- const bt_message *late_msg)
+ const bt2::ConstMessage& late_msg)
{
const bt_clock_class *clock_class;
const bt_stream_class *stream_class;
int64_t last_inactivity_ts_ns;
enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
enum lttng_live_iterator_status adjust_status;
- bt_message *adjusted_message;
+ bt2::ConstMessage::Shared adjusted_message;
/*
* The timestamp of the current message is before the last message sent
"Invalid live stream state: "
"have a late message that is not a packet discarded or "
"event discarded message: late-msg-type={}",
- static_cast<bt2::MessageType>(bt_message_get_type(late_msg)));
+ late_msg.type());
stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
goto end;
}
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Adjusting the timestamp of late message: late-msg-type={}, "
"msg-new-ts-ns={}",
- static_cast<bt2::MessageType>(bt_message_get_type(late_msg)),
- stream_iter->last_inactivity_ts.value);
- switch (bt_message_get_type(late_msg)) {
- case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
+ late_msg.type(), stream_iter->last_inactivity_ts.value);
+ switch (late_msg.type()) {
+ case bt2::MessageType::DiscardedEvents:
adjust_status = adjust_discarded_events_message(
- lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(), late_msg,
- &adjusted_message, stream_iter->last_inactivity_ts.value);
+ lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(),
+ late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
break;
- case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
+ case bt2::MessageType::DiscardedPackets:
adjust_status = adjust_discarded_packets_message(
- lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(), late_msg,
- &adjusted_message, stream_iter->last_inactivity_ts.value);
+ lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(),
+ late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
break;
default:
bt_common_abort();
BT_ASSERT_DBG(adjusted_message);
stream_iter->current_msg = adjusted_message;
stream_iter->current_msg_ts_ns = last_inactivity_ts_ns;
- bt_message_put_ref(late_msg);
end:
return stream_iter_status;
* one.
*/
while (!stream_iter->current_msg) {
- const bt_message *msg = NULL;
+ bt2::ConstMessage::Shared msg;
int64_t curr_msg_ts_ns = INT64_MAX;
stream_iter_status =
- lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, &msg);
+ lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, msg);
if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
stream_iter_is_ended = true;
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Live stream iterator returned message: msg-type={}, "
"stream-name=\"{}\", viewer-stream-id={}",
- static_cast<bt2::MessageType>(bt_message_get_type(msg)),
- stream_iter->name, stream_iter->viewer_stream_id);
+ msg->type(), stream_iter->name, stream_iter->viewer_stream_id);
/*
* Get the timestamp in nanoseconds from origin of this
* message.
*/
- live_get_msg_ts_ns(lttng_live_msg_iter, msg, lttng_live_msg_iter->last_msg_ts_ns,
- &curr_msg_ts_ns);
+ live_get_msg_ts_ns(lttng_live_msg_iter, msg->libObjPtr(),
+ lttng_live_msg_iter->last_msg_ts_ns, &curr_msg_ts_ns);
/*
* Check if the message of the current live stream
* iterator. If not, we need to handle it with care.
*/
if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
- stream_iter->current_msg = msg;
+ stream_iter->current_msg = std::move(msg);
stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
} else {
/*
* may be fixable but it can also be an error.
*/
stream_iter_status =
- handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, msg);
+ handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, *msg);
if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
"Late message could not be handled correctly: "
*/
BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
int ret = common_muxing_compare_messages(
- stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
+ stream_iter->current_msg->libObjPtr(),
+ youngest_candidate_stream_iter->current_msg->libObjPtr());
if (ret < 0) {
/*
* The `youngest_candidate_stream_iter->current_msg`
* deterministic way.
*/
int ret = common_muxing_compare_messages(
- stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
+ stream_iter->current_msg->libObjPtr(),
+ youngest_candidate_stream_iter->current_msg->libObjPtr());
if (ret < 0) {
/*
* The `youngest_candidate_stream_iter->current_msg`
* Order the messages in an arbitrary but
* deterministic way.
*/
- int ret = common_muxing_compare_messages(candidate_stream_iter->current_msg,
- youngest_stream_iter->current_msg);
+ int ret = common_muxing_compare_messages(
+ candidate_stream_iter->current_msg->libObjPtr(),
+ youngest_stream_iter->current_msg->libObjPtr());
if (ret < 0) {
/*
* The `candidate_stream_iter->current_msg`
* stream iterator current message to NULL so that next time
* we fetch the next message of that stream iterator
*/
- BT_MESSAGE_MOVE_REF(msgs[*count], youngest_stream_iter->current_msg);
+ msgs[*count] = youngest_stream_iter->current_msg.release().libObjPtr();
(*count)++;
/* Update the last timestamp in nanoseconds sent downstream. */