static enum lttng_live_iterator_status
emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
- struct lttng_live_stream_iterator *stream_iter, const bt_message **message,
- uint64_t timestamp)
+ struct lttng_live_stream_iterator *stream_iter,
+ nonstd::optional<bt2::ConstMessage::Shared>& message, uint64_t timestamp)
{
enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
goto error;
}
- *message = msg;
+ message = bt2::ConstMessage::Shared::createWithoutRef(msg);
+
end:
return ret;
error:
ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
- bt_message_put_ref(msg);
goto end;
}
static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
struct lttng_live_msg_iter *lttng_live_msg_iter,
- struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
+ struct lttng_live_stream_iterator *lttng_live_stream,
+ nonstd::optional<bt2::ConstMessage::Shared>& message)
{
enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
struct lttng_live_msg_iter *lttng_live_msg_iter,
- struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
+ struct lttng_live_stream_iterator *lttng_live_stream,
+ nonstd::optional<bt2::ConstMessage::Shared>& message)
{
enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
goto end;
}
- status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), message);
+ const bt_message *msg;
+ status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), &msg);
switch (status) {
case CTF_MSG_ITER_STATUS_EOF:
ret = LTTNG_LIVE_ITERATOR_STATUS_END;
break;
case CTF_MSG_ITER_STATUS_OK:
+ message = bt2::ConstMessage::Shared::createWithoutRef(msg);
ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
break;
case CTF_MSG_ITER_STATUS_AGAIN:
static enum lttng_live_iterator_status
lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
struct lttng_live_stream_iterator *stream_iter,
- const bt_message **curr_msg)
+ nonstd::optional<bt2::ConstMessage::Shared>& curr_msg)
{
enum lttng_live_iterator_status live_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
* `ctf_msg_iter` should simply realize that it needs to close the
* stream properly by emitting the necessary stream end message.
*/
+ const bt_message *msg;
enum ctf_msg_iter_status status =
- ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), curr_msg);
+ ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), &msg);
if (status == CTF_MSG_ITER_STATUS_ERROR) {
BT_CLOGE_APPEND_CAUSE("Error getting the next message from CTF message iterator");
BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
+ curr_msg = bt2::ConstMessage::Shared::createWithoutRef(msg);
+
end:
return live_status;
}
static enum lttng_live_iterator_status
lttng_live_iterator_next_msg_on_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
struct lttng_live_stream_iterator *stream_iter,
- const bt_message **curr_msg)
+ nonstd::optional<bt2::ConstMessage::Shared>& curr_msg)
{
const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
enum lttng_live_iterator_status live_status;
live_status = lttng_live_iterator_next_handle_one_quiescent_stream(lttng_live_msg_iter,
stream_iter, curr_msg);
if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
- BT_ASSERT(!*curr_msg);
+ BT_ASSERT(!curr_msg);
goto end;
}
- if (*curr_msg) {
+ if (curr_msg) {
goto end;
}
live_status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live_msg_iter,
stream_iter, curr_msg);
if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
- BT_ASSERT(!*curr_msg);
+ BT_ASSERT(!curr_msg);
}
end:
return live_status;
}
-static bool is_discarded_packet_or_event_message(const bt_message *msg)
+static bool is_discarded_packet_or_event_message(bt2::ConstMessage msg)
{
- const enum bt_message_type msg_type = bt_message_get_type(msg);
+ const bt2::MessageType type = msg.type();
- return msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS ||
- msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS;
+ return type == bt2::MessageType::DISCARDED_EVENTS ||
+ type == bt2::MessageType::DISCARDED_PACKETS;
}
-static enum lttng_live_iterator_status
-adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream *stream,
- const bt_message *msg_in, bt_message **msg_out,
- uint64_t new_begin_ts)
+static enum lttng_live_iterator_status adjust_discarded_packets_message(
+ bt_self_message_iterator *iter, const bt_stream *stream, const bt_message *msg_in,
+ nonstd::optional<bt2::ConstMessage::Shared>& msg_out, uint64_t new_begin_ts)
{
enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
enum bt_property_availability availability;
availability = bt_message_discarded_packets_get_count(msg_in, &count);
BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
- *msg_out = bt_message_discarded_packets_create_with_default_clock_snapshots(
+ bt_message *msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
iter, stream, new_begin_ts, end_ts);
- if (!*msg_out) {
+ if (!msg) {
status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
goto end;
}
- bt_message_discarded_packets_set_count(*msg_out, count);
+ bt_message_discarded_packets_set_count(msg, count);
+ msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
+
end:
return status;
}
-static enum lttng_live_iterator_status
-adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream *stream,
- const bt_message *msg_in, bt_message **msg_out,
- uint64_t new_begin_ts)
+static enum lttng_live_iterator_status adjust_discarded_events_message(
+ bt_self_message_iterator *iter, const bt_stream *stream, const bt_message *msg_in,
+ nonstd::optional<bt2::ConstMessage::Shared>& msg_out, uint64_t new_begin_ts)
{
enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
enum bt_property_availability availability;
availability = bt_message_discarded_events_get_count(msg_in, &count);
BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
- *msg_out = bt_message_discarded_events_create_with_default_clock_snapshots(
+ bt_message *msg = bt_message_discarded_events_create_with_default_clock_snapshots(
iter, stream, new_begin_ts, end_ts);
- if (!*msg_out) {
+ if (!msg) {
status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
goto end;
}
- bt_message_discarded_events_set_count(*msg_out, count);
+ bt_message_discarded_events_set_count(msg, count);
+ msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
+
end:
return status;
}
static enum lttng_live_iterator_status
handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns,
- const bt_message *late_msg)
+ bt2::ConstMessage::Shared late_msg)
{
const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
const bt_clock_class *clock_class;
int64_t last_inactivity_ts_ns;
enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
enum lttng_live_iterator_status adjust_status;
- bt_message *adjusted_message;
+ nonstd::optional<bt2::ConstMessage::Shared> adjusted_message;
/*
* The timestamp of the current message is before the last message sent
goto end;
}
- if (!is_discarded_packet_or_event_message(late_msg)) {
- BT_CLOGE_APPEND_CAUSE("Invalid live stream state: "
- "have a late message that is not a packet discarded or "
- "event discarded message: late-msg-type=%s",
- bt_common_message_type_string(bt_message_get_type(late_msg)));
+ if (!is_discarded_packet_or_event_message(*late_msg)) {
+ BT_CLOGE_APPEND_CAUSE(
+ "Invalid live stream state: "
+ "have a late message that is not a packet discarded or "
+ "event discarded message: late-msg-type=%s",
+ bt_common_message_type_string(bt_message_get_type(late_msg->libObjPtr())));
stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
goto end;
}
*/
BT_CLOGD("Adjusting the timestamp of late message: late-msg-type=%s, "
"msg-new-ts-ns=%" PRIu64,
- bt_common_message_type_string(bt_message_get_type(late_msg)),
+ bt_common_message_type_string(bt_message_get_type(late_msg->libObjPtr())),
stream_iter->last_inactivity_ts.value);
- switch (bt_message_get_type(late_msg)) {
- case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
+ switch (late_msg->type()) {
+ case bt2::MessageType::DISCARDED_EVENTS:
adjust_status = adjust_discarded_events_message(
- lttng_live_msg_iter->self_msg_iter, (*stream_iter->stream)->libObjPtr(), late_msg,
- &adjusted_message, stream_iter->last_inactivity_ts.value);
+ lttng_live_msg_iter->self_msg_iter, (*stream_iter->stream)->libObjPtr(),
+ late_msg->libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
break;
- case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
+ case bt2::MessageType::DISCARDED_PACKETS:
adjust_status = adjust_discarded_packets_message(
- lttng_live_msg_iter->self_msg_iter, (*stream_iter->stream)->libObjPtr(), late_msg,
- &adjusted_message, stream_iter->last_inactivity_ts.value);
+ lttng_live_msg_iter->self_msg_iter, (*stream_iter->stream)->libObjPtr(),
+ late_msg->libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
break;
default:
bt_common_abort();
BT_ASSERT_DBG(adjusted_message);
stream_iter->current_msg = adjusted_message;
stream_iter->current_msg_ts_ns = last_inactivity_ts_ns;
- bt_message_put_ref(late_msg);
end:
return stream_iter_status;
* one.
*/
while (!stream_iter->current_msg) {
- const bt_message *msg = NULL;
+ nonstd::optional<bt2::ConstMessage::Shared> msg;
int64_t curr_msg_ts_ns = INT64_MAX;
stream_iter_status =
- lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, &msg);
+ lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, msg);
if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
stream_iter_is_ended = true;
BT_CLOGD("Live stream iterator returned message: msg-type=%s, "
"stream-name=\"%s\", viewer-stream-id=%" PRIu64,
- bt_common_message_type_string(bt_message_get_type(msg)),
+ bt_common_message_type_string(bt_message_get_type((*msg)->libObjPtr())),
stream_iter->name.c_str(), stream_iter->viewer_stream_id);
/*
* Get the timestamp in nanoseconds from origin of this
* messsage.
*/
- live_get_msg_ts_ns(stream_iter, lttng_live_msg_iter, msg,
+ live_get_msg_ts_ns(stream_iter, lttng_live_msg_iter, (*msg)->libObjPtr(),
lttng_live_msg_iter->last_msg_ts_ns, &curr_msg_ts_ns);
/*
* iterator. If not, we need to handle it with care.
*/
if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
- stream_iter->current_msg = msg;
+ stream_iter->current_msg = std::move(*msg);
stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
} else {
/*
* We received a message from the past. This
* may be fixable but it can also be an error.
*/
- stream_iter_status =
- handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, msg);
+ stream_iter_status = handle_late_message(lttng_live_msg_iter, stream_iter,
+ curr_msg_ts_ns, std::move(*msg));
if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
BT_CLOGE_APPEND_CAUSE("Late message could not be handled correctly: "
"lttng-live-msg-iter-addr=%p, "
*/
BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
int ret = common_muxing_compare_messages(
- stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
+ (*stream_iter->current_msg)->libObjPtr(),
+ (*youngest_candidate_stream_iter->current_msg)->libObjPtr());
if (ret < 0) {
/*
* The `youngest_candidate_stream_iter->current_msg`
* deterministic way.
*/
int ret = common_muxing_compare_messages(
- stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
+ (*stream_iter->current_msg)->libObjPtr(),
+ (*youngest_candidate_stream_iter->current_msg)->libObjPtr());
if (ret < 0) {
/*
* The `youngest_candidate_stream_iter->current_msg`
* Order the messages in an arbitrary but
* deterministic way.
*/
- int ret = common_muxing_compare_messages(candidate_stream_iter->current_msg,
- youngest_stream_iter->current_msg);
+ int ret = common_muxing_compare_messages(
+ (*candidate_stream_iter->current_msg)->libObjPtr(),
+ (*youngest_stream_iter->current_msg)->libObjPtr());
if (ret < 0) {
/*
* The `candidate_stream_iter->current_msg`
* stream iterator current messsage to NULL so that next time
* we fetch the next message of that stream iterator
*/
- BT_MESSAGE_MOVE_REF(msgs[*count], youngest_stream_iter->current_msg);
+ msgs[*count] = youngest_stream_iter->current_msg->release().libObjPtr();
+ youngest_stream_iter->current_msg.reset();
(*count)++;
/* Update the last timestamp in nanoseconds sent downstream. */