#include "common/assert.h"
#include "cpp-common/bt2c/fmt.hpp"
#include "cpp-common/bt2c/glib-up.hpp"
#include "common/assert.h"
#include "cpp-common/bt2c/fmt.hpp"
#include "cpp-common/bt2c/glib-up.hpp"
static struct lttng_live_trace *
lttng_live_session_borrow_trace_by_id(struct lttng_live_session *session, uint64_t trace_id)
{
static struct lttng_live_trace *
lttng_live_session_borrow_trace_by_id(struct lttng_live_session *session, uint64_t trace_id)
{
- uint64_t trace_idx;
- struct lttng_live_trace *ret_trace = NULL;
-
- for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
- struct lttng_live_trace *trace =
- (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
+ for (lttng_live_trace::UP& trace : session->traces) {
-end:
- return ret_trace;
-}
-
-static void lttng_live_destroy_trace(struct lttng_live_trace *trace)
-{
- BT_CPPLOGD_SPEC(trace->logger, "Destroying live trace: trace-id={}", trace->id);
-
- BT_ASSERT(trace->stream_iterators);
- g_ptr_array_free(trace->stream_iterators, TRUE);
-
- delete trace;
BT_CPPLOGD_SPEC(session->logger, "Creating live trace: session-id={}, trace-id={}", session->id,
trace_id);
BT_CPPLOGD_SPEC(session->logger, "Creating live trace: session-id={}, trace-id={}", session->id,
trace_id);
"session-id={}, hostname=\"{}\", session-name=\"{}\"",
session_id, hostname, session_name);
"session-id={}, hostname=\"{}\", session-name=\"{}\"",
session_id, hostname, session_name);
- session->session_name = g_string_new(session_name);
- BT_ASSERT(session->session_name);
-
- g_ptr_array_add(lttng_live_msg_iter->sessions, session);
+ lttng_live_msg_iter->sessions.emplace_back(std::move(session));
- BT_CPPLOGD_SPEC(session->logger,
- "Destroying live session: "
- "session-id={}, session-name=\"{}\"",
- session->id, session->session_name->str);
- if (session->id != -1ULL) {
- if (lttng_live_session_detach(session)) {
- if (!lttng_live_graph_is_canceled(session->lttng_live_msg_iter)) {
+ if (this->id != -1ULL) {
+ if (lttng_live_session_detach(this)) {
+ if (!lttng_live_graph_is_canceled(this->lttng_live_msg_iter)) {
- if (!lttng_live_msg_iter) {
- goto end;
- }
-
- if (lttng_live_msg_iter->sessions) {
- g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE);
- }
-
- if (lttng_live_msg_iter->viewer_connection) {
- live_viewer_connection_destroy(lttng_live_msg_iter->viewer_connection);
- }
- BT_ASSERT(lttng_live_msg_iter->lttng_live_comp);
- BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter);
+ BT_ASSERT(this->lttng_live_comp);
+ BT_ASSERT(this->lttng_live_comp->has_msg_iter);
- BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0);
- lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false;
-
- delete lttng_live_msg_iter;
-
-end:
- return;
+ BT_ASSERT(this->active_stream_iter == 0);
+ this->lttng_live_comp->has_msg_iter = false;
- struct lttng_live_msg_iter *lttng_live_msg_iter;
-
- BT_ASSERT(self_msg_iter);
-
- lttng_live_msg_iter =
- (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_iter);
- BT_ASSERT(lttng_live_msg_iter);
- lttng_live_msg_iter_destroy(lttng_live_msg_iter);
+ lttng_live_msg_iter::UP {
+ static_cast<lttng_live_msg_iter *>(bt_self_message_iterator_get_data(self_msg_iter))};
- BT_CPPLOGD_SPEC(
- lttng_live_msg_iter->logger,
- "Need to get an update for the metadata stream before proceeding further with this stream: "
- "stream-name=\"{}\"",
- lttng_live_stream->name->str);
+ BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
+ "Need to get an update for the metadata stream before proceeding "
+ "further with this stream: stream-name=\"{}\"",
+ lttng_live_stream->name);
ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
goto end;
}
if (lttng_live_stream->trace->session->new_streams_needed) {
ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
goto end;
}
if (lttng_live_stream->trace->session->new_streams_needed) {
- BT_CPPLOGD_SPEC(
- lttng_live_msg_iter->logger,
- "Need to get an update of all streams before proceeding further with this stream: "
- "stream-name=\"{}\"",
- lttng_live_stream->name->str);
+ BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
+ "Need to get an update of all streams before proceeding further "
+ "with this stream: stream-name=\"{}\"",
+ lttng_live_stream->name);
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Setting live stream reading info: stream-name=\"{}\", "
"viewer-stream-id={}, stream-base-offset={}, stream-offset={}, stream-len={}",
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Setting live stream reading info: stream-name=\"{}\", "
"viewer-stream-id={}, stream-base-offset={}, stream-offset={}, stream-len={}",
if (!session->attached) {
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Attach to session: session-id={}",
if (!session->attached) {
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Attach to session: session-id={}",
- "Updating all data streams: "
- "session-id={}, session-name=\"{}\"",
- session->id, session->session_name->str);
+ "Updating all data streams: session-id={}, session-name=\"{}\"", session->id,
+ session->session_name);
- * We received a `_END` from the `_get_new_streams()` function,
- * which means no more data will ever be received from the data
- * streams of this session. But it's possible that the metadata
- * is incomplete.
- * The live protocol guarantees that we receive all the
- * metadata needed before we receive data streams needing it.
- * But it's possible to receive metadata NOT needed by
- * data streams after the session was closed. For example, this
- * could happen if a new event is registered and the session is
- * stopped before any tracepoint for that event is actually
- * fired.
- */
+ * We received a `_END` from the `_get_new_streams()` function,
+ * which means no more data will ever be received from the data
+ * streams of this session. But it's possible that the metadata
+ * is incomplete.
+ * The live protocol guarantees that we receive all the
+ * metadata needed before we receive data streams needing it.
+ * But it's possible to receive metadata NOT needed by
+ * data streams after the session was closed. For example, this
+ * could happen if a new event is registered and the session is
+ * stopped before any tracepoint for that event is actually
+ * fired.
+ */
BT_CPPLOGD_SPEC(
lttng_live_msg_iter->logger,
"Updating streams returned _END status. Override status to _OK in order fetch any remaining metadata:"
"session-id={}, session-name=\"{}\"",
BT_CPPLOGD_SPEC(
lttng_live_msg_iter->logger,
"Updating streams returned _END status. Override status to _OK in order fetch any remaining metadata:"
"session-id={}, session-name=\"{}\"",
- "Updating metadata stream for session: "
- "session-id={}, session-name=\"{}\"",
- session->id, session->session_name->str);
-
- trace_idx = 0;
- while (trace_idx < session->traces->len) {
- struct lttng_live_trace *trace =
- (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
+ "Updating metadata stream for session: session-id={}, session-name=\"{}\"",
+ session->id, session->session_name);
- uint64_t session_idx, trace_idx;
-
- for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
- struct lttng_live_session *session =
- (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
+ for (const auto& session : lttng_live_msg_iter->sessions) {
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Force marking session as needing new streams: "
"session-id={}",
session->id);
session->new_streams_needed = true;
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Force marking session as needing new streams: "
"session-id={}",
session->id);
session->new_streams_needed = true;
- for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
- struct lttng_live_trace *trace =
- (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
+ for (lttng_live_trace::UP& trace : session->traces) {
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Force marking trace metadata state as needing an update: "
"session-id={}, trace-id={}",
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Force marking trace metadata state as needing an update: "
"session-id={}, trace-id={}",
enum session_not_found_action sess_not_found_act =
lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
enum session_not_found_action sess_not_found_act =
lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
BT_CPPLOGD_SPEC(
lttng_live_msg_iter->logger,
if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
BT_CPPLOGD_SPEC(
lttng_live_msg_iter->logger,
- for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
- session =
- (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
- status = lttng_live_get_session(lttng_live_msg_iter, session);
+ for (const auto& session : lttng_live_msg_iter->sessions) {
+ status = lttng_live_get_session(lttng_live_msg_iter, session.get());
static enum lttng_live_iterator_status
emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
static enum lttng_live_iterator_status
emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
- struct lttng_live_stream_iterator *stream_iter, const bt_message **message,
- uint64_t timestamp)
+ struct lttng_live_stream_iterator *stream_iter,
+ bt2::ConstMessage::Shared& message, uint64_t timestamp)
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Emitting inactivity message for stream: ctf-stream-id={}, "
"viewer-stream-id={}, timestamp={}",
stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id,
timestamp);
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Emitting inactivity message for stream: ctf-stream-id={}, "
"viewer-stream-id={}, timestamp={}",
stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id,
timestamp);
- msg = bt_message_message_iterator_inactivity_create(lttng_live_msg_iter->self_msg_iter,
- stream_iter->trace->clock_class, timestamp);
+ const auto msg = bt_message_message_iterator_inactivity_create(
+ lttng_live_msg_iter->self_msg_iter, stream_iter->trace->clock_class, timestamp);
+
if (!msg) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
"Error emitting message iterator inactivity message");
if (!msg) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
"Error emitting message iterator inactivity message");
}
static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
struct lttng_live_msg_iter *lttng_live_msg_iter,
}
static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
struct lttng_live_msg_iter *lttng_live_msg_iter,
static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
struct lttng_live_msg_iter *lttng_live_msg_iter,
static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
struct lttng_live_msg_iter *lttng_live_msg_iter,
{
enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
enum ctf_msg_iter_status status;
{
enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
enum ctf_msg_iter_status status;
- uint64_t session_idx, trace_idx;
-
- for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
- struct lttng_live_session *session =
- (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
if (session->new_streams_needed) {
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Need an update for streams: "
if (session->new_streams_needed) {
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Need an update for streams: "
- for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
- struct lttng_live_trace *trace =
- (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
+ for (lttng_live_trace::UP& trace : session->traces) {
if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Need an update for metadata stream: "
if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Need an update for metadata stream: "
switch (status) {
case CTF_MSG_ITER_STATUS_EOF:
ret = LTTNG_LIVE_ITERATOR_STATUS_END;
break;
case CTF_MSG_ITER_STATUS_OK:
switch (status) {
case CTF_MSG_ITER_STATUS_EOF:
ret = LTTNG_LIVE_ITERATOR_STATUS_END;
break;
case CTF_MSG_ITER_STATUS_OK:
static enum lttng_live_iterator_status
lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
struct lttng_live_stream_iterator *stream_iter,
static enum lttng_live_iterator_status
lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
struct lttng_live_stream_iterator *stream_iter,
{
enum lttng_live_iterator_status live_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Closing live stream iterator: stream-name=\"{}\", "
"viewer-stream-id={}",
{
enum lttng_live_iterator_status live_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Closing live stream iterator: stream-name=\"{}\", "
"viewer-stream-id={}",
/*
* The viewer has hung up on us so we are closing the stream. The
* `ctf_msg_iter` should simply realize that it needs to close the
* stream properly by emitting the necessary stream end message.
*/
/*
* The viewer has hung up on us so we are closing the stream. The
* `ctf_msg_iter` should simply realize that it needs to close the
* stream properly by emitting the necessary stream end message.
*/
if (status == CTF_MSG_ITER_STATUS_ERROR) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
if (status == CTF_MSG_ITER_STATUS_ERROR) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
static enum lttng_live_iterator_status
lttng_live_iterator_next_msg_on_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
struct lttng_live_stream_iterator *stream_iter,
static enum lttng_live_iterator_status
lttng_live_iterator_next_msg_on_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
struct lttng_live_stream_iterator *stream_iter,
{
enum lttng_live_iterator_status live_status;
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Advancing live stream iterator until next message if possible: "
"stream-name=\"{}\", viewer-stream-id={}",
{
enum lttng_live_iterator_status live_status;
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Advancing live stream iterator until next message if possible: "
"stream-name=\"{}\", viewer-stream-id={}",
live_status = lttng_live_iterator_next_handle_one_quiescent_stream(lttng_live_msg_iter,
stream_iter, curr_msg);
if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
live_status = lttng_live_iterator_next_handle_one_quiescent_stream(lttng_live_msg_iter,
stream_iter, curr_msg);
if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
goto end;
}
live_status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live_msg_iter,
stream_iter, curr_msg);
if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
goto end;
}
live_status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live_msg_iter,
stream_iter, curr_msg);
if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Returning from advancing live stream iterator: status={}, "
"stream-name=\"{}\", viewer-stream-id={}",
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Returning from advancing live stream iterator: status={}, "
"stream-name=\"{}\", viewer-stream-id={}",
- const enum bt_message_type msg_type = bt_message_get_type(msg);
-
- return msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS ||
- msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS;
+ return msg.type() == bt2::MessageType::DiscardedEvents ||
+ msg.type() == bt2::MessageType::DiscardedPackets;
}
static enum lttng_live_iterator_status
adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream *stream,
}
static enum lttng_live_iterator_status
adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream *stream,
availability = bt_message_discarded_packets_get_count(msg_in, &count);
BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
availability = bt_message_discarded_packets_get_count(msg_in, &count);
BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
iter, stream, new_begin_ts, end_ts);
iter, stream, new_begin_ts, end_ts);
- bt_message_discarded_packets_set_count(*msg_out, count);
-end:
- return status;
+ bt_message_discarded_packets_set_count(msg, count);
+ msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
+ return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
static enum lttng_live_iterator_status
adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream *stream,
}
static enum lttng_live_iterator_status
adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream *stream,
availability = bt_message_discarded_events_get_count(msg_in, &count);
BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
availability = bt_message_discarded_events_get_count(msg_in, &count);
BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
iter, stream, new_begin_ts, end_ts);
iter, stream, new_begin_ts, end_ts);
- bt_message_discarded_events_set_count(*msg_out, count);
-end:
- return status;
+ bt_message_discarded_events_set_count(msg, count);
+ msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
+ return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
static enum lttng_live_iterator_status
handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns,
}
static enum lttng_live_iterator_status
handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns,
int64_t last_inactivity_ts_ns;
enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
enum lttng_live_iterator_status adjust_status;
int64_t last_inactivity_ts_ns;
enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
enum lttng_live_iterator_status adjust_status;
* In short, the only scenario in which it's okay and fixable to
* received a late message is when:
* 1. the late message is a discarded packets or discarded events
* In short, the only scenario in which it's okay and fixable to
* received a late message is when:
* 1. the late message is a discarded packets or discarded events
* 2. this stream produced an inactivity message downstream, and
* 3. the timestamp of the late message is within the inactivity
* 2. this stream produced an inactivity message downstream, and
* 3. the timestamp of the late message is within the inactivity
*/
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Handling late message on live stream iterator: "
"stream-name=\"{}\", viewer-stream-id={}",
*/
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Handling late message on live stream iterator: "
"stream-name=\"{}\", viewer-stream-id={}",
if (!stream_iter->last_inactivity_ts.is_set) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
if (!stream_iter->last_inactivity_ts.is_set) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
"Invalid live stream state: "
"have a late message that is not a packet discarded or "
"event discarded message: late-msg-type={}",
"Invalid live stream state: "
"have a late message that is not a packet discarded or "
"event discarded message: late-msg-type={}",
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Adjusting the timestamp of late message: late-msg-type={}, "
"msg-new-ts-ns={}",
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Adjusting the timestamp of late message: late-msg-type={}, "
"msg-new-ts-ns={}",
- static_cast<bt2::MessageType>(bt_message_get_type(late_msg)),
- stream_iter->last_inactivity_ts.value);
- switch (bt_message_get_type(late_msg)) {
- case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
+ late_msg.type(), stream_iter->last_inactivity_ts.value);
+ switch (late_msg.type()) {
+ case bt2::MessageType::DiscardedEvents:
- lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(), late_msg,
- &adjusted_message, stream_iter->last_inactivity_ts.value);
+ lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(),
+ late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
- lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(), late_msg,
- &adjusted_message, stream_iter->last_inactivity_ts.value);
+ lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(),
+ late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
BT_ASSERT_DBG(adjusted_message);
stream_iter->current_msg = adjusted_message;
stream_iter->current_msg_ts_ns = last_inactivity_ts_ns;
BT_ASSERT_DBG(adjusted_message);
stream_iter->current_msg = adjusted_message;
stream_iter->current_msg_ts_ns = last_inactivity_ts_ns;
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Finding the next stream iterator for trace: "
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Finding the next stream iterator for trace: "
- struct lttng_live_stream_iterator *stream_iter =
- (lttng_live_stream_iterator *) g_ptr_array_index(live_trace->stream_iterators,
- stream_iter_idx);
+ lttng_live_stream_iterator *stream_iter =
+ live_trace->stream_iterators[stream_iter_idx].get();
/*
* If there is no current message for this stream, go fetch
* one.
*/
while (!stream_iter->current_msg) {
/*
* If there is no current message for this stream, go fetch
* one.
*/
while (!stream_iter->current_msg) {
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Live stream iterator returned message: msg-type={}, "
"stream-name=\"{}\", viewer-stream-id={}",
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Live stream iterator returned message: msg-type={}, "
"stream-name=\"{}\", viewer-stream-id={}",
- static_cast<bt2::MessageType>(bt_message_get_type(msg)),
- stream_iter->name->str, stream_iter->viewer_stream_id);
+ msg->type(), stream_iter->name, stream_iter->viewer_stream_id);
- live_get_msg_ts_ns(lttng_live_msg_iter, msg, lttng_live_msg_iter->last_msg_ts_ns,
- &curr_msg_ts_ns);
+ live_get_msg_ts_ns(lttng_live_msg_iter, msg->libObjPtr(),
+ lttng_live_msg_iter->last_msg_ts_ns, &curr_msg_ts_ns);
* iterator. If not, we need to handle it with care.
*/
if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
* iterator. If not, we need to handle it with care.
*/
if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
- handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, msg);
+ handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, *msg);
if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
"Late message could not be handled correctly: "
"lttng-live-msg-iter-addr={}, "
"stream-name=\"{}\", "
"curr-msg-ts={}, last-msg-ts={}",
if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
"Late message could not be handled correctly: "
"lttng-live-msg-iter-addr={}, "
"stream-name=\"{}\", "
"curr-msg-ts={}, last-msg-ts={}",
- fmt::ptr(lttng_live_msg_iter),
- stream_iter->name->str, curr_msg_ts_ns,
+ fmt::ptr(lttng_live_msg_iter), stream_iter->name,
+ curr_msg_ts_ns,
*/
BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
int ret = common_muxing_compare_messages(
*/
BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
int ret = common_muxing_compare_messages(
* The only case where we don't have a candidate for this trace
* is if we reached the end of all the iterators.
*/
* The only case where we don't have a candidate for this trace
* is if we reached the end of all the iterators.
*/
(struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_it);
struct lttng_live_component *lttng_live = lttng_live_msg_iter->lttng_live_comp;
enum lttng_live_iterator_status stream_iter_status;
(struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_it);
struct lttng_live_component *lttng_live = lttng_live_msg_iter->lttng_live_comp;
enum lttng_live_iterator_status stream_iter_status;
- * The iterator was interrupted in a previous call to the
- * `_next()` method. We currently do not support generating
- * messages after such event. The babeltrace2 CLI should never
- * be running the graph after being interrupted. So this check
- * is to prevent other graph users from using this live
- * iterator in an messed up internal state.
- */
+ * The iterator was interrupted in a previous call to the
+ * `_next()` method. We currently do not support generating
+ * messages after such event. The babeltrace2 CLI should never
+ * be running the graph after being interrupted. So this check
+ * is to prevent other graph users from using this live
+ * iterator in an messed up internal state.
+ */
status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
BT_CPPLOGE_APPEND_CAUSE_SPEC(
lttng_live_msg_iter->logger,
status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
BT_CPPLOGE_APPEND_CAUSE_SPEC(
lttng_live_msg_iter->logger,
- * If no session are exposed on the relay found at the url provided by
- * the user, session count will be 0. In this case, we return status
- * end to return gracefully.
- */
- if (lttng_live_msg_iter->sessions->len == 0) {
+ * If no session are exposed on the relay found at the url provided by
+ * the user, session count will be 0. In this case, we return status
+ * end to return gracefully.
+ */
+ if (lttng_live_msg_iter->sessions.empty()) {
if (lttng_live->params.sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
goto end;
} else {
/*
if (lttng_live->params.sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
goto end;
} else {
/*
- * The are no more active session for this session
- * name. Retry to create a viewer session for the
- * requested session name.
- */
+ * The are no more active session for this session
+ * name. Retry to create a viewer session for the
+ * requested session name.
+ */
viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
- * Here the muxing of message is done.
- *
- * We need to iterate over all the streams of all the traces of all the
- * viewer sessions in order to get the message with the smallest
- * timestamp. In this case, a session is a viewer session and there is
- * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
- * kernel). Each viewer session can have multiple traces, for example,
- * 64bit UST viewer sessions could have multiple per-pid traces.
- *
- * We iterate over the streams of each traces to update and see what is
- * their next message's timestamp. From those timestamps, we select the
- * message with the smallest timestamp as the best candidate message
- * for that trace and do the same thing across all the sessions.
- *
- * We then compare the timestamp of best candidate message of all the
- * sessions to pick the message with the smallest timestamp and we
- * return it.
- */
+ * Here the muxing of message is done.
+ *
+ * We need to iterate over all the streams of all the traces of all the
+ * viewer sessions in order to get the message with the smallest
+ * timestamp. In this case, a session is a viewer session and there is
+ * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
+ * kernel). Each viewer session can have multiple traces, for example,
+ * 64bit UST viewer sessions could have multiple per-pid traces.
+ *
+ * We iterate over the streams of each traces to update and see what is
+ * their next message's timestamp. From those timestamps, we select the
+ * message with the smallest timestamp as the best candidate message
+ * for that trace and do the same thing across all the sessions.
+ *
+ * We then compare the timestamp of best candidate message of all the
+ * sessions to pick the message with the smallest timestamp and we
+ * return it.
+ */
while (*count < capacity) {
struct lttng_live_stream_iterator *youngest_stream_iter = NULL,
*candidate_stream_iter = NULL;
int64_t youngest_msg_ts_ns = INT64_MAX;
while (*count < capacity) {
struct lttng_live_stream_iterator *youngest_stream_iter = NULL,
*candidate_stream_iter = NULL;
int64_t youngest_msg_ts_ns = INT64_MAX;
- BT_ASSERT_DBG(lttng_live_msg_iter->sessions);
- session_idx = 0;
- while (session_idx < lttng_live_msg_iter->sessions->len) {
- struct lttng_live_session *session = (lttng_live_session *) g_ptr_array_index(
- lttng_live_msg_iter->sessions, session_idx);
+ uint64_t session_idx = 0;
+ while (session_idx < lttng_live_msg_iter->sessions.size()) {
+ lttng_live_session *session = lttng_live_msg_iter->sessions[session_idx].get();
/* Find the best candidate message to send downstream. */
stream_iter_status = next_stream_iterator_for_session(lttng_live_msg_iter, session,
&candidate_stream_iter);
/* If we receive an END status, it means that either:
/* Find the best candidate message to send downstream. */
stream_iter_status = next_stream_iterator_for_session(lttng_live_msg_iter, session,
&candidate_stream_iter);
/* If we receive an END status, it means that either:
- * - Those traces never had active streams (UST with no
- * data produced yet),
- * - All live stream iterators have ENDed.*/
+ * - Those traces never had active streams (UST with no
+ * data produced yet),
+ * - All live stream iterators have ENDed.
+ */
- * Remove the session from the list.
- * session_idx is not modified since
- * g_ptr_array_remove_index_fast
- * replaces the the removed element with
- * the array's last element.
- */
- g_ptr_array_remove_index_fast(lttng_live_msg_iter->sessions, session_idx);
+ * Remove the session from the list.
+ * session_idx is not modified since
+ * g_ptr_array_remove_index_fast
+ * replaces the the removed element with
+ * the array's last element.
+ */
+ bt2c::vectorFastRemove(lttng_live_msg_iter->sessions, session_idx);
youngest_stream_iter = candidate_stream_iter;
} else if (candidate_stream_iter->current_msg_ts_ns == youngest_msg_ts_ns) {
/*
youngest_stream_iter = candidate_stream_iter;
} else if (candidate_stream_iter->current_msg_ts_ns == youngest_msg_ts_ns) {
/*
- * The currently selected message to be sent
- * downstream next has the exact same timestamp
- * that of the current candidate message. We
- * must break the tie in a predictable manner.
- */
+ * The currently selected message to be sent
+ * downstream next has the exact same timestamp
+ * that of the current candidate message. We
+ * must break the tie in a predictable manner.
+ */
BT_CPPLOGD_STR_SPEC(
lttng_live_msg_iter->logger,
"Two of the next message candidates have the same timestamps, pick one deterministically.");
/*
BT_CPPLOGD_STR_SPEC(
lttng_live_msg_iter->logger,
"Two of the next message candidates have the same timestamps, pick one deterministically.");
/*
- * Order the messages in an arbitrary but
- * deterministic way.
- */
- int ret = common_muxing_compare_messages(candidate_stream_iter->current_msg,
- youngest_stream_iter->current_msg);
+ * Order the messages in an arbitrary but
+ * deterministic way.
+ */
+ int ret = common_muxing_compare_messages(
+ candidate_stream_iter->current_msg->libObjPtr(),
+ youngest_stream_iter->current_msg->libObjPtr());
- * The `candidate_stream_iter->current_msg`
- * should go first. Update the next
- * iterator and the current timestamp.
- */
+ * The `candidate_stream_iter->current_msg`
+ * should go first. Update the next
+ * iterator and the current timestamp.
+ */
youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
youngest_stream_iter = candidate_stream_iter;
} else if (ret == 0) {
youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
youngest_stream_iter = candidate_stream_iter;
} else if (ret == 0) {
- * Insert the next message to the message batch. This will set
- * stream iterator current message to NULL so that next time
- * we fetch the next message of that stream iterator
- */
- BT_MESSAGE_MOVE_REF(msgs[*count], youngest_stream_iter->current_msg);
+ * Insert the next message to the message batch. This will set
+ * stream iterator current message to NULL so that next time
+ * we fetch the next message of that stream iterator
+ */
+ msgs[*count] = youngest_stream_iter->current_msg.release().libObjPtr();
- * If we gathered messages, return _OK even if the graph was
- * interrupted. This allows for the components downstream to at
- * least get the those messages. If the graph was indeed
- * interrupted there should not be another _next() call as the
- * application will tear down the graph. This component class
- * doesn't support restarting after an interruption.
- */
+ * If we gathered messages, return _OK even if the graph was
+ * interrupted. This allows for the components downstream to at
+ * least get the those messages. If the graph was indeed
+ * interrupted there should not be another _next() call as the
+ * application will tear down the graph. This component class
+ * doesn't support restarting after an interruption.
+ */
lttng_live_msg_iter_create(struct lttng_live_component *lttng_live_comp,
bt_self_message_iterator *self_msg_it)
{
lttng_live_msg_iter_create(struct lttng_live_component *lttng_live_comp,
bt_self_message_iterator *self_msg_it)
{
msg_iter->self_comp = lttng_live_comp->self_comp;
msg_iter->lttng_live_comp = lttng_live_comp;
msg_iter->self_msg_iter = self_msg_it;
msg_iter->self_comp = lttng_live_comp->self_comp;
msg_iter->lttng_live_comp = lttng_live_comp;
msg_iter->self_msg_iter = self_msg_it;
msg_iter->active_stream_iter = 0;
msg_iter->last_msg_ts_ns = INT64_MIN;
msg_iter->was_interrupted = false;
msg_iter->active_stream_iter = 0;
msg_iter->last_msg_ts_ns = INT64_MIN;
msg_iter->was_interrupted = false;
enum lttng_live_viewer_status viewer_status;
bt_self_component *self_comp = bt_self_message_iterator_borrow_component(self_msg_it);
enum lttng_live_viewer_status viewer_status;
bt_self_component *self_comp = bt_self_message_iterator_borrow_component(self_msg_it);
if (!lttng_live_msg_iter) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live->logger,
"Failed to create lttng_live_msg_iter");
if (!lttng_live_msg_iter) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live->logger,
"Failed to create lttng_live_msg_iter");
- lttng_live->params.url.c_str(), false, lttng_live_msg_iter, lttng_live_msg_iter->logger,
- <tng_live_msg_iter->viewer_connection);
+ lttng_live->params.url.c_str(), false, lttng_live_msg_iter.get(),
+ lttng_live_msg_iter->logger, lttng_live_msg_iter->viewer_connection);
if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
"Failed to create viewer connection");
} else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
/*
if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
"Failed to create viewer connection");
} else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
/*
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
"Interrupted while creating viewer connection");
}
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
"Interrupted while creating viewer connection");
}
if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
"Failed to create viewer session");
} else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
/*
if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
"Failed to create viewer session");
} else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
/*
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
"Interrupted when creating viewer session");
}
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
"Interrupted when creating viewer session");
}
"component parameter: url =\"{}\"",
SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_FAIL_STR,
lttng_live->params.url);
"component parameter: url =\"{}\"",
SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_FAIL_STR,
lttng_live->params.url);
case SESSION_NOT_FOUND_ACTION_END:
BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger,
"Unable to connect to the requested live viewer session. "
case SESSION_NOT_FOUND_ACTION_END:
BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger,
"Unable to connect to the requested live viewer session. "
- bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter);
- status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK;
- goto end;
-
-error:
- lttng_live_msg_iter_destroy(lttng_live_msg_iter);
-end:
- return status;
+ bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter.release());
+ return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK;
} catch (const std::bad_alloc&) {
return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
} catch (const bt2::Error&) {
} catch (const std::bad_alloc&) {
return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
} catch (const bt2::Error&) {
enum lttng_live_viewer_status viewer_status;
enum bt_param_validation_status validation_status;
gchar *validate_error = NULL;
enum lttng_live_viewer_status viewer_status;
enum bt_param_validation_status validation_status;
gchar *validate_error = NULL;
if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
goto error;
if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
goto error;
- viewer_status = live_viewer_connection_create(url, true, NULL, logger, &viewer_connection);
+ viewer_status = live_viewer_connection_create(url, true, NULL, logger, viewer_connection);
if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Failed to create viewer connection");
if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Failed to create viewer connection");
if (status != BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Failed to list viewer sessions");
goto error;
if (status != BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Failed to list viewer sessions");
goto error;
const bt2c::Logger& logger)
{
bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
const bt2c::Logger& logger)
{
bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
bt2c::Logger logger {bt2::SelfComponentClass {comp_class},
bt2::PrivateQueryExecutor {priv_query_exec},
"PLUGIN/SRC.CTF.LTTNG-LIVE/QUERY"};
bt2c::Logger logger {bt2::SelfComponentClass {comp_class},
bt2::PrivateQueryExecutor {priv_query_exec},
"PLUGIN/SRC.CTF.LTTNG-LIVE/QUERY"};
} else {
BT_CPPLOGI_SPEC(logger, "Unknown query object `{}`", object);
status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT;
} else {
BT_CPPLOGI_SPEC(logger, "Unknown query object `{}`", object);
status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT;