X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fplugins%2Fctf%2Flttng-live%2Flttng-live.cpp;h=6368181000f641718cb9c192cda053ba131b2341;hb=HEAD;hp=9bfc2f6f9c66ff2b4040b8867d76e82d0652af9a;hpb=a52f1f2ebb7b16014d1d5df8ad36285521be09ec;p=babeltrace.git diff --git a/src/plugins/ctf/lttng-live/lttng-live.cpp b/src/plugins/ctf/lttng-live/lttng-live.cpp index 9bfc2f6f..63681810 100644 --- a/src/plugins/ctf/lttng-live/lttng-live.cpp +++ b/src/plugins/ctf/lttng-live/lttng-live.cpp @@ -13,6 +13,9 @@ #include "common/assert.h" #include "cpp-common/bt2c/fmt.hpp" +#include "cpp-common/bt2c/glib-up.hpp" +#include "cpp-common/bt2c/vector.hpp" +#include "cpp-common/bt2s/make-unique.hpp" #include "cpp-common/vendor/fmt/format.h" #include "plugins/common/muxing/muxing.h" @@ -54,49 +57,23 @@ void lttng_live_stream_iterator_set_state(struct lttng_live_stream_iterator *str bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter) { - bool ret; - if (!msg_iter) { - ret = false; - goto end; + return false; } - ret = bt_self_message_iterator_is_interrupted(msg_iter->self_msg_iter); - -end: - return ret; + return bt_self_message_iterator_is_interrupted(msg_iter->self_msg_iter); } static struct lttng_live_trace * lttng_live_session_borrow_trace_by_id(struct lttng_live_session *session, uint64_t trace_id) { - uint64_t trace_idx; - struct lttng_live_trace *ret_trace = NULL; - - for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) { - struct lttng_live_trace *trace = - (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx); + for (lttng_live_trace::UP& trace : session->traces) { if (trace->id == trace_id) { - ret_trace = trace; - goto end; + return trace.get(); } } -end: - return ret_trace; -} - -static void lttng_live_destroy_trace(struct lttng_live_trace *trace) -{ - BT_CPPLOGD_SPEC(trace->logger, "Destroying live trace: trace-id={}", trace->id); - - BT_ASSERT(trace->stream_iterators); - g_ptr_array_free(trace->stream_iterators, TRUE); - - BT_TRACE_PUT_REF_AND_RESET(trace->trace); - - lttng_live_metadata_fini(trace); - delete trace; + return nullptr; } static struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session, @@ -105,35 +82,27 @@ static struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_sessio BT_CPPLOGD_SPEC(session->logger, "Creating live trace: session-id={}, trace-id={}", session->id, trace_id); - lttng_live_trace *trace = new lttng_live_trace {session->logger}; + auto trace = bt2s::make_unique(session->logger); + trace->session = session; trace->id = trace_id; - trace->trace = NULL; - trace->stream_iterators = - g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_stream_iterator_destroy); - BT_ASSERT(trace->stream_iterators); trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED; - g_ptr_array_add(session->traces, trace); - return trace; + const auto ret = trace.get(); + session->traces.emplace_back(std::move(trace)); + return ret; } struct lttng_live_trace * lttng_live_session_borrow_or_create_trace_by_id(struct lttng_live_session *session, uint64_t trace_id) { - struct lttng_live_trace *trace; - - trace = lttng_live_session_borrow_trace_by_id(session, trace_id); - if (trace) { - goto end; + if (lttng_live_trace *trace = lttng_live_session_borrow_trace_by_id(session, trace_id)) { + return trace; } /* The session is the owner of the newly created trace. */ - trace = lttng_live_create_trace(session, trace_id); - -end: - return trace; + return lttng_live_create_trace(session, trace_id); } int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, uint64_t session_id, @@ -144,99 +113,51 @@ int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, uint "session-id={}, hostname=\"{}\", session-name=\"{}\"", session_id, hostname, session_name); - lttng_live_session *session = new lttng_live_session {lttng_live_msg_iter->logger}; + auto session = bt2s::make_unique(lttng_live_msg_iter->logger); + session->self_comp = lttng_live_msg_iter->self_comp; session->id = session_id; - session->traces = g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_trace); - BT_ASSERT(session->traces); session->lttng_live_msg_iter = lttng_live_msg_iter; session->new_streams_needed = true; - session->hostname = g_string_new(hostname); - BT_ASSERT(session->hostname); - - session->session_name = g_string_new(session_name); - BT_ASSERT(session->session_name); + session->hostname = hostname; + session->session_name = session_name; - g_ptr_array_add(lttng_live_msg_iter->sessions, session); + lttng_live_msg_iter->sessions.emplace_back(std::move(session)); return 0; } -static void lttng_live_destroy_session(struct lttng_live_session *session) +lttng_live_session::~lttng_live_session() { - if (!session) { - goto end; - } + BT_CPPLOGD_SPEC(this->logger, "Destroying live session: session-id={}, session-name=\"{}\"", + this->id, this->session_name); - BT_CPPLOGD_SPEC(session->logger, - "Destroying live session: " - "session-id={}, session-name=\"{}\"", - session->id, session->session_name->str); - if (session->id != -1ULL) { - if (lttng_live_session_detach(session)) { - if (!lttng_live_graph_is_canceled(session->lttng_live_msg_iter)) { + if (this->id != -1ULL) { + if (lttng_live_session_detach(this)) { + if (!lttng_live_graph_is_canceled(this->lttng_live_msg_iter)) { /* Old relayd cannot detach sessions. */ - BT_CPPLOGD_SPEC(session->logger, "Unable to detach lttng live session {}", - session->id); + BT_CPPLOGD_SPEC(this->logger, "Unable to detach lttng live session {}", this->id); } } - session->id = -1ULL; - } - - if (session->traces) { - g_ptr_array_free(session->traces, TRUE); - } - if (session->hostname) { - g_string_free(session->hostname, TRUE); + this->id = -1ULL; } - - if (session->session_name) { - g_string_free(session->session_name, TRUE); - } - - delete session; - -end: - return; } -static void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter) +lttng_live_msg_iter::~lttng_live_msg_iter() { - if (!lttng_live_msg_iter) { - goto end; - } - - if (lttng_live_msg_iter->sessions) { - g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE); - } - - if (lttng_live_msg_iter->viewer_connection) { - live_viewer_connection_destroy(lttng_live_msg_iter->viewer_connection); - } - BT_ASSERT(lttng_live_msg_iter->lttng_live_comp); - BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter); + BT_ASSERT(this->lttng_live_comp); + BT_ASSERT(this->lttng_live_comp->has_msg_iter); /* All stream iterators must be destroyed at this point. */ - BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0); - lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false; - - delete lttng_live_msg_iter; - -end: - return; + BT_ASSERT(this->active_stream_iter == 0); + this->lttng_live_comp->has_msg_iter = false; } void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter) { - struct lttng_live_msg_iter *lttng_live_msg_iter; - - BT_ASSERT(self_msg_iter); - - lttng_live_msg_iter = - (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_iter); - BT_ASSERT(lttng_live_msg_iter); - lttng_live_msg_iter_destroy(lttng_live_msg_iter); + lttng_live_msg_iter::UP { + static_cast(bt_self_message_iterator_get_data(self_msg_iter))}; } static enum lttng_live_iterator_status @@ -278,21 +199,19 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_da if (lttng_live_stream->trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) { - BT_CPPLOGD_SPEC( - lttng_live_msg_iter->logger, - "Need to get an update for the metadata stream before proceeding further with this stream: " - "stream-name=\"{}\"", - lttng_live_stream->name->str); + BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, + "Need to get an update for the metadata stream before proceeding " + "further with this stream: stream-name=\"{}\"", + lttng_live_stream->name); ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; goto end; } if (lttng_live_stream->trace->session->new_streams_needed) { - BT_CPPLOGD_SPEC( - lttng_live_msg_iter->logger, - "Need to get an update of all streams before proceeding further with this stream: " - "stream-name=\"{}\"", - lttng_live_stream->name->str); + BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, + "Need to get an update of all streams before proceeding further " + "with this stream: stream-name=\"{}\"", + lttng_live_stream->name); ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; goto end; } @@ -335,7 +254,7 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_da BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Setting live stream reading info: stream-name=\"{}\", " "viewer-stream-id={}, stream-base-offset={}, stream-offset={}, stream-len={}", - lttng_live_stream->name->str, lttng_live_stream->viewer_stream_id, + lttng_live_stream->name, lttng_live_stream->viewer_stream_id, lttng_live_stream->base_offset, lttng_live_stream->offset, lttng_live_stream->len); @@ -358,7 +277,6 @@ lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_session *session) { enum lttng_live_iterator_status status; - uint64_t trace_idx; if (!session->attached) { BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Attach to session: session-id={}", @@ -374,20 +292,18 @@ lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter, * cancelled. */ bt_current_thread_clear_error(); - status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + return LTTNG_LIVE_ITERATOR_STATUS_AGAIN; } else { - status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, "Error attaching to LTTng live session"); + return LTTNG_LIVE_ITERATOR_STATUS_ERROR; } - goto end; } } BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, - "Updating all data streams: " - "session-id={}, session-name=\"{}\"", - session->id, session->session_name->str); + "Updating all data streams: session-id={}, session-name=\"{}\"", session->id, + session->session_name); status = lttng_live_session_get_new_streams(session, lttng_live_msg_iter->self_msg_iter); switch (status) { @@ -395,54 +311,47 @@ lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter, break; case LTTNG_LIVE_ITERATOR_STATUS_END: /* - * We received a `_END` from the `_get_new_streams()` function, - * which means no more data will ever be received from the data - * streams of this session. But it's possible that the metadata - * is incomplete. - * The live protocol guarantees that we receive all the - * metadata needed before we receive data streams needing it. - * But it's possible to receive metadata NOT needed by - * data streams after the session was closed. For example, this - * could happen if a new event is registered and the session is - * stopped before any tracepoint for that event is actually - * fired. - */ + * We received a `_END` from the `_get_new_streams()` function, + * which means no more data will ever be received from the data + * streams of this session. But it's possible that the metadata + * is incomplete. + * The live protocol guarantees that we receive all the + * metadata needed before we receive data streams needing it. + * But it's possible to receive metadata NOT needed by + * data streams after the session was closed. For example, this + * could happen if a new event is registered and the session is + * stopped before any tracepoint for that event is actually + * fired. + */ BT_CPPLOGD_SPEC( lttng_live_msg_iter->logger, "Updating streams returned _END status. Override status to _OK in order fetch any remaining metadata:" "session-id={}, session-name=\"{}\"", - session->id, session->session_name->str); - status = LTTNG_LIVE_ITERATOR_STATUS_OK; - break; + session->id, session->session_name); + return LTTNG_LIVE_ITERATOR_STATUS_OK; default: - goto end; + return status; } BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, - "Updating metadata stream for session: " - "session-id={}, session-name=\"{}\"", - session->id, session->session_name->str); - - trace_idx = 0; - while (trace_idx < session->traces->len) { - struct lttng_live_trace *trace = - (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx); + "Updating metadata stream for session: session-id={}, session-name=\"{}\"", + session->id, session->session_name); - status = lttng_live_metadata_update(trace); + for (lttng_live_trace::UP& trace : session->traces) { + status = lttng_live_metadata_update(trace.get()); switch (status) { case LTTNG_LIVE_ITERATOR_STATUS_END: case LTTNG_LIVE_ITERATOR_STATUS_OK: - trace_idx++; break; case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE: case LTTNG_LIVE_ITERATOR_STATUS_AGAIN: - goto end; + return status; default: - BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, - "Error updating trace metadata: " - "stream-iter-status={}, trace-id={}", - status, trace->id); - goto end; + BT_CPPLOGE_APPEND_CAUSE_SPEC( + lttng_live_msg_iter->logger, + "Error updating trace metadata: stream-iter-status={}, trace-id={}", status, + trace->id); + return status; } } @@ -450,28 +359,19 @@ lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter, * Now that we have the metadata we can initialize the downstream * iterator. */ - status = lttng_live_lazy_msg_init(session, lttng_live_msg_iter->self_msg_iter); - -end: - return status; + return lttng_live_lazy_msg_init(session, lttng_live_msg_iter->self_msg_iter); } static void lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter) { - uint64_t session_idx, trace_idx; - - for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) { - struct lttng_live_session *session = - (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx); + for (const auto& session : lttng_live_msg_iter->sessions) { BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Force marking session as needing new streams: " "session-id={}", session->id); session->new_streams_needed = true; - for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) { - struct lttng_live_trace *trace = - (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx); + for (lttng_live_trace::UP& trace : session->traces) { BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Force marking trace metadata state as needing an update: " "session-id={}, trace-id={}", @@ -487,10 +387,8 @@ lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live static enum lttng_live_iterator_status lttng_live_iterator_handle_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter) { - enum lttng_live_iterator_status status; enum lttng_live_viewer_status viewer_status; - uint64_t session_idx = 0, nr_sessions_opened = 0; - struct lttng_live_session *session; + uint64_t nr_sessions_opened = 0; enum session_not_found_action sess_not_found_act = lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act; @@ -504,13 +402,12 @@ lttng_live_iterator_handle_new_streams_and_metadata(struct lttng_live_msg_iter * * need to query for new sessions even though we have sessions * currently ongoing. */ - if (lttng_live_msg_iter->sessions->len == 0) { + if (lttng_live_msg_iter->sessions.empty()) { if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) { BT_CPPLOGD_SPEC( lttng_live_msg_iter->logger, "No session found. Exiting in accordance with the `session-not-found-action` parameter"); - status = LTTNG_LIVE_ITERATOR_STATUS_END; - goto end; + return LTTNG_LIVE_ITERATOR_STATUS_END; } else { BT_CPPLOGD_SPEC( lttng_live_msg_iter->logger, @@ -522,23 +419,20 @@ lttng_live_iterator_handle_new_streams_and_metadata(struct lttng_live_msg_iter * viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter); if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { - status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, "Error creating LTTng live viewer session"); + return LTTNG_LIVE_ITERATOR_STATUS_ERROR; } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { - status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + return LTTNG_LIVE_ITERATOR_STATUS_AGAIN; } else { bt_common_abort(); } - goto end; } } } - for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) { - session = - (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx); - status = lttng_live_get_session(lttng_live_msg_iter, session); + for (const auto& session : lttng_live_msg_iter->sessions) { + const auto status = lttng_live_get_session(lttng_live_msg_iter, session.get()); switch (status) { case LTTNG_LIVE_ITERATOR_STATUS_OK: case LTTNG_LIVE_ITERATOR_STATUS_END: @@ -549,7 +443,7 @@ lttng_live_iterator_handle_new_streams_and_metadata(struct lttng_live_msg_iter * */ break; default: - goto end; + return status; } if (!session->closed) { nr_sessions_opened++; @@ -557,55 +451,41 @@ lttng_live_iterator_handle_new_streams_and_metadata(struct lttng_live_msg_iter * } if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE && nr_sessions_opened == 0) { - status = LTTNG_LIVE_ITERATOR_STATUS_END; + return LTTNG_LIVE_ITERATOR_STATUS_END; } else { - status = LTTNG_LIVE_ITERATOR_STATUS_OK; + return LTTNG_LIVE_ITERATOR_STATUS_OK; } - -end: - return status; } static enum lttng_live_iterator_status emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter, - struct lttng_live_stream_iterator *stream_iter, const bt_message **message, - uint64_t timestamp) + struct lttng_live_stream_iterator *stream_iter, + bt2::ConstMessage::Shared& message, uint64_t timestamp) { - enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK; - bt_message *msg = NULL; - BT_ASSERT(stream_iter->trace->clock_class); - BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Emitting inactivity message for stream: ctf-stream-id={}, " "viewer-stream-id={}, timestamp={}", stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id, timestamp); - msg = bt_message_message_iterator_inactivity_create(lttng_live_msg_iter->self_msg_iter, - stream_iter->trace->clock_class, timestamp); + const auto msg = bt_message_message_iterator_inactivity_create( + lttng_live_msg_iter->self_msg_iter, stream_iter->trace->clock_class, timestamp); + if (!msg) { BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, "Error emitting message iterator inactivity message"); - goto error; + return LTTNG_LIVE_ITERATOR_STATUS_ERROR; } - *message = msg; -end: - return ret; - -error: - ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - bt_message_put_ref(msg); - goto end; + message = bt2::ConstMessage::Shared::createWithoutRef(msg); + return LTTNG_LIVE_ITERATOR_STATUS_OK; } static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream( struct lttng_live_msg_iter *lttng_live_msg_iter, - struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message) + struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message) { - enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK; - if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) { return LTTNG_LIVE_ITERATOR_STATUS_OK; } @@ -619,17 +499,20 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quies lttng_live_stream_iterator_set_state(lttng_live_stream, LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA); - ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; - goto end; + return LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; } - ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream, message, - lttng_live_stream->current_inactivity_ts); + const auto status = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream, message, + lttng_live_stream->current_inactivity_ts); + + if (status != LTTNG_LIVE_ITERATOR_STATUS_OK) { + return status; + } lttng_live_stream->last_inactivity_ts.value = lttng_live_stream->current_inactivity_ts; lttng_live_stream->last_inactivity_ts.is_set = true; -end: - return ret; + + return LTTNG_LIVE_ITERATOR_STATUS_OK; } static int live_get_msg_ts_ns(struct lttng_live_msg_iter *lttng_live_msg_iter, @@ -669,10 +552,12 @@ static int live_get_msg_ts_ns(struct lttng_live_msg_iter *lttng_live_msg_iter, break; default: /* All the other messages have a higher priority */ - BT_CPPLOGD_STR_SPEC(lttng_live_msg_iter->logger, - "Message has no timestamp: using the last message timestamp."); + BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, + "Message has no timestamp, using the last message timestamp: " + "iter-data-addr={}, msg-addr={}, last-msg-ts={}, ts={}", + fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns, *ts_ns); *ts_ns = last_msg_ts_ns; - goto end; + return 0; } ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns); @@ -681,134 +566,105 @@ static int live_get_msg_ts_ns(struct lttng_live_msg_iter *lttng_live_msg_iter, "Cannot get nanoseconds from Epoch of clock snapshot: " "clock-snapshot-addr={}", fmt::ptr(clock_snapshot)); - goto error; + return -1; } - goto end; - -error: - ret = -1; + BT_CPPLOGD_SPEC( + lttng_live_msg_iter->logger, + "Found message's timestamp: iter-data-addr={}, msg-addr={}, last-msg-ts={}, ts={}", + fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns, *ts_ns); -end: - if (ret == 0) { - BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, - "Found message's timestamp: iter-data-addr={}, msg-addr={}, " - "last-msg-ts={}, ts={}", - fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns, *ts_ns); - } - - return ret; + return 0; } static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream( struct lttng_live_msg_iter *lttng_live_msg_iter, - struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message) + struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message) { - enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK; enum ctf_msg_iter_status status; - uint64_t session_idx, trace_idx; - - for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) { - struct lttng_live_session *session = - (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx); + for (const auto& session : lttng_live_msg_iter->sessions) { if (session->new_streams_needed) { BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, - "Need an update for streams: " - "session-id={}", - session->id); - ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; - goto end; + "Need an update for streams: session-id={}", session->id); + return LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; } - for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) { - struct lttng_live_trace *trace = - (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx); + for (lttng_live_trace::UP& trace : session->traces) { if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) { BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, - "Need an update for metadata stream: " - "session-id={}, trace-id={}", + "Need an update for metadata stream: session-id={}, trace-id={}", session->id, trace->id); - ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; - goto end; + return LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; } } } if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) { - ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR; BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, - "Invalid state of live stream iterator" - "stream-iter-status={}", + "Invalid state of live stream iterator: stream-iter-status={}", lttng_live_stream->state); - goto end; + return LTTNG_LIVE_ITERATOR_STATUS_ERROR; } - status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter, message); + const bt_message *msg; + status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), &msg); switch (status) { case CTF_MSG_ITER_STATUS_EOF: - ret = LTTNG_LIVE_ITERATOR_STATUS_END; - break; + return LTTNG_LIVE_ITERATOR_STATUS_END; case CTF_MSG_ITER_STATUS_OK: - ret = LTTNG_LIVE_ITERATOR_STATUS_OK; - break; + message = bt2::ConstMessage::Shared::createWithoutRef(msg); + return LTTNG_LIVE_ITERATOR_STATUS_OK; case CTF_MSG_ITER_STATUS_AGAIN: /* * Continue immediately (end of packet). The next * get_index may return AGAIN to delay the following * attempt. */ - ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; - break; + return LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; case CTF_MSG_ITER_STATUS_ERROR: default: - ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, - "CTF message iterator failed to get next message: " - "msg-iter={}, msg-iter-status={}", - fmt::ptr(lttng_live_stream->msg_iter), status); - break; + BT_CPPLOGE_APPEND_CAUSE_SPEC( + lttng_live_msg_iter->logger, + "CTF message iterator failed to get next message: msg-iter={}, msg-iter-status={}", + fmt::ptr(lttng_live_stream->msg_iter), status); + return LTTNG_LIVE_ITERATOR_STATUS_ERROR; } - -end: - return ret; } static enum lttng_live_iterator_status lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_stream_iterator *stream_iter, - const bt_message **curr_msg) + bt2::ConstMessage::Shared& curr_msg) { - enum lttng_live_iterator_status live_status = LTTNG_LIVE_ITERATOR_STATUS_OK; - BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Closing live stream iterator: stream-name=\"{}\", " "viewer-stream-id={}", - stream_iter->name->str, stream_iter->viewer_stream_id); + stream_iter->name, stream_iter->viewer_stream_id); /* * The viewer has hung up on us so we are closing the stream. The * `ctf_msg_iter` should simply realize that it needs to close the * stream properly by emitting the necessary stream end message. */ + const bt_message *msg; enum ctf_msg_iter_status status = - ctf_msg_iter_get_next_message(stream_iter->msg_iter, curr_msg); + ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), &msg); if (status == CTF_MSG_ITER_STATUS_ERROR) { BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, "Error getting the next message from CTF message iterator"); - live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - goto end; + return LTTNG_LIVE_ITERATOR_STATUS_ERROR; } else if (status == CTF_MSG_ITER_STATUS_EOF) { BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger, "Reached the end of the live stream iterator."); - live_status = LTTNG_LIVE_ITERATOR_STATUS_END; - goto end; + return LTTNG_LIVE_ITERATOR_STATUS_END; } BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK); -end: - return live_status; + curr_msg = bt2::ConstMessage::Shared::createWithoutRef(msg); + + return LTTNG_LIVE_ITERATOR_STATUS_OK; } /* @@ -862,14 +718,14 @@ end: static enum lttng_live_iterator_status lttng_live_iterator_next_msg_on_stream(struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_stream_iterator *stream_iter, - const bt_message **curr_msg) + bt2::ConstMessage::Shared& curr_msg) { enum lttng_live_iterator_status live_status; BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Advancing live stream iterator until next message if possible: " "stream-name=\"{}\", viewer-stream-id={}", - stream_iter->name->str, stream_iter->viewer_stream_id); + stream_iter->name, stream_iter->viewer_stream_id); if (stream_iter->has_stream_hung_up) { /* @@ -911,16 +767,16 @@ retry: live_status = lttng_live_iterator_next_handle_one_quiescent_stream(lttng_live_msg_iter, stream_iter, curr_msg); if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { - BT_ASSERT(!*curr_msg); + BT_ASSERT(!curr_msg); goto end; } - if (*curr_msg) { + if (curr_msg) { goto end; } live_status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live_msg_iter, stream_iter, curr_msg); if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { - BT_ASSERT(!*curr_msg); + BT_ASSERT(!curr_msg); } end: @@ -934,25 +790,22 @@ end: BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Returning from advancing live stream iterator: status={}, " "stream-name=\"{}\", viewer-stream-id={}", - live_status, stream_iter->name->str, stream_iter->viewer_stream_id); + live_status, stream_iter->name, stream_iter->viewer_stream_id); return live_status; } -static bool is_discarded_packet_or_event_message(const bt_message *msg) +static bool is_discarded_packet_or_event_message(const bt2::ConstMessage msg) { - const enum bt_message_type msg_type = bt_message_get_type(msg); - - return msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS || - msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS; + return msg.type() == bt2::MessageType::DiscardedEvents || + msg.type() == bt2::MessageType::DiscardedPackets; } static enum lttng_live_iterator_status adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream *stream, - const bt_message *msg_in, bt_message **msg_out, + const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out, uint64_t new_begin_ts) { - enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK; enum bt_property_availability availability; const bt_clock_snapshot *clock_snapshot; uint64_t end_ts; @@ -964,24 +817,23 @@ adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream availability = bt_message_discarded_packets_get_count(msg_in, &count); BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE); - *msg_out = bt_message_discarded_packets_create_with_default_clock_snapshots( + const auto msg = bt_message_discarded_packets_create_with_default_clock_snapshots( iter, stream, new_begin_ts, end_ts); - if (!*msg_out) { - status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM; - goto end; + + if (!msg) { + return LTTNG_LIVE_ITERATOR_STATUS_NOMEM; } - bt_message_discarded_packets_set_count(*msg_out, count); -end: - return status; + bt_message_discarded_packets_set_count(msg, count); + msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg); + return LTTNG_LIVE_ITERATOR_STATUS_OK; } static enum lttng_live_iterator_status adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream *stream, - const bt_message *msg_in, bt_message **msg_out, + const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out, uint64_t new_begin_ts) { - enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK; enum bt_property_availability availability; const bt_clock_snapshot *clock_snapshot; uint64_t end_ts; @@ -993,30 +845,29 @@ adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream availability = bt_message_discarded_events_get_count(msg_in, &count); BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE); - *msg_out = bt_message_discarded_events_create_with_default_clock_snapshots( + const auto msg = bt_message_discarded_events_create_with_default_clock_snapshots( iter, stream, new_begin_ts, end_ts); - if (!*msg_out) { - status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM; - goto end; + + if (!msg) { + return LTTNG_LIVE_ITERATOR_STATUS_NOMEM; } - bt_message_discarded_events_set_count(*msg_out, count); -end: - return status; + bt_message_discarded_events_set_count(msg, count); + msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg); + return LTTNG_LIVE_ITERATOR_STATUS_OK; } static enum lttng_live_iterator_status handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns, - const bt_message *late_msg) + const bt2::ConstMessage& late_msg) { const bt_clock_class *clock_class; const bt_stream_class *stream_class; enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status; int64_t last_inactivity_ts_ns; - enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK; enum lttng_live_iterator_status adjust_status; - bt_message *adjusted_message; + bt2::ConstMessage::Shared adjusted_message; /* * The timestamp of the current message is before the last message sent @@ -1041,24 +892,23 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter, * In short, the only scenario in which it's okay and fixable to * received a late message is when: * 1. the late message is a discarded packets or discarded events - * message, + * message, * 2. this stream produced an inactivity message downstream, and * 3. the timestamp of the late message is within the inactivity - * timespan we sent downstream through the inactivity message. + * timespan we sent downstream through the inactivity message. */ BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Handling late message on live stream iterator: " "stream-name=\"{}\", viewer-stream-id={}", - stream_iter->name->str, stream_iter->viewer_stream_id); + stream_iter->name, stream_iter->viewer_stream_id); if (!stream_iter->last_inactivity_ts.is_set) { BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, "Invalid live stream state: " "have a late message when no inactivity message " "was ever sent for that stream."); - stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - goto end; + return LTTNG_LIVE_ITERATOR_STATUS_ERROR; } if (!is_discarded_packet_or_event_message(late_msg)) { @@ -1066,12 +916,11 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter, "Invalid live stream state: " "have a late message that is not a packet discarded or " "event discarded message: late-msg-type={}", - static_cast(bt_message_get_type(late_msg))); - stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - goto end; + late_msg.type()); + return LTTNG_LIVE_ITERATOR_STATUS_ERROR; } - stream_class = bt_stream_borrow_class_const(stream_iter->stream); + stream_class = bt_stream_borrow_class_const(stream_iter->stream->libObjPtr()); clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class); ts_ns_status = bt_clock_class_cycles_to_ns_from_origin( @@ -1080,8 +929,7 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter, BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, "Error converting last " "inactivity message timestamp to nanoseconds"); - stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - goto end; + return LTTNG_LIVE_ITERATOR_STATUS_ERROR; } if (last_inactivity_ts_ns <= late_msg_ts_ns) { @@ -1091,8 +939,7 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter, "inactivity timespan: last-inactivity-ts-ns={}, " "late-msg-ts-ns={}", last_inactivity_ts_ns, late_msg_ts_ns); - stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - goto end; + return LTTNG_LIVE_ITERATOR_STATUS_ERROR; } /* @@ -1102,35 +949,31 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter, BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Adjusting the timestamp of late message: late-msg-type={}, " "msg-new-ts-ns={}", - static_cast(bt_message_get_type(late_msg)), - stream_iter->last_inactivity_ts.value); - switch (bt_message_get_type(late_msg)) { - case BT_MESSAGE_TYPE_DISCARDED_EVENTS: + late_msg.type(), stream_iter->last_inactivity_ts.value); + switch (late_msg.type()) { + case bt2::MessageType::DiscardedEvents: adjust_status = adjust_discarded_events_message( - lttng_live_msg_iter->self_msg_iter, stream_iter->stream, late_msg, &adjusted_message, - stream_iter->last_inactivity_ts.value); + lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(), + late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value); break; - case BT_MESSAGE_TYPE_DISCARDED_PACKETS: + case bt2::MessageType::DiscardedPackets: adjust_status = adjust_discarded_packets_message( - lttng_live_msg_iter->self_msg_iter, stream_iter->stream, late_msg, &adjusted_message, - stream_iter->last_inactivity_ts.value); + lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(), + late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value); break; default: bt_common_abort(); } if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { - stream_iter_status = adjust_status; - goto end; + return adjust_status; } BT_ASSERT_DBG(adjusted_message); stream_iter->current_msg = adjusted_message; stream_iter->current_msg_ts_ns = last_inactivity_ts_ns; - bt_message_put_ref(late_msg); -end: - return stream_iter_status; + return LTTNG_LIVE_ITERATOR_STATUS_OK; } static enum lttng_live_iterator_status @@ -1139,12 +982,10 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_stream_iterator **youngest_trace_stream_iter) { struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL; - enum lttng_live_iterator_status stream_iter_status; int64_t youngest_candidate_msg_ts = INT64_MAX; uint64_t stream_iter_idx; BT_ASSERT_DBG(live_trace); - BT_ASSERT_DBG(live_trace->stream_iterators); BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Finding the next stream iterator for trace: " @@ -1157,22 +998,21 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter, * ensure monotonicity. */ stream_iter_idx = 0; - while (stream_iter_idx < live_trace->stream_iterators->len) { + while (stream_iter_idx < live_trace->stream_iterators.size()) { bool stream_iter_is_ended = false; - struct lttng_live_stream_iterator *stream_iter = - (lttng_live_stream_iterator *) g_ptr_array_index(live_trace->stream_iterators, - stream_iter_idx); + lttng_live_stream_iterator *stream_iter = + live_trace->stream_iterators[stream_iter_idx].get(); /* * If there is no current message for this stream, go fetch * one. */ while (!stream_iter->current_msg) { - const bt_message *msg = NULL; + bt2::ConstMessage::Shared msg; int64_t curr_msg_ts_ns = INT64_MAX; - stream_iter_status = - lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, &msg); + const auto stream_iter_status = + lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, msg); if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) { stream_iter_is_ended = true; @@ -1180,7 +1020,7 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter, } if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { - goto end; + return stream_iter_status; } BT_ASSERT_DBG(msg); @@ -1188,15 +1028,14 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter, BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Live stream iterator returned message: msg-type={}, " "stream-name=\"{}\", viewer-stream-id={}", - static_cast(bt_message_get_type(msg)), - stream_iter->name->str, stream_iter->viewer_stream_id); + msg->type(), stream_iter->name, stream_iter->viewer_stream_id); /* * Get the timestamp in nanoseconds from origin of this * message. */ - live_get_msg_ts_ns(lttng_live_msg_iter, msg, lttng_live_msg_iter->last_msg_ts_ns, - &curr_msg_ts_ns); + live_get_msg_ts_ns(lttng_live_msg_iter, msg->libObjPtr(), + lttng_live_msg_iter->last_msg_ts_ns, &curr_msg_ts_ns); /* * Check if the message of the current live stream @@ -1205,26 +1044,24 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter, * iterator. If not, we need to handle it with care. */ if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) { - stream_iter->current_msg = msg; + stream_iter->current_msg = std::move(msg); stream_iter->current_msg_ts_ns = curr_msg_ts_ns; } else { /* * We received a message from the past. This * may be fixable but it can also be an error. */ - stream_iter_status = - handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, msg); - if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { + if (handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, *msg) != + LTTNG_LIVE_ITERATOR_STATUS_OK) { BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, "Late message could not be handled correctly: " "lttng-live-msg-iter-addr={}, " "stream-name=\"{}\", " "curr-msg-ts={}, last-msg-ts={}", - fmt::ptr(lttng_live_msg_iter), - stream_iter->name->str, curr_msg_ts_ns, + fmt::ptr(lttng_live_msg_iter), stream_iter->name, + curr_msg_ts_ns, lttng_live_msg_iter->last_msg_ts_ns); - stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - goto end; + return LTTNG_LIVE_ITERATOR_STATUS_ERROR; } } } @@ -1248,7 +1085,8 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter, */ BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter); int ret = common_muxing_compare_messages( - stream_iter->current_msg, youngest_candidate_stream_iter->current_msg); + stream_iter->current_msg->libObjPtr(), + youngest_candidate_stream_iter->current_msg->libObjPtr()); if (ret < 0) { /* * The `youngest_candidate_stream_iter->current_msg` @@ -1282,24 +1120,21 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter, * removed element with the array's last * element. */ - g_ptr_array_remove_index_fast(live_trace->stream_iterators, stream_iter_idx); + bt2c::vectorFastRemove(live_trace->stream_iterators, stream_iter_idx); } } if (youngest_candidate_stream_iter) { *youngest_trace_stream_iter = youngest_candidate_stream_iter; - stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK; + return LTTNG_LIVE_ITERATOR_STATUS_OK; } else { /* * The only case where we don't have a candidate for this trace * is if we reached the end of all the iterators. */ - BT_ASSERT(live_trace->stream_iterators->len == 0); - stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END; + BT_ASSERT(live_trace->stream_iterators.empty()); + return LTTNG_LIVE_ITERATOR_STATUS_END; } - -end: - return stream_iter_status; } static enum lttng_live_iterator_status @@ -1324,16 +1159,13 @@ next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK && stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE && stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) { - goto end; + return stream_iter_status; } - BT_ASSERT_DBG(session->traces); - - while (trace_idx < session->traces->len) { + while (trace_idx < session->traces.size()) { bool trace_is_ended = false; struct lttng_live_stream_iterator *stream_iter; - struct lttng_live_trace *trace = - (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx); + lttng_live_trace *trace = session->traces[trace_idx].get(); stream_iter_status = next_stream_iterator_for_trace(lttng_live_msg_iter, trace, &stream_iter); @@ -1344,7 +1176,7 @@ next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter */ trace_is_ended = true; } else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { - goto end; + return stream_iter_status; } if (!trace_is_ended) { @@ -1360,7 +1192,8 @@ next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter * deterministic way. */ int ret = common_muxing_compare_messages( - stream_iter->current_msg, youngest_candidate_stream_iter->current_msg); + stream_iter->current_msg->libObjPtr(), + youngest_candidate_stream_iter->current_msg->libObjPtr()); if (ret < 0) { /* * The `youngest_candidate_stream_iter->current_msg` @@ -1383,15 +1216,15 @@ next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter } else { /* * trace_idx is not incremented since - * g_ptr_array_remove_index_fast replaces the + * vectorFastRemove replaces the * element at trace_idx with the array's last element. */ - g_ptr_array_remove_index_fast(session->traces, trace_idx); + bt2c::vectorFastRemove(session->traces, trace_idx); } } if (youngest_candidate_stream_iter) { *youngest_session_stream_iter = youngest_candidate_stream_iter; - stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK; + return LTTNG_LIVE_ITERATOR_STATUS_OK; } else { /* * The only cases where we don't have a candidate for this @@ -1402,11 +1235,9 @@ next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter * * In either cases, we return END. */ - BT_ASSERT(session->traces->len == 0); - stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END; + BT_ASSERT(session->traces.empty()); + return LTTNG_LIVE_ITERATOR_STATUS_END; } -end: - return stream_iter_status; } static inline void put_messages(bt_message_array_const msgs, uint64_t count) @@ -1429,7 +1260,6 @@ lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_it); struct lttng_live_component *lttng_live = lttng_live_msg_iter->lttng_live_comp; enum lttng_live_iterator_status stream_iter_status; - uint64_t session_idx; *count = 0; @@ -1437,53 +1267,50 @@ lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array if (G_UNLIKELY(lttng_live_msg_iter->was_interrupted)) { /* - * The iterator was interrupted in a previous call to the - * `_next()` method. We currently do not support generating - * messages after such event. The babeltrace2 CLI should never - * be running the graph after being interrupted. So this check - * is to prevent other graph users from using this live - * iterator in an messed up internal state. - */ - status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; + * The iterator was interrupted in a previous call to the + * `_next()` method. We currently do not support generating + * messages after such event. The babeltrace2 CLI should never + * be running the graph after being interrupted. So this check + * is to prevent other graph users from using this live + * iterator in an messed up internal state. + */ BT_CPPLOGE_APPEND_CAUSE_SPEC( lttng_live_msg_iter->logger, "Message iterator was interrupted during a previous call to the `next()` and currently does not support continuing after such event."); - goto end; + return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; } /* - * Clear all the invalid message reference that might be left over in - * the output array. - */ + * Clear all the invalid message reference that might be left over in + * the output array. + */ memset(msgs, 0, capacity * sizeof(*msgs)); /* - * If no session are exposed on the relay found at the url provided by - * the user, session count will be 0. In this case, we return status - * end to return gracefully. - */ - if (lttng_live_msg_iter->sessions->len == 0) { + * If no session are exposed on the relay found at the url provided by + * the user, session count will be 0. In this case, we return status + * end to return gracefully. + */ + if (lttng_live_msg_iter->sessions.empty()) { if (lttng_live->params.sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) { - status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END; - goto end; + return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END; } else { /* - * The are no more active session for this session - * name. Retry to create a viewer session for the - * requested session name. - */ + * The are no more active session for this session + * name. Retry to create a viewer session for the + * requested session name. + */ viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter); if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { - status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, "Error creating LTTng live viewer session"); + return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { - status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN; + return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN; } else { bt_common_abort(); } - goto end; } } } @@ -1493,53 +1320,52 @@ lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array } /* - * Here the muxing of message is done. - * - * We need to iterate over all the streams of all the traces of all the - * viewer sessions in order to get the message with the smallest - * timestamp. In this case, a session is a viewer session and there is - * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or - * kernel). Each viewer session can have multiple traces, for example, - * 64bit UST viewer sessions could have multiple per-pid traces. - * - * We iterate over the streams of each traces to update and see what is - * their next message's timestamp. From those timestamps, we select the - * message with the smallest timestamp as the best candidate message - * for that trace and do the same thing across all the sessions. - * - * We then compare the timestamp of best candidate message of all the - * sessions to pick the message with the smallest timestamp and we - * return it. - */ + * Here the muxing of message is done. + * + * We need to iterate over all the streams of all the traces of all the + * viewer sessions in order to get the message with the smallest + * timestamp. In this case, a session is a viewer session and there is + * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or + * kernel). Each viewer session can have multiple traces, for example, + * 64bit UST viewer sessions could have multiple per-pid traces. + * + * We iterate over the streams of each traces to update and see what is + * their next message's timestamp. From those timestamps, we select the + * message with the smallest timestamp as the best candidate message + * for that trace and do the same thing across all the sessions. + * + * We then compare the timestamp of best candidate message of all the + * sessions to pick the message with the smallest timestamp and we + * return it. + */ while (*count < capacity) { struct lttng_live_stream_iterator *youngest_stream_iter = NULL, *candidate_stream_iter = NULL; int64_t youngest_msg_ts_ns = INT64_MAX; - BT_ASSERT_DBG(lttng_live_msg_iter->sessions); - session_idx = 0; - while (session_idx < lttng_live_msg_iter->sessions->len) { - struct lttng_live_session *session = (lttng_live_session *) g_ptr_array_index( - lttng_live_msg_iter->sessions, session_idx); + uint64_t session_idx = 0; + while (session_idx < lttng_live_msg_iter->sessions.size()) { + lttng_live_session *session = lttng_live_msg_iter->sessions[session_idx].get(); /* Find the best candidate message to send downstream. */ stream_iter_status = next_stream_iterator_for_session(lttng_live_msg_iter, session, &candidate_stream_iter); /* If we receive an END status, it means that either: - * - Those traces never had active streams (UST with no - * data produced yet), - * - All live stream iterators have ENDed.*/ + * - Those traces never had active streams (UST with no + * data produced yet), + * - All live stream iterators have ENDed. + */ if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) { - if (session->closed && session->traces->len == 0) { + if (session->closed && session->traces.empty()) { /* - * Remove the session from the list. - * session_idx is not modified since - * g_ptr_array_remove_index_fast - * replaces the the removed element with - * the array's last element. - */ - g_ptr_array_remove_index_fast(lttng_live_msg_iter->sessions, session_idx); + * Remove the session from the list. + * session_idx is not modified since + * g_ptr_array_remove_index_fast + * replaces the the removed element with + * the array's last element. + */ + bt2c::vectorFastRemove(lttng_live_msg_iter->sessions, session_idx); } else { session_idx++; } @@ -1556,26 +1382,27 @@ lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array youngest_stream_iter = candidate_stream_iter; } else if (candidate_stream_iter->current_msg_ts_ns == youngest_msg_ts_ns) { /* - * The currently selected message to be sent - * downstream next has the exact same timestamp - * that of the current candidate message. We - * must break the tie in a predictable manner. - */ - BT_CPPLOGD_STR_SPEC( + * The currently selected message to be sent + * downstream next has the exact same timestamp + * that of the current candidate message. We + * must break the tie in a predictable manner. + */ + BT_CPPLOGD_SPEC( lttng_live_msg_iter->logger, "Two of the next message candidates have the same timestamps, pick one deterministically."); /* - * Order the messages in an arbitrary but - * deterministic way. - */ - int ret = common_muxing_compare_messages(candidate_stream_iter->current_msg, - youngest_stream_iter->current_msg); + * Order the messages in an arbitrary but + * deterministic way. + */ + int ret = common_muxing_compare_messages( + candidate_stream_iter->current_msg->libObjPtr(), + youngest_stream_iter->current_msg->libObjPtr()); if (ret < 0) { /* - * The `candidate_stream_iter->current_msg` - * should go first. Update the next - * iterator and the current timestamp. - */ + * The `candidate_stream_iter->current_msg` + * should go first. Update the next + * iterator and the current timestamp. + */ youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns; youngest_stream_iter = candidate_stream_iter; } else if (ret == 0) { @@ -1603,11 +1430,11 @@ lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array youngest_stream_iter->current_msg_ts_ns); /* - * Insert the next message to the message batch. This will set - * stream iterator current message to NULL so that next time - * we fetch the next message of that stream iterator - */ - BT_MESSAGE_MOVE_REF(msgs[*count], youngest_stream_iter->current_msg); + * Insert the next message to the message batch. This will set + * stream iterator current message to NULL so that next time + * we fetch the next message of that stream iterator + */ + msgs[*count] = youngest_stream_iter->current_msg.release().libObjPtr(); (*count)++; /* Update the last timestamp in nanoseconds sent downstream. */ @@ -1622,13 +1449,13 @@ return_status: case LTTNG_LIVE_ITERATOR_STATUS_OK: case LTTNG_LIVE_ITERATOR_STATUS_AGAIN: /* - * If we gathered messages, return _OK even if the graph was - * interrupted. This allows for the components downstream to at - * least get the those messages. If the graph was indeed - * interrupted there should not be another _next() call as the - * application will tear down the graph. This component class - * doesn't support restarting after an interruption. - */ + * If we gathered messages, return _OK even if the graph was + * interrupted. This allows for the components downstream to at + * least get the those messages. If the graph was indeed + * interrupted there should not be another _next() call as the + * application will tear down the graph. This component class + * doesn't support restarting after an interruption. + */ if (*count > 0) { status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK; } else { @@ -1661,7 +1488,6 @@ return_status: bt_common_abort(); } -end: return status; } catch (const std::bad_alloc&) { return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR; @@ -1670,23 +1496,19 @@ end: } } -static struct lttng_live_msg_iter * +static lttng_live_msg_iter::UP lttng_live_msg_iter_create(struct lttng_live_component *lttng_live_comp, bt_self_message_iterator *self_msg_it) { - lttng_live_msg_iter *msg_iter = new lttng_live_msg_iter {lttng_live_comp->logger}; + auto msg_iter = bt2s::make_unique(lttng_live_comp->logger); + msg_iter->self_comp = lttng_live_comp->self_comp; msg_iter->lttng_live_comp = lttng_live_comp; msg_iter->self_msg_iter = self_msg_it; - msg_iter->active_stream_iter = 0; msg_iter->last_msg_ts_ns = INT64_MIN; msg_iter->was_interrupted = false; - msg_iter->sessions = - g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_session); - BT_ASSERT(msg_iter->sessions); - return msg_iter; } @@ -1695,9 +1517,7 @@ lttng_live_msg_iter_init(bt_self_message_iterator *self_msg_it, bt_self_message_iterator_configuration *, bt_self_component_port_output *) { try { - bt_message_iterator_class_initialize_method_status status; struct lttng_live_component *lttng_live; - struct lttng_live_msg_iter *lttng_live_msg_iter; enum lttng_live_viewer_status viewer_status; bt_self_component *self_comp = bt_self_message_iterator_borrow_component(self_msg_it); @@ -1707,53 +1527,50 @@ lttng_live_msg_iter_init(bt_self_message_iterator *self_msg_it, BT_ASSERT(!lttng_live->has_msg_iter); lttng_live->has_msg_iter = true; - lttng_live_msg_iter = lttng_live_msg_iter_create(lttng_live, self_msg_it); + auto lttng_live_msg_iter = lttng_live_msg_iter_create(lttng_live, self_msg_it); if (!lttng_live_msg_iter) { BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live->logger, "Failed to create lttng_live_msg_iter"); - status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; - goto error; + return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; } viewer_status = live_viewer_connection_create( - lttng_live->params.url->str, false, lttng_live_msg_iter, lttng_live_msg_iter->logger, - <tng_live_msg_iter->viewer_connection); + lttng_live->params.url.c_str(), false, lttng_live_msg_iter.get(), + lttng_live_msg_iter->logger, lttng_live_msg_iter->viewer_connection); if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, "Failed to create viewer connection"); } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { /* - * Interruption in the _iter_init() method is not - * supported. Return an error. - */ + * Interruption in the _iter_init() method is not + * supported. Return an error. + */ BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, "Interrupted while creating viewer connection"); } - status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR; - goto error; + return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR; } - viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter); + viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter.get()); if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, "Failed to create viewer session"); } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { /* - * Interruption in the _iter_init() method is not - * supported. Return an error. - */ + * Interruption in the _iter_init() method is not + * supported. Return an error. + */ BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, "Interrupted when creating viewer session"); } - status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR; - goto error; + return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR; } - if (lttng_live_msg_iter->sessions->len == 0) { + if (lttng_live_msg_iter->sessions.empty()) { switch (lttng_live->params.sess_not_found_act) { case SESSION_NOT_FOUND_ACTION_CONTINUE: BT_CPPLOGI_SPEC( @@ -1761,7 +1578,7 @@ lttng_live_msg_iter_init(bt_self_message_iterator *self_msg_it, "Unable to connect to the requested live viewer session. " "Keep trying to connect because of {}=\"{}\" component parameter: url=\"{}\"", SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR, - lttng_live->params.url->str); + lttng_live->params.url); break; case SESSION_NOT_FOUND_ACTION_FAIL: BT_CPPLOGE_APPEND_CAUSE_SPEC( @@ -1770,30 +1587,23 @@ lttng_live_msg_iter_init(bt_self_message_iterator *self_msg_it, "Fail the message iterator initialization because of {}=\"{}\" " "component parameter: url =\"{}\"", SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_FAIL_STR, - lttng_live->params.url->str); - status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR; - goto error; + lttng_live->params.url); + return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR; case SESSION_NOT_FOUND_ACTION_END: BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger, "Unable to connect to the requested live viewer session. " "End gracefully at the first _next() call because of {}=\"{}\"" " component parameter: url=\"{}\"", SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_END_STR, - lttng_live->params.url->str); + lttng_live->params.url); break; default: bt_common_abort(); } } - bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter); - status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK; - goto end; - -error: - lttng_live_msg_iter_destroy(lttng_live_msg_iter); -end: - return status; + bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter.release()); + return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK; } catch (const std::bad_alloc&) { return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; } catch (const bt2::Error&) { @@ -1806,138 +1616,88 @@ static struct bt_param_validation_map_value_entry_descr list_sessions_params[] = bt_param_validation_value_descr::makeString()}, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END}; -static bt_component_class_query_method_status -lttng_live_query_list_sessions(const bt_value *params, const bt_value **result, - const bt2c::Logger& logger) +static bt2::Value::Shared lttng_live_query_list_sessions(const bt2::ConstMapValue params, + const bt2c::Logger& logger) { - bt_component_class_query_method_status status; - const bt_value *url_value = NULL; const char *url; - struct live_viewer_connection *viewer_connection = NULL; + live_viewer_connection::UP viewer_connection; enum lttng_live_viewer_status viewer_status; enum bt_param_validation_status validation_status; gchar *validate_error = NULL; - validation_status = bt_param_validation_validate(params, list_sessions_params, &validate_error); + validation_status = + bt_param_validation_validate(params.libObjPtr(), list_sessions_params, &validate_error); if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) { - status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR; - goto error; + throw bt2c::MemoryError {}; } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) { - status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; - BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "{}", validate_error); - goto error; + bt2c::GCharUP errorFreer {validate_error}; + + BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(logger, bt2::Error, "{}", validate_error); } - url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM); - url = bt_value_string_get(url_value); + url = params[URL_PARAM]->asString().value(); - viewer_status = live_viewer_connection_create(url, true, NULL, logger, &viewer_connection); + viewer_status = live_viewer_connection_create(url, true, NULL, logger, viewer_connection); if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { - BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Failed to create viewer connection"); - status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; + BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(logger, bt2::Error, + "Failed to create viewer connection"); } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { - status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN; + throw bt2c::TryAgain {}; } else { bt_common_abort(); } - goto error; - } - - status = live_viewer_connection_list_sessions(viewer_connection, result); - if (status != BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK) { - BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Failed to list viewer sessions"); - goto error; - } - - goto end; - -error: - BT_VALUE_PUT_REF_AND_RESET(*result); - - if (status >= 0) { - status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; - } - -end: - if (viewer_connection) { - live_viewer_connection_destroy(viewer_connection); } - g_free(validate_error); - - return status; + return live_viewer_connection_list_sessions(viewer_connection.get()); } -static bt_component_class_query_method_status -lttng_live_query_support_info(const bt_value *params, const bt_value **result, - const bt2c::Logger& logger) +static bt2::Value::Shared lttng_live_query_support_info(const bt2::ConstMapValue params, + const bt2c::Logger& logger) { - bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK; - const bt_value *input_type_value; - const bt_value *input_value; - double weight = 0; struct bt_common_lttng_live_url_parts parts = {}; + bt_common_lttng_live_url_parts_deleter partsDeleter {parts}; /* Used by the logging macros */ __attribute__((unused)) bt_self_component *self_comp = NULL; - *result = NULL; - input_type_value = bt_value_map_borrow_entry_value_const(params, "type"); - if (!input_type_value) { - BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Missing expected `type` parameter."); - goto error; + const auto typeValue = params["type"]; + if (!typeValue) { + BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(logger, bt2::Error, + "Missing expected `type` parameter."); } - if (!bt_value_is_string(input_type_value)) { - BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "`type` parameter is not a string value."); - goto error; + if (!typeValue->isString()) { + BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(logger, bt2::Error, + "`type` parameter is not a string value."); } - if (strcmp(bt_value_string_get(input_type_value), "string") != 0) { + if (strcmp(typeValue->asString().value(), "string") != 0) { /* We don't handle file system paths */ - goto create_result; + return bt2::RealValue::create(); } - input_value = bt_value_map_borrow_entry_value_const(params, "input"); - if (!input_value) { - BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Missing expected `input` parameter."); - goto error; + const auto inputValue = params["input"]; + if (!inputValue) { + BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(logger, bt2::Error, + "Missing expected `input` parameter."); } - if (!bt_value_is_string(input_value)) { - BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "`input` parameter is not a string value."); - goto error; + if (!inputValue->isString()) { + BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(logger, bt2::Error, + "`input` parameter is not a string value."); } - parts = bt_common_parse_lttng_live_url(bt_value_string_get(input_value), NULL, 0); + parts = bt_common_parse_lttng_live_url(inputValue->asString().value(), NULL, 0); if (parts.session_name) { /* * Looks pretty much like an LTTng live URL: we got the * session name part, which forms a complete URL. */ - weight = .75; - } - -create_result: - *result = bt_value_real_create_init(weight); - if (!*result) { - status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR; - goto error; + return bt2::RealValue::create(.75); } - goto end; - -error: - if (status >= 0) { - status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; - } - - BT_ASSERT(!*result); - -end: - bt_common_destroy_lttng_live_url_parts(&parts); - return status; + return bt2::RealValue::create(); } bt_component_class_query_method_status lttng_live_query(bt_self_component_class_source *comp_class, @@ -1947,23 +1707,26 @@ bt_component_class_query_method_status lttng_live_query(bt_self_component_class_ const bt_value **result) { try { - bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK; bt2c::Logger logger {bt2::SelfComponentClass {comp_class}, bt2::PrivateQueryExecutor {priv_query_exec}, "PLUGIN/SRC.CTF.LTTNG-LIVE/QUERY"}; + const bt2::ConstMapValue paramsObj(params); + bt2::Value::Shared resultObj; if (strcmp(object, "sessions") == 0) { - status = lttng_live_query_list_sessions(params, result, logger); + resultObj = lttng_live_query_list_sessions(paramsObj, logger); } else if (strcmp(object, "babeltrace.support-info") == 0) { - status = lttng_live_query_support_info(params, result, logger); + resultObj = lttng_live_query_support_info(paramsObj, logger); } else { BT_CPPLOGI_SPEC(logger, "Unknown query object `{}`", object); - status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT; - goto end; + return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT; } -end: - return status; + *result = resultObj.release().libObjPtr(); + + return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK; + } catch (const bt2c::TryAgain&) { + return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR; } catch (const std::bad_alloc&) { return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR; } catch (const bt2::Error&) { @@ -1971,28 +1734,10 @@ end: } } -static void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live) -{ - if (!lttng_live) { - return; - } - - if (lttng_live->params.url) { - g_string_free(lttng_live->params.url, TRUE); - } - - delete lttng_live; -} - void lttng_live_component_finalize(bt_self_component_source *component) { - lttng_live_component *data = (lttng_live_component *) bt_self_component_get_data( - bt_self_component_source_as_self_component(component)); - - if (!data) { - return; - } - lttng_live_component_destroy_data(data); + lttng_live_component::UP {static_cast( + bt_self_component_get_data(bt_self_component_source_as_self_component(component)))}; } static enum session_not_found_action @@ -2031,42 +1776,32 @@ static struct bt_param_validation_map_value_entry_descr params_descr[] = { static bt_component_class_initialize_method_status lttng_live_component_create(const bt_value *params, bt_self_component_source *self_comp, - struct lttng_live_component **component) + lttng_live_component::UP& component) { - struct lttng_live_component *lttng_live = NULL; const bt_value *inputs_value; const bt_value *url_value; const bt_value *value; - const char *url; enum bt_param_validation_status validation_status; gchar *validation_error = NULL; - bt_component_class_initialize_method_status status; bt2c::Logger logger {bt2::SelfSourceComponent {self_comp}, "PLUGIN/SRC.CTF.LTTNG-LIVE/COMP"}; validation_status = bt_param_validation_validate(params, params_descr, &validation_error); if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) { - status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; - goto error; + return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) { + bt2c::GCharUP errorFreer {validation_error}; BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "{}", validation_error); - status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR; - goto error; + return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR; } - lttng_live = new lttng_live_component {std::move(logger)}; + auto lttng_live = bt2s::make_unique(std::move(logger)); lttng_live->self_comp = bt_self_component_source_as_self_component(self_comp); lttng_live->max_query_size = MAX_QUERY_SIZE; lttng_live->has_msg_iter = false; inputs_value = bt_value_map_borrow_entry_value_const(params, INPUTS_PARAM); url_value = bt_value_array_borrow_element_by_index_const(inputs_value, 0); - url = bt_value_string_get(url_value); - - lttng_live->params.url = g_string_new(url); - if (!lttng_live->params.url) { - status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; - goto error; - } + lttng_live->params.url = bt_value_string_get(url_value); value = bt_value_map_borrow_entry_value_const(params, SESS_NOT_FOUND_ACTION_PARAM); if (value) { @@ -2078,17 +1813,8 @@ lttng_live_component_create(const bt_value *params, bt_self_component_source *se lttng_live->params.sess_not_found_act = SESSION_NOT_FOUND_ACTION_CONTINUE; } - status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK; - goto end; - -error: - lttng_live_component_destroy_data(lttng_live); - lttng_live = NULL; -end: - g_free(validation_error); - - *component = lttng_live; - return status; + component = std::move(lttng_live); + return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK; } bt_component_class_initialize_method_status @@ -2096,31 +1822,26 @@ lttng_live_component_init(bt_self_component_source *self_comp_src, bt_self_component_source_configuration *, const bt_value *params, void *) { try { - struct lttng_live_component *lttng_live; + lttng_live_component::UP lttng_live; bt_component_class_initialize_method_status ret; bt_self_component *self_comp = bt_self_component_source_as_self_component(self_comp_src); bt_self_component_add_port_status add_port_status; - ret = lttng_live_component_create(params, self_comp_src, <tng_live); + ret = lttng_live_component_create(params, self_comp_src, lttng_live); if (ret != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) { - goto error; + return ret; } add_port_status = bt_self_component_source_add_output_port(self_comp_src, "out", NULL, NULL); if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) { ret = (bt_component_class_initialize_method_status) add_port_status; - goto end; + return ret; } - bt_self_component_set_data(self_comp, lttng_live); - goto end; + bt_self_component_set_data(self_comp, lttng_live.release()); -error: - lttng_live_component_destroy_data(lttng_live); - lttng_live = NULL; -end: - return ret; + return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK; } catch (const std::bad_alloc&) { return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; } catch (const bt2::Error&) {