#include "common/assert.h"
#include "cpp-common/bt2c/fmt.hpp"
#include "cpp-common/bt2c/glib-up.hpp"
+#include "cpp-common/bt2c/vector.hpp"
#include "cpp-common/bt2s/make-unique.hpp"
#include "cpp-common/vendor/fmt/format.h"
static struct lttng_live_trace *
lttng_live_session_borrow_trace_by_id(struct lttng_live_session *session, uint64_t trace_id)
{
- uint64_t trace_idx;
- struct lttng_live_trace *ret_trace = NULL;
-
- for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
- struct lttng_live_trace *trace =
- (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
+ for (lttng_live_trace::UP& trace : session->traces) {
if (trace->id == trace_id) {
- ret_trace = trace;
- goto end;
+ return trace.get();
}
}
-end:
- return ret_trace;
-}
-
-static void lttng_live_destroy_trace(struct lttng_live_trace *trace)
-{
- BT_CPPLOGD_SPEC(trace->logger, "Destroying live trace: trace-id={}", trace->id);
-
- BT_ASSERT(trace->stream_iterators);
- g_ptr_array_free(trace->stream_iterators, TRUE);
-
- delete trace;
+ return nullptr;
}
static struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
BT_CPPLOGD_SPEC(session->logger, "Creating live trace: session-id={}, trace-id={}", session->id,
trace_id);
- lttng_live_trace *trace = new lttng_live_trace {session->logger};
+ auto trace = bt2s::make_unique<lttng_live_trace>(session->logger);
+
trace->session = session;
trace->id = trace_id;
- trace->stream_iterators =
- g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_stream_iterator_destroy);
- BT_ASSERT(trace->stream_iterators);
trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
- g_ptr_array_add(session->traces, trace);
- return trace;
+ const auto ret = trace.get();
+ session->traces.emplace_back(std::move(trace));
+ return ret;
}
struct lttng_live_trace *
lttng_live_session *session = new lttng_live_session {lttng_live_msg_iter->logger};
session->self_comp = lttng_live_msg_iter->self_comp;
session->id = session_id;
- session->traces = g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_trace);
- BT_ASSERT(session->traces);
session->lttng_live_msg_iter = lttng_live_msg_iter;
session->new_streams_needed = true;
session->hostname = g_string_new(hostname);
session->id = -1ULL;
}
- if (session->traces) {
- g_ptr_array_free(session->traces, TRUE);
- }
-
if (session->hostname) {
g_string_free(session->hostname, TRUE);
}
struct lttng_live_session *session)
{
enum lttng_live_iterator_status status;
- uint64_t trace_idx;
if (!session->attached) {
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Attach to session: session-id={}",
break;
case LTTNG_LIVE_ITERATOR_STATUS_END:
/*
- * We received a `_END` from the `_get_new_streams()` function,
- * which means no more data will ever be received from the data
- * streams of this session. But it's possible that the metadata
- * is incomplete.
- * The live protocol guarantees that we receive all the
- * metadata needed before we receive data streams needing it.
- * But it's possible to receive metadata NOT needed by
- * data streams after the session was closed. For example, this
- * could happen if a new event is registered and the session is
- * stopped before any tracepoint for that event is actually
- * fired.
- */
+ * We received a `_END` from the `_get_new_streams()` function,
+ * which means no more data will ever be received from the data
+ * streams of this session. But it's possible that the metadata
+ * is incomplete.
+ * The live protocol guarantees that we receive all the
+ * metadata needed before we receive data streams needing it.
+ * But it's possible to receive metadata NOT needed by
+ * data streams after the session was closed. For example, this
+ * could happen if a new event is registered and the session is
+ * stopped before any tracepoint for that event is actually
+ * fired.
+ */
BT_CPPLOGD_SPEC(
lttng_live_msg_iter->logger,
"Updating streams returned _END status. Override status to _OK in order fetch any remaining metadata:"
"session-id={}, session-name=\"{}\"",
session->id, session->session_name->str);
- trace_idx = 0;
- while (trace_idx < session->traces->len) {
- struct lttng_live_trace *trace =
- (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
-
- status = lttng_live_metadata_update(trace);
+ for (lttng_live_trace::UP& trace : session->traces) {
+ status = lttng_live_metadata_update(trace.get());
switch (status) {
case LTTNG_LIVE_ITERATOR_STATUS_END:
case LTTNG_LIVE_ITERATOR_STATUS_OK:
- trace_idx++;
break;
case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
static void
lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
{
- uint64_t session_idx, trace_idx;
+ uint64_t session_idx;
for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
struct lttng_live_session *session =
"session-id={}",
session->id);
session->new_streams_needed = true;
- for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
- struct lttng_live_trace *trace =
- (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
+ for (lttng_live_trace::UP& trace : session->traces) {
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Force marking trace metadata state as needing an update: "
"session-id={}, trace-id={}",
static enum lttng_live_iterator_status
emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
- struct lttng_live_stream_iterator *stream_iter, const bt_message **message,
- uint64_t timestamp)
+ struct lttng_live_stream_iterator *stream_iter,
+ bt2::ConstMessage::Shared& message, uint64_t timestamp)
{
- enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
- bt_message *msg = NULL;
-
BT_ASSERT(stream_iter->trace->clock_class);
-
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Emitting inactivity message for stream: ctf-stream-id={}, "
"viewer-stream-id={}, timestamp={}",
stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id,
timestamp);
- msg = bt_message_message_iterator_inactivity_create(lttng_live_msg_iter->self_msg_iter,
- stream_iter->trace->clock_class, timestamp);
+ const auto msg = bt_message_message_iterator_inactivity_create(
+ lttng_live_msg_iter->self_msg_iter, stream_iter->trace->clock_class, timestamp);
+
if (!msg) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
"Error emitting message iterator inactivity message");
- goto error;
+ return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
}
- *message = msg;
-end:
- return ret;
-
-error:
- ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
- bt_message_put_ref(msg);
- goto end;
+ message = bt2::ConstMessage::Shared::createWithoutRef(msg);
+ return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
struct lttng_live_msg_iter *lttng_live_msg_iter,
- struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
+ struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message)
{
enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
struct lttng_live_msg_iter *lttng_live_msg_iter,
- struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
+ struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message)
{
enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
enum ctf_msg_iter_status status;
- uint64_t session_idx, trace_idx;
+ uint64_t session_idx;
for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
struct lttng_live_session *session =
ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
goto end;
}
- for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
- struct lttng_live_trace *trace =
- (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
+ for (lttng_live_trace::UP& trace : session->traces) {
if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Need an update for metadata stream: "
goto end;
}
- status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), message);
+ const bt_message *msg;
+ status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), &msg);
switch (status) {
case CTF_MSG_ITER_STATUS_EOF:
ret = LTTNG_LIVE_ITERATOR_STATUS_END;
break;
case CTF_MSG_ITER_STATUS_OK:
+ message = bt2::ConstMessage::Shared::createWithoutRef(msg);
ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
break;
case CTF_MSG_ITER_STATUS_AGAIN:
static enum lttng_live_iterator_status
lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
struct lttng_live_stream_iterator *stream_iter,
- const bt_message **curr_msg)
+ bt2::ConstMessage::Shared& curr_msg)
{
enum lttng_live_iterator_status live_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
* `ctf_msg_iter` should simply realize that it needs to close the
* stream properly by emitting the necessary stream end message.
*/
+ const bt_message *msg;
enum ctf_msg_iter_status status =
- ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), curr_msg);
+ ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), &msg);
if (status == CTF_MSG_ITER_STATUS_ERROR) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
+ curr_msg = bt2::ConstMessage::Shared::createWithoutRef(msg);
+
end:
return live_status;
}
static enum lttng_live_iterator_status
lttng_live_iterator_next_msg_on_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
struct lttng_live_stream_iterator *stream_iter,
- const bt_message **curr_msg)
+ bt2::ConstMessage::Shared& curr_msg)
{
enum lttng_live_iterator_status live_status;
live_status = lttng_live_iterator_next_handle_one_quiescent_stream(lttng_live_msg_iter,
stream_iter, curr_msg);
if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
- BT_ASSERT(!*curr_msg);
+ BT_ASSERT(!curr_msg);
goto end;
}
- if (*curr_msg) {
+ if (curr_msg) {
goto end;
}
live_status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live_msg_iter,
stream_iter, curr_msg);
if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
- BT_ASSERT(!*curr_msg);
+ BT_ASSERT(!curr_msg);
}
end:
return live_status;
}
-static bool is_discarded_packet_or_event_message(const bt_message *msg)
+static bool is_discarded_packet_or_event_message(const bt2::ConstMessage msg)
{
- const enum bt_message_type msg_type = bt_message_get_type(msg);
-
- return msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS ||
- msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS;
+ return msg.type() == bt2::MessageType::DiscardedEvents ||
+ msg.type() == bt2::MessageType::DiscardedPackets;
}
static enum lttng_live_iterator_status
adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream *stream,
- const bt_message *msg_in, bt_message **msg_out,
+ const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out,
uint64_t new_begin_ts)
{
- enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
enum bt_property_availability availability;
const bt_clock_snapshot *clock_snapshot;
uint64_t end_ts;
availability = bt_message_discarded_packets_get_count(msg_in, &count);
BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
- *msg_out = bt_message_discarded_packets_create_with_default_clock_snapshots(
+ const auto msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
iter, stream, new_begin_ts, end_ts);
- if (!*msg_out) {
- status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
- goto end;
+
+ if (!msg) {
+ return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
}
- bt_message_discarded_packets_set_count(*msg_out, count);
-end:
- return status;
+ bt_message_discarded_packets_set_count(msg, count);
+ msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
+ return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
static enum lttng_live_iterator_status
adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream *stream,
- const bt_message *msg_in, bt_message **msg_out,
+ const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out,
uint64_t new_begin_ts)
{
- enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
enum bt_property_availability availability;
const bt_clock_snapshot *clock_snapshot;
uint64_t end_ts;
availability = bt_message_discarded_events_get_count(msg_in, &count);
BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
- *msg_out = bt_message_discarded_events_create_with_default_clock_snapshots(
+ const auto msg = bt_message_discarded_events_create_with_default_clock_snapshots(
iter, stream, new_begin_ts, end_ts);
- if (!*msg_out) {
- status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
- goto end;
+
+ if (!msg) {
+ return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
}
- bt_message_discarded_events_set_count(*msg_out, count);
-end:
- return status;
+ bt_message_discarded_events_set_count(msg, count);
+ msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
+ return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
static enum lttng_live_iterator_status
handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns,
- const bt_message *late_msg)
+ const bt2::ConstMessage& late_msg)
{
const bt_clock_class *clock_class;
const bt_stream_class *stream_class;
int64_t last_inactivity_ts_ns;
enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
enum lttng_live_iterator_status adjust_status;
- bt_message *adjusted_message;
+ bt2::ConstMessage::Shared adjusted_message;
/*
* The timestamp of the current message is before the last message sent
* In short, the only scenario in which it's okay and fixable to
* received a late message is when:
* 1. the late message is a discarded packets or discarded events
- * message,
+ * message,
* 2. this stream produced an inactivity message downstream, and
* 3. the timestamp of the late message is within the inactivity
- * timespan we sent downstream through the inactivity message.
+ * timespan we sent downstream through the inactivity message.
*/
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Invalid live stream state: "
"have a late message that is not a packet discarded or "
"event discarded message: late-msg-type={}",
- static_cast<bt2::MessageType>(bt_message_get_type(late_msg)));
+ late_msg.type());
stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
goto end;
}
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Adjusting the timestamp of late message: late-msg-type={}, "
"msg-new-ts-ns={}",
- static_cast<bt2::MessageType>(bt_message_get_type(late_msg)),
- stream_iter->last_inactivity_ts.value);
- switch (bt_message_get_type(late_msg)) {
- case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
+ late_msg.type(), stream_iter->last_inactivity_ts.value);
+ switch (late_msg.type()) {
+ case bt2::MessageType::DiscardedEvents:
adjust_status = adjust_discarded_events_message(
- lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(), late_msg,
- &adjusted_message, stream_iter->last_inactivity_ts.value);
+ lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(),
+ late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
break;
- case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
+ case bt2::MessageType::DiscardedPackets:
adjust_status = adjust_discarded_packets_message(
- lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(), late_msg,
- &adjusted_message, stream_iter->last_inactivity_ts.value);
+ lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(),
+ late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
break;
default:
bt_common_abort();
BT_ASSERT_DBG(adjusted_message);
stream_iter->current_msg = adjusted_message;
stream_iter->current_msg_ts_ns = last_inactivity_ts_ns;
- bt_message_put_ref(late_msg);
end:
return stream_iter_status;
uint64_t stream_iter_idx;
BT_ASSERT_DBG(live_trace);
- BT_ASSERT_DBG(live_trace->stream_iterators);
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Finding the next stream iterator for trace: "
* ensure monotonicity.
*/
stream_iter_idx = 0;
- while (stream_iter_idx < live_trace->stream_iterators->len) {
+ while (stream_iter_idx < live_trace->stream_iterators.size()) {
bool stream_iter_is_ended = false;
- struct lttng_live_stream_iterator *stream_iter =
- (lttng_live_stream_iterator *) g_ptr_array_index(live_trace->stream_iterators,
- stream_iter_idx);
+ lttng_live_stream_iterator *stream_iter =
+ live_trace->stream_iterators[stream_iter_idx].get();
/*
* If there is no current message for this stream, go fetch
* one.
*/
while (!stream_iter->current_msg) {
- const bt_message *msg = NULL;
+ bt2::ConstMessage::Shared msg;
int64_t curr_msg_ts_ns = INT64_MAX;
stream_iter_status =
- lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, &msg);
+ lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, msg);
if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
stream_iter_is_ended = true;
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Live stream iterator returned message: msg-type={}, "
"stream-name=\"{}\", viewer-stream-id={}",
- static_cast<bt2::MessageType>(bt_message_get_type(msg)),
- stream_iter->name, stream_iter->viewer_stream_id);
+ msg->type(), stream_iter->name, stream_iter->viewer_stream_id);
/*
* Get the timestamp in nanoseconds from origin of this
* message.
*/
- live_get_msg_ts_ns(lttng_live_msg_iter, msg, lttng_live_msg_iter->last_msg_ts_ns,
- &curr_msg_ts_ns);
+ live_get_msg_ts_ns(lttng_live_msg_iter, msg->libObjPtr(),
+ lttng_live_msg_iter->last_msg_ts_ns, &curr_msg_ts_ns);
/*
* Check if the message of the current live stream
* iterator. If not, we need to handle it with care.
*/
if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
- stream_iter->current_msg = msg;
+ stream_iter->current_msg = std::move(msg);
stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
} else {
/*
* may be fixable but it can also be an error.
*/
stream_iter_status =
- handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, msg);
+ handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, *msg);
if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
"Late message could not be handled correctly: "
*/
BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
int ret = common_muxing_compare_messages(
- stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
+ stream_iter->current_msg->libObjPtr(),
+ youngest_candidate_stream_iter->current_msg->libObjPtr());
if (ret < 0) {
/*
* The `youngest_candidate_stream_iter->current_msg`
* removed element with the array's last
* element.
*/
- g_ptr_array_remove_index_fast(live_trace->stream_iterators, stream_iter_idx);
+ bt2c::vectorFastRemove(live_trace->stream_iterators, stream_iter_idx);
}
}
* The only case where we don't have a candidate for this trace
* is if we reached the end of all the iterators.
*/
- BT_ASSERT(live_trace->stream_iterators->len == 0);
+ BT_ASSERT(live_trace->stream_iterators.empty());
stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
}
goto end;
}
- BT_ASSERT_DBG(session->traces);
-
- while (trace_idx < session->traces->len) {
+ while (trace_idx < session->traces.size()) {
bool trace_is_ended = false;
struct lttng_live_stream_iterator *stream_iter;
- struct lttng_live_trace *trace =
- (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
+ lttng_live_trace *trace = session->traces[trace_idx].get();
stream_iter_status =
next_stream_iterator_for_trace(lttng_live_msg_iter, trace, &stream_iter);
* deterministic way.
*/
int ret = common_muxing_compare_messages(
- stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
+ stream_iter->current_msg->libObjPtr(),
+ youngest_candidate_stream_iter->current_msg->libObjPtr());
if (ret < 0) {
/*
* The `youngest_candidate_stream_iter->current_msg`
} else {
/*
* trace_idx is not incremented since
- * g_ptr_array_remove_index_fast replaces the
+ * vectorFastRemove replaces the
* element at trace_idx with the array's last element.
*/
- g_ptr_array_remove_index_fast(session->traces, trace_idx);
+ bt2c::vectorFastRemove(session->traces, trace_idx);
}
}
if (youngest_candidate_stream_iter) {
*
* In either cases, we return END.
*/
- BT_ASSERT(session->traces->len == 0);
+ BT_ASSERT(session->traces.empty());
stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
}
end:
if (G_UNLIKELY(lttng_live_msg_iter->was_interrupted)) {
/*
- * The iterator was interrupted in a previous call to the
- * `_next()` method. We currently do not support generating
- * messages after such event. The babeltrace2 CLI should never
- * be running the graph after being interrupted. So this check
- * is to prevent other graph users from using this live
- * iterator in an messed up internal state.
- */
+ * The iterator was interrupted in a previous call to the
+ * `_next()` method. We currently do not support generating
+ * messages after such event. The babeltrace2 CLI should never
+ * be running the graph after being interrupted. So this check
+ * is to prevent other graph users from using this live
+ * iterator in an messed up internal state.
+ */
status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
BT_CPPLOGE_APPEND_CAUSE_SPEC(
lttng_live_msg_iter->logger,
}
/*
- * Clear all the invalid message reference that might be left over in
- * the output array.
- */
+ * Clear all the invalid message reference that might be left over in
+ * the output array.
+ */
memset(msgs, 0, capacity * sizeof(*msgs));
/*
- * If no session are exposed on the relay found at the url provided by
- * the user, session count will be 0. In this case, we return status
- * end to return gracefully.
- */
+ * If no session are exposed on the relay found at the url provided by
+ * the user, session count will be 0. In this case, we return status
+ * end to return gracefully.
+ */
if (lttng_live_msg_iter->sessions->len == 0) {
if (lttng_live->params.sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
goto end;
} else {
/*
- * The are no more active session for this session
- * name. Retry to create a viewer session for the
- * requested session name.
- */
+ * The are no more active session for this session
+ * name. Retry to create a viewer session for the
+ * requested session name.
+ */
viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
}
/*
- * Here the muxing of message is done.
- *
- * We need to iterate over all the streams of all the traces of all the
- * viewer sessions in order to get the message with the smallest
- * timestamp. In this case, a session is a viewer session and there is
- * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
- * kernel). Each viewer session can have multiple traces, for example,
- * 64bit UST viewer sessions could have multiple per-pid traces.
- *
- * We iterate over the streams of each traces to update and see what is
- * their next message's timestamp. From those timestamps, we select the
- * message with the smallest timestamp as the best candidate message
- * for that trace and do the same thing across all the sessions.
- *
- * We then compare the timestamp of best candidate message of all the
- * sessions to pick the message with the smallest timestamp and we
- * return it.
- */
+ * Here the muxing of message is done.
+ *
+ * We need to iterate over all the streams of all the traces of all the
+ * viewer sessions in order to get the message with the smallest
+ * timestamp. In this case, a session is a viewer session and there is
+ * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
+ * kernel). Each viewer session can have multiple traces, for example,
+ * 64bit UST viewer sessions could have multiple per-pid traces.
+ *
+ * We iterate over the streams of each traces to update and see what is
+ * their next message's timestamp. From those timestamps, we select the
+ * message with the smallest timestamp as the best candidate message
+ * for that trace and do the same thing across all the sessions.
+ *
+ * We then compare the timestamp of best candidate message of all the
+ * sessions to pick the message with the smallest timestamp and we
+ * return it.
+ */
while (*count < capacity) {
struct lttng_live_stream_iterator *youngest_stream_iter = NULL,
*candidate_stream_iter = NULL;
&candidate_stream_iter);
/* If we receive an END status, it means that either:
- * - Those traces never had active streams (UST with no
- * data produced yet),
- * - All live stream iterators have ENDed.*/
+ * - Those traces never had active streams (UST with no
+ * data produced yet),
+ * - All live stream iterators have ENDed.
+ */
if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
- if (session->closed && session->traces->len == 0) {
+ if (session->closed && session->traces.empty()) {
/*
- * Remove the session from the list.
- * session_idx is not modified since
- * g_ptr_array_remove_index_fast
- * replaces the the removed element with
- * the array's last element.
- */
+ * Remove the session from the list.
+ * session_idx is not modified since
+ * g_ptr_array_remove_index_fast
+ * replaces the the removed element with
+ * the array's last element.
+ */
g_ptr_array_remove_index_fast(lttng_live_msg_iter->sessions, session_idx);
} else {
session_idx++;
youngest_stream_iter = candidate_stream_iter;
} else if (candidate_stream_iter->current_msg_ts_ns == youngest_msg_ts_ns) {
/*
- * The currently selected message to be sent
- * downstream next has the exact same timestamp
- * that of the current candidate message. We
- * must break the tie in a predictable manner.
- */
+ * The currently selected message to be sent
+ * downstream next has the exact same timestamp
+ * that of the current candidate message. We
+ * must break the tie in a predictable manner.
+ */
BT_CPPLOGD_STR_SPEC(
lttng_live_msg_iter->logger,
"Two of the next message candidates have the same timestamps, pick one deterministically.");
/*
- * Order the messages in an arbitrary but
- * deterministic way.
- */
- int ret = common_muxing_compare_messages(candidate_stream_iter->current_msg,
- youngest_stream_iter->current_msg);
+ * Order the messages in an arbitrary but
+ * deterministic way.
+ */
+ int ret = common_muxing_compare_messages(
+ candidate_stream_iter->current_msg->libObjPtr(),
+ youngest_stream_iter->current_msg->libObjPtr());
if (ret < 0) {
/*
- * The `candidate_stream_iter->current_msg`
- * should go first. Update the next
- * iterator and the current timestamp.
- */
+ * The `candidate_stream_iter->current_msg`
+ * should go first. Update the next
+ * iterator and the current timestamp.
+ */
youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
youngest_stream_iter = candidate_stream_iter;
} else if (ret == 0) {
youngest_stream_iter->current_msg_ts_ns);
/*
- * Insert the next message to the message batch. This will set
- * stream iterator current message to NULL so that next time
- * we fetch the next message of that stream iterator
- */
- BT_MESSAGE_MOVE_REF(msgs[*count], youngest_stream_iter->current_msg);
+ * Insert the next message to the message batch. This will set
+ * stream iterator current message to NULL so that next time
+ * we fetch the next message of that stream iterator
+ */
+ msgs[*count] = youngest_stream_iter->current_msg.release().libObjPtr();
(*count)++;
/* Update the last timestamp in nanoseconds sent downstream. */
case LTTNG_LIVE_ITERATOR_STATUS_OK:
case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
/*
- * If we gathered messages, return _OK even if the graph was
- * interrupted. This allows for the components downstream to at
- * least get the those messages. If the graph was indeed
- * interrupted there should not be another _next() call as the
- * application will tear down the graph. This component class
- * doesn't support restarting after an interruption.
- */
+ * If we gathered messages, return _OK even if the graph was
+ * interrupted. This allows for the components downstream to at
+ * least get the those messages. If the graph was indeed
+ * interrupted there should not be another _next() call as the
+ * application will tear down the graph. This component class
+ * doesn't support restarting after an interruption.
+ */
if (*count > 0) {
status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
} else {
"Failed to create viewer connection");
} else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
/*
- * Interruption in the _iter_init() method is not
- * supported. Return an error.
- */
+ * Interruption in the _iter_init() method is not
+ * supported. Return an error.
+ */
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
"Interrupted while creating viewer connection");
}
"Failed to create viewer session");
} else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
/*
- * Interruption in the _iter_init() method is not
- * supported. Return an error.
- */
+ * Interruption in the _iter_init() method is not
+ * supported. Return an error.
+ */
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
"Interrupted when creating viewer session");
}