X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fplugins%2Fctf%2Flttng-live%2Flttng-live.cpp;h=6368181000f641718cb9c192cda053ba131b2341;hb=refs%2Fheads%2Fmaster;hp=344fbe27c17441fa88b4b94403f9cc48b3cb2c70;hpb=88730e42fa22a5d7abfc9912cb89ed85db4631d5;p=babeltrace.git diff --git a/src/plugins/ctf/lttng-live/lttng-live.cpp b/src/plugins/ctf/lttng-live/lttng-live.cpp index 344fbe27..63681810 100644 --- a/src/plugins/ctf/lttng-live/lttng-live.cpp +++ b/src/plugins/ctf/lttng-live/lttng-live.cpp @@ -8,28 +8,22 @@ * Babeltrace CTF LTTng-live Client Component */ -#define BT_COMP_LOG_SELF_COMP self_comp -#define BT_LOG_OUTPUT_LEVEL log_level -#define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE" -#include "logging/comp-logging.h" - -#include -#include -#include - #include +#include #include "common/assert.h" -#include -#include "compat/compiler.h" -#include +#include "cpp-common/bt2c/fmt.hpp" +#include "cpp-common/bt2c/glib-up.hpp" +#include "cpp-common/bt2c/vector.hpp" +#include "cpp-common/bt2s/make-unique.hpp" +#include "cpp-common/vendor/fmt/format.h" #include "plugins/common/muxing/muxing.h" #include "plugins/common/param-validation/param-validation.h" #include "data-stream.hpp" -#include "metadata.hpp" #include "lttng-live.hpp" +#include "metadata.hpp" #define MAX_QUERY_SIZE (256 * 1024) #define URL_PARAM "url" @@ -39,318 +33,147 @@ #define SESS_NOT_FOUND_ACTION_FAIL_STR "fail" #define SESS_NOT_FOUND_ACTION_END_STR "end" -#define print_dbg(fmt, ...) BT_COMP_LOGD(fmt, ##__VA_ARGS__) - -static const char *lttng_live_iterator_status_string(enum lttng_live_iterator_status status) -{ - switch (status) { - case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE: - return "LTTNG_LIVE_ITERATOR_STATUS_CONTINUE"; - case LTTNG_LIVE_ITERATOR_STATUS_AGAIN: - return "LTTNG_LIVE_ITERATOR_STATUS_AGAIN"; - case LTTNG_LIVE_ITERATOR_STATUS_END: - return "LTTNG_LIVE_ITERATOR_STATUS_END"; - case LTTNG_LIVE_ITERATOR_STATUS_OK: - return "LTTNG_LIVE_ITERATOR_STATUS_OK"; - case LTTNG_LIVE_ITERATOR_STATUS_INVAL: - return "LTTNG_LIVE_ITERATOR_STATUS_INVAL"; - case LTTNG_LIVE_ITERATOR_STATUS_ERROR: - return "LTTNG_LIVE_ITERATOR_STATUS_ERROR"; - case LTTNG_LIVE_ITERATOR_STATUS_NOMEM: - return "LTTNG_LIVE_ITERATOR_STATUS_NOMEM"; - case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED: - return "LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED"; - default: - bt_common_abort(); - } -} - -static const char *lttng_live_stream_state_string(enum lttng_live_stream_state state) -{ - switch (state) { - case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA: - return "ACTIVE_NO_DATA"; - case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA: - return "QUIESCENT_NO_DATA"; - case LTTNG_LIVE_STREAM_QUIESCENT: - return "QUIESCENT"; - case LTTNG_LIVE_STREAM_ACTIVE_DATA: - return "ACTIVE_DATA"; - case LTTNG_LIVE_STREAM_EOF: - return "EOF"; - default: - return "ERROR"; - } -} - void lttng_live_stream_iterator_set_state(struct lttng_live_stream_iterator *stream_iter, enum lttng_live_stream_state new_state) { - bt_self_component *self_comp = stream_iter->self_comp; - bt_logging_level log_level = stream_iter->log_level; - - BT_COMP_LOGD("Setting live stream iterator state: viewer-stream-id=%" PRIu64 - ", old-state=%s, new-state=%s", - stream_iter->viewer_stream_id, lttng_live_stream_state_string(stream_iter->state), - lttng_live_stream_state_string(new_state)); + BT_CPPLOGD_SPEC(stream_iter->logger, + "Setting live stream iterator state: viewer-stream-id={}, " + "old-state={}, new-state={}", + stream_iter->viewer_stream_id, stream_iter->state, new_state); stream_iter->state = new_state; } #define LTTNG_LIVE_LOGD_STREAM_ITER(live_stream_iter) \ do { \ - BT_COMP_LOGD("Live stream iterator state=%s, " \ - "last-inact-ts-is-set=%d, last-inact-ts-value=%" PRId64 ", " \ - "curr-inact-ts=%" PRId64, \ - lttng_live_stream_state_string(live_stream_iter->state), \ - live_stream_iter->last_inactivity_ts.is_set, \ - live_stream_iter->last_inactivity_ts.value, \ - live_stream_iter->current_inactivity_ts); \ + BT_CPPLOGD_SPEC((live_stream_iter)->logger, \ + "Live stream iterator state={}, " \ + "last-inact-ts-is-set={}, last-inact-ts-value={}, " \ + "curr-inact-ts={}", \ + (live_stream_iter)->state, (live_stream_iter)->last_inactivity_ts.is_set, \ + (live_stream_iter)->last_inactivity_ts.value, \ + (live_stream_iter)->current_inactivity_ts); \ } while (0); -BT_HIDDEN bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter) { - bool ret; - if (!msg_iter) { - ret = false; - goto end; + return false; } - ret = bt_self_message_iterator_is_interrupted(msg_iter->self_msg_iter); - -end: - return ret; + return bt_self_message_iterator_is_interrupted(msg_iter->self_msg_iter); } static struct lttng_live_trace * lttng_live_session_borrow_trace_by_id(struct lttng_live_session *session, uint64_t trace_id) { - uint64_t trace_idx; - struct lttng_live_trace *ret_trace = NULL; - - for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) { - struct lttng_live_trace *trace = - (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx); + for (lttng_live_trace::UP& trace : session->traces) { if (trace->id == trace_id) { - ret_trace = trace; - goto end; + return trace.get(); } } -end: - return ret_trace; -} - -static void lttng_live_destroy_trace(struct lttng_live_trace *trace) -{ - bt_logging_level log_level = trace->log_level; - bt_self_component *self_comp = trace->self_comp; - - BT_COMP_LOGD("Destroying live trace: trace-id=%" PRIu64, trace->id); - - BT_ASSERT(trace->stream_iterators); - g_ptr_array_free(trace->stream_iterators, TRUE); - - BT_TRACE_PUT_REF_AND_RESET(trace->trace); - BT_TRACE_CLASS_PUT_REF_AND_RESET(trace->trace_class); - - lttng_live_metadata_fini(trace); - g_free(trace); + return nullptr; } static struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session, uint64_t trace_id) { - struct lttng_live_trace *trace = NULL; - bt_logging_level log_level = session->log_level; - bt_self_component *self_comp = session->self_comp; - - BT_COMP_LOGD("Creating live trace: " - "session-id=%" PRIu64 ", trace-id=%" PRIu64, - session->id, trace_id); - trace = g_new0(struct lttng_live_trace, 1); - if (!trace) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate live trace"); - goto error; - } - trace->log_level = session->log_level; - trace->self_comp = session->self_comp; + BT_CPPLOGD_SPEC(session->logger, "Creating live trace: session-id={}, trace-id={}", session->id, + trace_id); + + auto trace = bt2s::make_unique(session->logger); + trace->session = session; trace->id = trace_id; - trace->trace_class = NULL; - trace->trace = NULL; - trace->stream_iterators = - g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_stream_iterator_destroy); - BT_ASSERT(trace->stream_iterators); trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED; - g_ptr_array_add(session->traces, trace); - goto end; -error: - g_free(trace); - trace = NULL; -end: - return trace; + const auto ret = trace.get(); + session->traces.emplace_back(std::move(trace)); + return ret; } -BT_HIDDEN struct lttng_live_trace * lttng_live_session_borrow_or_create_trace_by_id(struct lttng_live_session *session, uint64_t trace_id) { - struct lttng_live_trace *trace; - - trace = lttng_live_session_borrow_trace_by_id(session, trace_id); - if (trace) { - goto end; + if (lttng_live_trace *trace = lttng_live_session_borrow_trace_by_id(session, trace_id)) { + return trace; } /* The session is the owner of the newly created trace. */ - trace = lttng_live_create_trace(session, trace_id); - -end: - return trace; + return lttng_live_create_trace(session, trace_id); } -BT_HIDDEN int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, uint64_t session_id, const char *hostname, const char *session_name) { - int ret = 0; - struct lttng_live_session *session; - bt_logging_level log_level = lttng_live_msg_iter->log_level; - bt_self_component *self_comp = lttng_live_msg_iter->self_comp; + BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, + "Adding live session: " + "session-id={}, hostname=\"{}\", session-name=\"{}\"", + session_id, hostname, session_name); - BT_COMP_LOGD("Adding live session: " - "session-id=%" PRIu64 ", hostname=\"%s\" session-name=\"%s\"", - session_id, hostname, session_name); + auto session = bt2s::make_unique(lttng_live_msg_iter->logger); - session = g_new0(struct lttng_live_session, 1); - if (!session) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate live session"); - goto error; - } - - session->log_level = lttng_live_msg_iter->log_level; session->self_comp = lttng_live_msg_iter->self_comp; session->id = session_id; - session->traces = g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_trace); - BT_ASSERT(session->traces); session->lttng_live_msg_iter = lttng_live_msg_iter; session->new_streams_needed = true; - session->hostname = g_string_new(hostname); - BT_ASSERT(session->hostname); + session->hostname = hostname; + session->session_name = session_name; - session->session_name = g_string_new(session_name); - BT_ASSERT(session->session_name); + lttng_live_msg_iter->sessions.emplace_back(std::move(session)); - g_ptr_array_add(lttng_live_msg_iter->sessions, session); - goto end; -error: - g_free(session); - ret = -1; -end: - return ret; + return 0; } -static void lttng_live_destroy_session(struct lttng_live_session *session) +lttng_live_session::~lttng_live_session() { - bt_logging_level log_level; - bt_self_component *self_comp; - - if (!session) { - goto end; - } + BT_CPPLOGD_SPEC(this->logger, "Destroying live session: session-id={}, session-name=\"{}\"", + this->id, this->session_name); - log_level = session->log_level; - self_comp = session->self_comp; - BT_COMP_LOGD("Destroying live session: " - "session-id=%" PRIu64 ", session-name=\"%s\"", - session->id, session->session_name->str); - if (session->id != -1ULL) { - if (lttng_live_session_detach(session)) { - if (!lttng_live_graph_is_canceled(session->lttng_live_msg_iter)) { + if (this->id != -1ULL) { + if (lttng_live_session_detach(this)) { + if (!lttng_live_graph_is_canceled(this->lttng_live_msg_iter)) { /* Old relayd cannot detach sessions. */ - BT_COMP_LOGD("Unable to detach lttng live session %" PRIu64, session->id); + BT_CPPLOGD_SPEC(this->logger, "Unable to detach lttng live session {}", this->id); } } - session->id = -1ULL; - } - if (session->traces) { - g_ptr_array_free(session->traces, TRUE); + this->id = -1ULL; } - - if (session->hostname) { - g_string_free(session->hostname, TRUE); - } - if (session->session_name) { - g_string_free(session->session_name, TRUE); - } - g_free(session); - -end: - return; } -static void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter) +lttng_live_msg_iter::~lttng_live_msg_iter() { - if (!lttng_live_msg_iter) { - goto end; - } - - if (lttng_live_msg_iter->sessions) { - g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE); - } - - if (lttng_live_msg_iter->viewer_connection) { - live_viewer_connection_destroy(lttng_live_msg_iter->viewer_connection); - } - BT_ASSERT(lttng_live_msg_iter->lttng_live_comp); - BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter); + BT_ASSERT(this->lttng_live_comp); + BT_ASSERT(this->lttng_live_comp->has_msg_iter); /* All stream iterators must be destroyed at this point. */ - BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0); - lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false; - - g_free(lttng_live_msg_iter); - -end: - return; + BT_ASSERT(this->active_stream_iter == 0); + this->lttng_live_comp->has_msg_iter = false; } -BT_HIDDEN void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter) { - struct lttng_live_msg_iter *lttng_live_msg_iter; - - BT_ASSERT(self_msg_iter); - - lttng_live_msg_iter = - (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_iter); - BT_ASSERT(lttng_live_msg_iter); - lttng_live_msg_iter_destroy(lttng_live_msg_iter); + lttng_live_msg_iter::UP { + static_cast(bt_self_message_iterator_get_data(self_msg_iter))}; } static enum lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(struct lttng_live_stream_iterator *lttng_live_stream) { - bt_logging_level log_level = lttng_live_stream->log_level; - bt_self_component *self_comp = lttng_live_stream->self_comp; - switch (lttng_live_stream->state) { case LTTNG_LIVE_STREAM_QUIESCENT: case LTTNG_LIVE_STREAM_ACTIVE_DATA: break; case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA: /* Invalid state. */ - BT_COMP_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\""); + BT_CPPLOGF_SPEC(lttng_live_stream->logger, "Unexpected stream state \"ACTIVE_NO_DATA\""); bt_common_abort(); case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA: /* Invalid state. */ - BT_COMP_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\""); + BT_CPPLOGF_SPEC(lttng_live_stream->logger, "Unexpected stream state \"QUIESCENT_NO_DATA\""); bt_common_abort(); case LTTNG_LIVE_STREAM_EOF: break; @@ -370,27 +193,25 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_da struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_stream_iterator *lttng_live_stream) { - bt_logging_level log_level = lttng_live_msg_iter->log_level; - bt_self_component *self_comp = lttng_live_msg_iter->self_comp; enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK; enum lttng_live_stream_state orig_state = lttng_live_stream->state; struct packet_index index; if (lttng_live_stream->trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) { - BT_COMP_LOGD( - "Need to get an update for the metadata stream before proceeding further with this stream: " - "stream-name=\"%s\"", - lttng_live_stream->name->str); + BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, + "Need to get an update for the metadata stream before proceeding " + "further with this stream: stream-name=\"{}\"", + lttng_live_stream->name); ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; goto end; } if (lttng_live_stream->trace->session->new_streams_needed) { - BT_COMP_LOGD( - "Need to get an update of all streams before proceeding further with this stream: " - "stream-name=\"%s\"", - lttng_live_stream->name->str); + BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, + "Need to get an update of all streams before proceeding further " + "with this stream: stream-name=\"{}\"", + lttng_live_stream->name); ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; goto end; } @@ -412,10 +233,10 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_da if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA && last_inact_ts == curr_inact_ts) { /* - * Because the stream is in the QUIESCENT_NO_DATA - * state, we can assert that the last_inactivity_ts was - * set and can be safely used in the `if` above. - */ + * Because the stream is in the QUIESCENT_NO_DATA + * state, we can assert that the last_inactivity_ts was + * set and can be safely used in the `if` above. + */ BT_ASSERT(lttng_live_stream->last_inactivity_ts.is_set); ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; @@ -430,11 +251,12 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_da lttng_live_stream->offset = index.offset; lttng_live_stream->len = index.packet_size / CHAR_BIT; - BT_COMP_LOGD("Setting live stream reading info: stream-name=\"%s\", " - "viewer-stream-id=%" PRIu64 ", stream-base-offset=%" PRIu64 - ", stream-offset=%" PRIu64 ", stream-len=%" PRIu64, - lttng_live_stream->name->str, lttng_live_stream->viewer_stream_id, - lttng_live_stream->base_offset, lttng_live_stream->offset, lttng_live_stream->len); + BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, + "Setting live stream reading info: stream-name=\"{}\", " + "viewer-stream-id={}, stream-base-offset={}, stream-offset={}, stream-len={}", + lttng_live_stream->name, lttng_live_stream->viewer_stream_id, + lttng_live_stream->base_offset, lttng_live_stream->offset, + lttng_live_stream->len); end: if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) { @@ -454,13 +276,11 @@ static enum lttng_live_iterator_status lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_session *session) { - bt_logging_level log_level = lttng_live_msg_iter->log_level; - bt_self_component *self_comp = lttng_live_msg_iter->self_comp; enum lttng_live_iterator_status status; - uint64_t trace_idx; if (!session->attached) { - BT_COMP_LOGD("Attach to session: session-id=%" PRIu64, session->id); + BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Attach to session: session-id={}", + session->id); enum lttng_live_viewer_status attach_status = lttng_live_session_attach(session, lttng_live_msg_iter->self_msg_iter); if (attach_status != LTTNG_LIVE_VIEWER_STATUS_OK) { @@ -472,44 +292,66 @@ lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter, * cancelled. */ bt_current_thread_clear_error(); - status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + return LTTNG_LIVE_ITERATOR_STATUS_AGAIN; } else { - status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error attaching to LTTng live session"); + BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, + "Error attaching to LTTng live session"); + return LTTNG_LIVE_ITERATOR_STATUS_ERROR; } - goto end; } } - BT_COMP_LOGD("Updating all streams and metadata for session: " - "session-id=%" PRIu64 ", session-name=\"%s\"", - session->id, session->session_name->str); + BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, + "Updating all data streams: session-id={}, session-name=\"{}\"", session->id, + session->session_name); status = lttng_live_session_get_new_streams(session, lttng_live_msg_iter->self_msg_iter); - if (status != LTTNG_LIVE_ITERATOR_STATUS_OK && status != LTTNG_LIVE_ITERATOR_STATUS_END) { - goto end; + switch (status) { + case LTTNG_LIVE_ITERATOR_STATUS_OK: + break; + case LTTNG_LIVE_ITERATOR_STATUS_END: + /* + * We received a `_END` from the `_get_new_streams()` function, + * which means no more data will ever be received from the data + * streams of this session. But it's possible that the metadata + * is incomplete. + * The live protocol guarantees that we receive all the + * metadata needed before we receive data streams needing it. + * But it's possible to receive metadata NOT needed by + * data streams after the session was closed. For example, this + * could happen if a new event is registered and the session is + * stopped before any tracepoint for that event is actually + * fired. + */ + BT_CPPLOGD_SPEC( + lttng_live_msg_iter->logger, + "Updating streams returned _END status. Override status to _OK in order fetch any remaining metadata:" + "session-id={}, session-name=\"{}\"", + session->id, session->session_name); + return LTTNG_LIVE_ITERATOR_STATUS_OK; + default: + return status; } - trace_idx = 0; - while (trace_idx < session->traces->len) { - struct lttng_live_trace *trace = - (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx); + BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, + "Updating metadata stream for session: session-id={}, session-name=\"{}\"", + session->id, session->session_name); - status = lttng_live_metadata_update(trace); + for (lttng_live_trace::UP& trace : session->traces) { + status = lttng_live_metadata_update(trace.get()); switch (status) { case LTTNG_LIVE_ITERATOR_STATUS_END: case LTTNG_LIVE_ITERATOR_STATUS_OK: - trace_idx++; break; case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE: case LTTNG_LIVE_ITERATOR_STATUS_AGAIN: - goto end; + return status; default: - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Error updating trace metadata: " - "stream-iter-status=%s, trace-id=%" PRIu64, - lttng_live_iterator_status_string(status), trace->id); - goto end; + BT_CPPLOGE_APPEND_CAUSE_SPEC( + lttng_live_msg_iter->logger, + "Error updating trace metadata: stream-iter-status={}, trace-id={}", status, + trace->id); + return status; } } @@ -517,32 +359,23 @@ lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter, * Now that we have the metadata we can initialize the downstream * iterator. */ - status = lttng_live_lazy_msg_init(session, lttng_live_msg_iter->self_msg_iter); - -end: - return status; + return lttng_live_lazy_msg_init(session, lttng_live_msg_iter->self_msg_iter); } static void lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter) { - uint64_t session_idx, trace_idx; - bt_logging_level log_level = lttng_live_msg_iter->log_level; - bt_self_component *self_comp = lttng_live_msg_iter->self_comp; - - for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) { - struct lttng_live_session *session = - (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx); - BT_COMP_LOGD("Force marking session as needing new streams: " - "session-id=%" PRIu64, - session->id); + for (const auto& session : lttng_live_msg_iter->sessions) { + BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, + "Force marking session as needing new streams: " + "session-id={}", + session->id); session->new_streams_needed = true; - for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) { - struct lttng_live_trace *trace = - (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx); - BT_COMP_LOGD("Force marking trace metadata state as needing an update: " - "session-id=%" PRIu64 ", trace-id=%" PRIu64, - session->id, trace->id); + for (lttng_live_trace::UP& trace : session->traces) { + BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, + "Force marking trace metadata state as needing an update: " + "session-id={}, trace-id={}", + session->id, trace->id); BT_ASSERT(trace->metadata_stream_state != LTTNG_LIVE_METADATA_STREAM_STATE_CLOSED); @@ -554,32 +387,30 @@ lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live static enum lttng_live_iterator_status lttng_live_iterator_handle_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter) { - enum lttng_live_iterator_status status; enum lttng_live_viewer_status viewer_status; - bt_logging_level log_level = lttng_live_msg_iter->log_level; - bt_self_component *self_comp = lttng_live_msg_iter->self_comp; - uint64_t session_idx = 0, nr_sessions_opened = 0; - struct lttng_live_session *session; + uint64_t nr_sessions_opened = 0; enum session_not_found_action sess_not_found_act = lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act; - BT_COMP_LOGD("Update data and metadata of all sessions: " - "live-msg-iter-addr=%p", - lttng_live_msg_iter); + BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, + "Update data and metadata of all sessions: " + "live-msg-iter-addr={}", + fmt::ptr(lttng_live_msg_iter)); /* * In a remotely distant future, we could add a "new * session" flag to the protocol, which would tell us that we * need to query for new sessions even though we have sessions * currently ongoing. */ - if (lttng_live_msg_iter->sessions->len == 0) { + if (lttng_live_msg_iter->sessions.empty()) { if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) { - BT_COMP_LOGD( + BT_CPPLOGD_SPEC( + lttng_live_msg_iter->logger, "No session found. Exiting in accordance with the `session-not-found-action` parameter"); - status = LTTNG_LIVE_ITERATOR_STATUS_END; - goto end; + return LTTNG_LIVE_ITERATOR_STATUS_END; } else { - BT_COMP_LOGD( + BT_CPPLOGD_SPEC( + lttng_live_msg_iter->logger, "No session found. Try creating a new one in accordance with the `session-not-found-action` parameter"); /* * Retry to create a viewer session for the requested @@ -588,31 +419,31 @@ lttng_live_iterator_handle_new_streams_and_metadata(struct lttng_live_msg_iter * viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter); if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { - status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Error creating LTTng live viewer session"); + BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, + "Error creating LTTng live viewer session"); + return LTTNG_LIVE_ITERATOR_STATUS_ERROR; } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { - status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + return LTTNG_LIVE_ITERATOR_STATUS_AGAIN; } else { bt_common_abort(); } - goto end; } } } - for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) { - session = - (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx); - status = lttng_live_get_session(lttng_live_msg_iter, session); + for (const auto& session : lttng_live_msg_iter->sessions) { + const auto status = lttng_live_get_session(lttng_live_msg_iter, session.get()); switch (status) { case LTTNG_LIVE_ITERATOR_STATUS_OK: - break; case LTTNG_LIVE_ITERATOR_STATUS_END: - status = LTTNG_LIVE_ITERATOR_STATUS_OK; + /* + * A session returned `_END`. Other sessions may still + * be active so we override the status and continue + * looping if needed. + */ break; default: - goto end; + return status; } if (!session->closed) { nr_sessions_opened++; @@ -620,54 +451,41 @@ lttng_live_iterator_handle_new_streams_and_metadata(struct lttng_live_msg_iter * } if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE && nr_sessions_opened == 0) { - status = LTTNG_LIVE_ITERATOR_STATUS_END; + return LTTNG_LIVE_ITERATOR_STATUS_END; } else { - status = LTTNG_LIVE_ITERATOR_STATUS_OK; + return LTTNG_LIVE_ITERATOR_STATUS_OK; } - -end: - return status; } static enum lttng_live_iterator_status emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter, - struct lttng_live_stream_iterator *stream_iter, const bt_message **message, - uint64_t timestamp) + struct lttng_live_stream_iterator *stream_iter, + bt2::ConstMessage::Shared& message, uint64_t timestamp) { - enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK; - bt_logging_level log_level = lttng_live_msg_iter->log_level; - bt_self_component *self_comp = lttng_live_msg_iter->self_comp; - bt_message *msg = NULL; - BT_ASSERT(stream_iter->trace->clock_class); + BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, + "Emitting inactivity message for stream: ctf-stream-id={}, " + "viewer-stream-id={}, timestamp={}", + stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id, + timestamp); - BT_COMP_LOGD("Emitting inactivity message for stream: ctf-stream-id=%" PRIu64 - ", viewer-stream-id=%" PRIu64 ", timestamp=%" PRIu64, - stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id, timestamp); + const auto msg = bt_message_message_iterator_inactivity_create( + lttng_live_msg_iter->self_msg_iter, stream_iter->trace->clock_class, timestamp); - msg = bt_message_message_iterator_inactivity_create(lttng_live_msg_iter->self_msg_iter, - stream_iter->trace->clock_class, timestamp); if (!msg) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error emitting message iterator inactivity message"); - goto error; + BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, + "Error emitting message iterator inactivity message"); + return LTTNG_LIVE_ITERATOR_STATUS_ERROR; } - *message = msg; -end: - return ret; - -error: - ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - bt_message_put_ref(msg); - goto end; + message = bt2::ConstMessage::Shared::createWithoutRef(msg); + return LTTNG_LIVE_ITERATOR_STATUS_OK; } static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream( struct lttng_live_msg_iter *lttng_live_msg_iter, - struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message) + struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message) { - enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK; - if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) { return LTTNG_LIVE_ITERATOR_STATUS_OK; } @@ -681,69 +499,51 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quies lttng_live_stream_iterator_set_state(lttng_live_stream, LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA); - ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; - goto end; + return LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; } - ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream, message, - lttng_live_stream->current_inactivity_ts); + const auto status = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream, message, + lttng_live_stream->current_inactivity_ts); + + if (status != LTTNG_LIVE_ITERATOR_STATUS_OK) { + return status; + } lttng_live_stream->last_inactivity_ts.value = lttng_live_stream->current_inactivity_ts; lttng_live_stream->last_inactivity_ts.is_set = true; -end: - return ret; + + return LTTNG_LIVE_ITERATOR_STATUS_OK; } -static int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter, - struct lttng_live_msg_iter *lttng_live_msg_iter, +static int live_get_msg_ts_ns(struct lttng_live_msg_iter *lttng_live_msg_iter, const bt_message *msg, int64_t last_msg_ts_ns, int64_t *ts_ns) { - const bt_clock_class *clock_class = NULL; const bt_clock_snapshot *clock_snapshot = NULL; int ret = 0; - bt_logging_level log_level = lttng_live_msg_iter->log_level; - bt_self_component *self_comp = lttng_live_msg_iter->self_comp; BT_ASSERT_DBG(msg); BT_ASSERT_DBG(ts_ns); - BT_COMP_LOGD("Getting message's timestamp: iter-data-addr=%p, msg-addr=%p, " - "last-msg-ts=%" PRId64, - lttng_live_msg_iter, msg, last_msg_ts_ns); + BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, + "Getting message's timestamp: iter-data-addr={}, msg-addr={}, " + "last-msg-ts={}", + fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns); switch (bt_message_get_type(msg)) { case BT_MESSAGE_TYPE_EVENT: - clock_class = bt_message_event_borrow_stream_class_default_clock_class_const(msg); - BT_ASSERT_DBG(clock_class); - clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(msg); break; case BT_MESSAGE_TYPE_PACKET_BEGINNING: - clock_class = - bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(msg); - BT_ASSERT(clock_class); - clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg); break; case BT_MESSAGE_TYPE_PACKET_END: - clock_class = bt_message_packet_end_borrow_stream_class_default_clock_class_const(msg); - BT_ASSERT(clock_class); - clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(msg); break; case BT_MESSAGE_TYPE_DISCARDED_EVENTS: - clock_class = - bt_message_discarded_events_borrow_stream_class_default_clock_class_const(msg); - BT_ASSERT(clock_class); - clock_snapshot = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg); break; case BT_MESSAGE_TYPE_DISCARDED_PACKETS: - clock_class = - bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(msg); - BT_ASSERT(clock_class); - clock_snapshot = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg); break; @@ -752,148 +552,119 @@ static int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter, break; default: /* All the other messages have a higher priority */ - BT_COMP_LOGD_STR("Message has no timestamp: using the last message timestamp."); + BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, + "Message has no timestamp, using the last message timestamp: " + "iter-data-addr={}, msg-addr={}, last-msg-ts={}, ts={}", + fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns, *ts_ns); *ts_ns = last_msg_ts_ns; - goto end; + return 0; } - clock_class = bt_clock_snapshot_borrow_clock_class_const(clock_snapshot); - BT_ASSERT_DBG(clock_class); - ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns); if (ret) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Cannot get nanoseconds from Epoch of clock snapshot: " - "clock-snapshot-addr=%p", - clock_snapshot); - goto error; + BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, + "Cannot get nanoseconds from Epoch of clock snapshot: " + "clock-snapshot-addr={}", + fmt::ptr(clock_snapshot)); + return -1; } - goto end; - -error: - ret = -1; - -end: - if (ret == 0) { - BT_COMP_LOGD("Found message's timestamp: " - "iter-data-addr=%p, msg-addr=%p, " - "last-msg-ts=%" PRId64 ", ts=%" PRId64, - lttng_live_msg_iter, msg, last_msg_ts_ns, *ts_ns); - } + BT_CPPLOGD_SPEC( + lttng_live_msg_iter->logger, + "Found message's timestamp: iter-data-addr={}, msg-addr={}, last-msg-ts={}, ts={}", + fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns, *ts_ns); - return ret; + return 0; } static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream( struct lttng_live_msg_iter *lttng_live_msg_iter, - struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message) + struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message) { - enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK; - bt_logging_level log_level = lttng_live_msg_iter->log_level; - bt_self_component *self_comp = lttng_live_msg_iter->self_comp; enum ctf_msg_iter_status status; - uint64_t session_idx, trace_idx; - - for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) { - struct lttng_live_session *session = - (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx); + for (const auto& session : lttng_live_msg_iter->sessions) { if (session->new_streams_needed) { - BT_COMP_LOGD("Need an update for streams: " - "session-id=%" PRIu64, - session->id); - ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; - goto end; + BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, + "Need an update for streams: session-id={}", session->id); + return LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; } - for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) { - struct lttng_live_trace *trace = - (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx); + for (lttng_live_trace::UP& trace : session->traces) { if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) { - BT_COMP_LOGD("Need an update for metadata stream: " - "session-id=%" PRIu64 ", trace-id=%" PRIu64, - session->id, trace->id); - ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; - goto end; + BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, + "Need an update for metadata stream: session-id={}, trace-id={}", + session->id, trace->id); + return LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; } } } if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) { - ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Invalid state of live stream iterator" - "stream-iter-status=%s", - lttng_live_stream_state_string(lttng_live_stream->state)); - goto end; + BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, + "Invalid state of live stream iterator: stream-iter-status={}", + lttng_live_stream->state); + return LTTNG_LIVE_ITERATOR_STATUS_ERROR; } - status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter, message); + const bt_message *msg; + status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), &msg); switch (status) { case CTF_MSG_ITER_STATUS_EOF: - ret = LTTNG_LIVE_ITERATOR_STATUS_END; - break; + return LTTNG_LIVE_ITERATOR_STATUS_END; case CTF_MSG_ITER_STATUS_OK: - ret = LTTNG_LIVE_ITERATOR_STATUS_OK; - break; + message = bt2::ConstMessage::Shared::createWithoutRef(msg); + return LTTNG_LIVE_ITERATOR_STATUS_OK; case CTF_MSG_ITER_STATUS_AGAIN: /* * Continue immediately (end of packet). The next * get_index may return AGAIN to delay the following * attempt. */ - ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; - break; + return LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; case CTF_MSG_ITER_STATUS_ERROR: default: - ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "CTF message iterator failed to get next message: " - "msg-iter=%p, msg-iter-status=%s", - lttng_live_stream->msg_iter, ctf_msg_iter_status_string(status)); - break; + BT_CPPLOGE_APPEND_CAUSE_SPEC( + lttng_live_msg_iter->logger, + "CTF message iterator failed to get next message: msg-iter={}, msg-iter-status={}", + fmt::ptr(lttng_live_stream->msg_iter), status); + return LTTNG_LIVE_ITERATOR_STATUS_ERROR; } - -end: - return ret; } static enum lttng_live_iterator_status lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_stream_iterator *stream_iter, - const bt_message **curr_msg) + bt2::ConstMessage::Shared& curr_msg) { - enum lttng_live_iterator_status live_status = LTTNG_LIVE_ITERATOR_STATUS_OK; - bt_logging_level log_level = lttng_live_msg_iter->log_level; - bt_self_component *self_comp = lttng_live_msg_iter->self_comp; - - BT_COMP_LOGD("Closing live stream iterator: stream-name=\"%s\", " - "viewer-stream-id=%" PRIu64, - stream_iter->name->str, stream_iter->viewer_stream_id); + BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, + "Closing live stream iterator: stream-name=\"{}\", " + "viewer-stream-id={}", + stream_iter->name, stream_iter->viewer_stream_id); /* * The viewer has hung up on us so we are closing the stream. The * `ctf_msg_iter` should simply realize that it needs to close the * stream properly by emitting the necessary stream end message. */ + const bt_message *msg; enum ctf_msg_iter_status status = - ctf_msg_iter_get_next_message(stream_iter->msg_iter, curr_msg); + ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), &msg); if (status == CTF_MSG_ITER_STATUS_ERROR) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Error getting the next message from CTF message iterator"); - live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - goto end; + BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, + "Error getting the next message from CTF message iterator"); + return LTTNG_LIVE_ITERATOR_STATUS_ERROR; } else if (status == CTF_MSG_ITER_STATUS_EOF) { - BT_COMP_LOGI("Reached the end of the live stream iterator."); - live_status = LTTNG_LIVE_ITERATOR_STATUS_END; - goto end; + BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger, + "Reached the end of the live stream iterator."); + return LTTNG_LIVE_ITERATOR_STATUS_END; } BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK); -end: - return live_status; + curr_msg = bt2::ConstMessage::Shared::createWithoutRef(msg); + + return LTTNG_LIVE_ITERATOR_STATUS_OK; } /* @@ -947,15 +718,14 @@ end: static enum lttng_live_iterator_status lttng_live_iterator_next_msg_on_stream(struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_stream_iterator *stream_iter, - const bt_message **curr_msg) + bt2::ConstMessage::Shared& curr_msg) { - bt_logging_level log_level = lttng_live_msg_iter->log_level; - bt_self_component *self_comp = lttng_live_msg_iter->self_comp; enum lttng_live_iterator_status live_status; - BT_COMP_LOGD("Advancing live stream iterator until next message if possible: " - "stream-name=\"%s\", viewer-stream-id=%" PRIu64, - stream_iter->name->str, stream_iter->viewer_stream_id); + BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, + "Advancing live stream iterator until next message if possible: " + "stream-name=\"{}\", viewer-stream-id={}", + stream_iter->name, stream_iter->viewer_stream_id); if (stream_iter->has_stream_hung_up) { /* @@ -997,46 +767,45 @@ retry: live_status = lttng_live_iterator_next_handle_one_quiescent_stream(lttng_live_msg_iter, stream_iter, curr_msg); if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { - BT_ASSERT(!*curr_msg); + BT_ASSERT(!curr_msg); goto end; } - if (*curr_msg) { + if (curr_msg) { goto end; } live_status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live_msg_iter, stream_iter, curr_msg); if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { - BT_ASSERT(!*curr_msg); + BT_ASSERT(!curr_msg); } end: if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) { - BT_COMP_LOGD("Ask the relay daemon for an updated view of the data and metadata streams"); + BT_CPPLOGD_SPEC( + lttng_live_msg_iter->logger, + "Ask the relay daemon for an updated view of the data and metadata streams"); goto retry; } - BT_COMP_LOGD("Returning from advancing live stream iterator: status=%s, " - "stream-name=\"%s\", viewer-stream-id=%" PRIu64, - lttng_live_iterator_status_string(live_status), stream_iter->name->str, - stream_iter->viewer_stream_id); + BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, + "Returning from advancing live stream iterator: status={}, " + "stream-name=\"{}\", viewer-stream-id={}", + live_status, stream_iter->name, stream_iter->viewer_stream_id); return live_status; } -static bool is_discarded_packet_or_event_message(const bt_message *msg) +static bool is_discarded_packet_or_event_message(const bt2::ConstMessage msg) { - const enum bt_message_type msg_type = bt_message_get_type(msg); - - return msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS || - msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS; + return msg.type() == bt2::MessageType::DiscardedEvents || + msg.type() == bt2::MessageType::DiscardedPackets; } static enum lttng_live_iterator_status adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream *stream, - const bt_message *msg_in, bt_message **msg_out, + const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out, uint64_t new_begin_ts) { - enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK; enum bt_property_availability availability; const bt_clock_snapshot *clock_snapshot; uint64_t end_ts; @@ -1048,24 +817,23 @@ adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream availability = bt_message_discarded_packets_get_count(msg_in, &count); BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE); - *msg_out = bt_message_discarded_packets_create_with_default_clock_snapshots( + const auto msg = bt_message_discarded_packets_create_with_default_clock_snapshots( iter, stream, new_begin_ts, end_ts); - if (!*msg_out) { - status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM; - goto end; + + if (!msg) { + return LTTNG_LIVE_ITERATOR_STATUS_NOMEM; } - bt_message_discarded_packets_set_count(*msg_out, count); -end: - return status; + bt_message_discarded_packets_set_count(msg, count); + msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg); + return LTTNG_LIVE_ITERATOR_STATUS_OK; } static enum lttng_live_iterator_status adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream *stream, - const bt_message *msg_in, bt_message **msg_out, + const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out, uint64_t new_begin_ts) { - enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK; enum bt_property_availability availability; const bt_clock_snapshot *clock_snapshot; uint64_t end_ts; @@ -1077,39 +845,36 @@ adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream availability = bt_message_discarded_events_get_count(msg_in, &count); BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE); - *msg_out = bt_message_discarded_events_create_with_default_clock_snapshots( + const auto msg = bt_message_discarded_events_create_with_default_clock_snapshots( iter, stream, new_begin_ts, end_ts); - if (!*msg_out) { - status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM; - goto end; + + if (!msg) { + return LTTNG_LIVE_ITERATOR_STATUS_NOMEM; } - bt_message_discarded_events_set_count(*msg_out, count); -end: - return status; + bt_message_discarded_events_set_count(msg, count); + msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg); + return LTTNG_LIVE_ITERATOR_STATUS_OK; } static enum lttng_live_iterator_status handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns, - const bt_message *late_msg) + const bt2::ConstMessage& late_msg) { - bt_self_component *self_comp = lttng_live_msg_iter->self_comp; - bt_logging_level log_level = lttng_live_msg_iter->log_level; const bt_clock_class *clock_class; const bt_stream_class *stream_class; enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status; int64_t last_inactivity_ts_ns; - enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK; enum lttng_live_iterator_status adjust_status; - bt_message *adjusted_message; + bt2::ConstMessage::Shared adjusted_message; /* * The timestamp of the current message is before the last message sent * by this component. We CANNOT send it as is. * * The only expected scenario in which that could happen is the - * following, everything else is a bug in this component, relay deamon, + * following, everything else is a bug in this component, relay daemon, * or CTF parser. * * Expected scenario: The CTF message iterator emitted discarded @@ -1127,92 +892,88 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter, * In short, the only scenario in which it's okay and fixable to * received a late message is when: * 1. the late message is a discarded packets or discarded events - * message, + * message, * 2. this stream produced an inactivity message downstream, and * 3. the timestamp of the late message is within the inactivity - * timespan we sent downstream through the inactivity message. + * timespan we sent downstream through the inactivity message. */ - BT_COMP_LOGD("Handling late message on live stream iterator: " - "stream-name=\"%s\", viewer-stream-id=%" PRIu64, - stream_iter->name->str, stream_iter->viewer_stream_id); + BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, + "Handling late message on live stream iterator: " + "stream-name=\"{}\", viewer-stream-id={}", + stream_iter->name, stream_iter->viewer_stream_id); if (!stream_iter->last_inactivity_ts.is_set) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Invalid live stream state: " - "have a late message when no inactivity message " - "was ever sent for that stream."); - stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - goto end; + BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, + "Invalid live stream state: " + "have a late message when no inactivity message " + "was ever sent for that stream."); + return LTTNG_LIVE_ITERATOR_STATUS_ERROR; } if (!is_discarded_packet_or_event_message(late_msg)) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Invalid live stream state: " - "have a late message that is not a packet discarded or " - "event discarded message: late-msg-type=%s", - bt_common_message_type_string(bt_message_get_type(late_msg))); - stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - goto end; + BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, + "Invalid live stream state: " + "have a late message that is not a packet discarded or " + "event discarded message: late-msg-type={}", + late_msg.type()); + return LTTNG_LIVE_ITERATOR_STATUS_ERROR; } - stream_class = bt_stream_borrow_class_const(stream_iter->stream); + stream_class = bt_stream_borrow_class_const(stream_iter->stream->libObjPtr()); clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class); ts_ns_status = bt_clock_class_cycles_to_ns_from_origin( clock_class, stream_iter->last_inactivity_ts.value, &last_inactivity_ts_ns); if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error converting last " - "inactivity message timestamp to nanoseconds"); - stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - goto end; + BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, + "Error converting last " + "inactivity message timestamp to nanoseconds"); + return LTTNG_LIVE_ITERATOR_STATUS_ERROR; } if (last_inactivity_ts_ns <= late_msg_ts_ns) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Invalid live stream state: " - "have a late message that is none included in a stream " - "inactivity timespan: last-inactivity-ts-ns=%" PRIu64 - "late-msg-ts-ns=%" PRIu64, - last_inactivity_ts_ns, late_msg_ts_ns); - stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - goto end; + BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, + "Invalid live stream state: " + "have a late message that is none included in a stream " + "inactivity timespan: last-inactivity-ts-ns={}, " + "late-msg-ts-ns={}", + last_inactivity_ts_ns, late_msg_ts_ns); + return LTTNG_LIVE_ITERATOR_STATUS_ERROR; } /* * We now know that it's okay for this message to be late, we can now * adjust its timestamp to ensure monotonicity. */ - BT_COMP_LOGD("Adjusting the timestamp of late message: late-msg-type=%s, " - "msg-new-ts-ns=%" PRIu64, - bt_common_message_type_string(bt_message_get_type(late_msg)), - stream_iter->last_inactivity_ts.value); - switch (bt_message_get_type(late_msg)) { - case BT_MESSAGE_TYPE_DISCARDED_EVENTS: + BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, + "Adjusting the timestamp of late message: late-msg-type={}, " + "msg-new-ts-ns={}", + late_msg.type(), stream_iter->last_inactivity_ts.value); + switch (late_msg.type()) { + case bt2::MessageType::DiscardedEvents: adjust_status = adjust_discarded_events_message( - lttng_live_msg_iter->self_msg_iter, stream_iter->stream, late_msg, &adjusted_message, - stream_iter->last_inactivity_ts.value); + lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(), + late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value); break; - case BT_MESSAGE_TYPE_DISCARDED_PACKETS: + case bt2::MessageType::DiscardedPackets: adjust_status = adjust_discarded_packets_message( - lttng_live_msg_iter->self_msg_iter, stream_iter->stream, late_msg, &adjusted_message, - stream_iter->last_inactivity_ts.value); + lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(), + late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value); break; default: bt_common_abort(); } if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { - stream_iter_status = adjust_status; - goto end; + return adjust_status; } BT_ASSERT_DBG(adjusted_message); stream_iter->current_msg = adjusted_message; stream_iter->current_msg_ts_ns = last_inactivity_ts_ns; - bt_message_put_ref(late_msg); -end: - return stream_iter_status; + return LTTNG_LIVE_ITERATOR_STATUS_OK; } static enum lttng_live_iterator_status @@ -1221,19 +982,15 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_stream_iterator **youngest_trace_stream_iter) { struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL; - bt_logging_level log_level = lttng_live_msg_iter->log_level; - bt_self_component *self_comp = lttng_live_msg_iter->self_comp; - enum lttng_live_iterator_status stream_iter_status; - ; int64_t youngest_candidate_msg_ts = INT64_MAX; uint64_t stream_iter_idx; BT_ASSERT_DBG(live_trace); - BT_ASSERT_DBG(live_trace->stream_iterators); - BT_COMP_LOGD("Finding the next stream iterator for trace: " - "trace-id=%" PRIu64, - live_trace->id); + BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, + "Finding the next stream iterator for trace: " + "trace-id={}", + live_trace->id); /* * Update the current message of every stream iterators of this trace. * The current msg of every stream must have a timestamp equal or @@ -1241,22 +998,21 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter, * ensure monotonicity. */ stream_iter_idx = 0; - while (stream_iter_idx < live_trace->stream_iterators->len) { + while (stream_iter_idx < live_trace->stream_iterators.size()) { bool stream_iter_is_ended = false; - struct lttng_live_stream_iterator *stream_iter = - (lttng_live_stream_iterator *) g_ptr_array_index(live_trace->stream_iterators, - stream_iter_idx); + lttng_live_stream_iterator *stream_iter = + live_trace->stream_iterators[stream_iter_idx].get(); /* * If there is no current message for this stream, go fetch * one. */ while (!stream_iter->current_msg) { - const bt_message *msg = NULL; + bt2::ConstMessage::Shared msg; int64_t curr_msg_ts_ns = INT64_MAX; - stream_iter_status = - lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, &msg); + const auto stream_iter_status = + lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, msg); if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) { stream_iter_is_ended = true; @@ -1264,21 +1020,21 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter, } if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { - goto end; + return stream_iter_status; } BT_ASSERT_DBG(msg); - BT_COMP_LOGD("Live stream iterator returned message: msg-type=%s, " - "stream-name=\"%s\", viewer-stream-id=%" PRIu64, - bt_common_message_type_string(bt_message_get_type(msg)), - stream_iter->name->str, stream_iter->viewer_stream_id); + BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, + "Live stream iterator returned message: msg-type={}, " + "stream-name=\"{}\", viewer-stream-id={}", + msg->type(), stream_iter->name, stream_iter->viewer_stream_id); /* * Get the timestamp in nanoseconds from origin of this - * messsage. + * message. */ - live_get_msg_ts_ns(stream_iter, lttng_live_msg_iter, msg, + live_get_msg_ts_ns(lttng_live_msg_iter, msg->libObjPtr(), lttng_live_msg_iter->last_msg_ts_ns, &curr_msg_ts_ns); /* @@ -1288,25 +1044,24 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter, * iterator. If not, we need to handle it with care. */ if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) { - stream_iter->current_msg = msg; + stream_iter->current_msg = std::move(msg); stream_iter->current_msg_ts_ns = curr_msg_ts_ns; } else { /* * We received a message from the past. This * may be fixable but it can also be an error. */ - stream_iter_status = - handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, msg); - if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Late message could not be handled correctly: " - "lttng-live-msg-iter-addr=%p, " - "stream-name=\"%s\", " - "curr-msg-ts=%" PRId64 ", last-msg-ts=%" PRId64, - lttng_live_msg_iter, stream_iter->name->str, - curr_msg_ts_ns, lttng_live_msg_iter->last_msg_ts_ns); - stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - goto end; + if (handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, *msg) != + LTTNG_LIVE_ITERATOR_STATUS_OK) { + BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, + "Late message could not be handled correctly: " + "lttng-live-msg-iter-addr={}, " + "stream-name=\"{}\", " + "curr-msg-ts={}, last-msg-ts={}", + fmt::ptr(lttng_live_msg_iter), stream_iter->name, + curr_msg_ts_ns, + lttng_live_msg_iter->last_msg_ts_ns); + return LTTNG_LIVE_ITERATOR_STATUS_ERROR; } } } @@ -1330,7 +1085,8 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter, */ BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter); int ret = common_muxing_compare_messages( - stream_iter->current_msg, youngest_candidate_stream_iter->current_msg); + stream_iter->current_msg->libObjPtr(), + youngest_candidate_stream_iter->current_msg->libObjPtr()); if (ret < 0) { /* * The `youngest_candidate_stream_iter->current_msg` @@ -1344,11 +1100,12 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter, * Unable to pick which one should go * first. */ - BT_COMP_LOGW( + BT_CPPLOGW_SPEC( + lttng_live_msg_iter->logger, "Cannot deterministically pick next live stream message iterator because they have identical next messages: " - "stream-iter-addr=%p" - "stream-iter-addr=%p", - stream_iter, youngest_candidate_stream_iter); + "stream-iter-addr={}" + "stream-iter-addr={}", + fmt::ptr(stream_iter), fmt::ptr(youngest_candidate_stream_iter)); } } @@ -1363,24 +1120,21 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter, * removed element with the array's last * element. */ - g_ptr_array_remove_index_fast(live_trace->stream_iterators, stream_iter_idx); + bt2c::vectorFastRemove(live_trace->stream_iterators, stream_iter_idx); } } if (youngest_candidate_stream_iter) { *youngest_trace_stream_iter = youngest_candidate_stream_iter; - stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK; + return LTTNG_LIVE_ITERATOR_STATUS_OK; } else { /* * The only case where we don't have a candidate for this trace * is if we reached the end of all the iterators. */ - BT_ASSERT(live_trace->stream_iterators->len == 0); - stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END; + BT_ASSERT(live_trace->stream_iterators.empty()); + return LTTNG_LIVE_ITERATOR_STATUS_END; } - -end: - return stream_iter_status; } static enum lttng_live_iterator_status @@ -1388,16 +1142,15 @@ next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter struct lttng_live_session *session, struct lttng_live_stream_iterator **youngest_session_stream_iter) { - bt_self_component *self_comp = lttng_live_msg_iter->self_comp; - bt_logging_level log_level = lttng_live_msg_iter->log_level; enum lttng_live_iterator_status stream_iter_status; uint64_t trace_idx = 0; int64_t youngest_candidate_msg_ts = INT64_MAX; struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL; - BT_COMP_LOGD("Finding the next stream iterator for session: " - "session-id=%" PRIu64, - session->id); + BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, + "Finding the next stream iterator for session: " + "session-id={}", + session->id); /* * Make sure we are attached to the session and look for new streams * and metadata. @@ -1406,16 +1159,13 @@ next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK && stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE && stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) { - goto end; + return stream_iter_status; } - BT_ASSERT_DBG(session->traces); - - while (trace_idx < session->traces->len) { + while (trace_idx < session->traces.size()) { bool trace_is_ended = false; struct lttng_live_stream_iterator *stream_iter; - struct lttng_live_trace *trace = - (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx); + lttng_live_trace *trace = session->traces[trace_idx].get(); stream_iter_status = next_stream_iterator_for_trace(lttng_live_msg_iter, trace, &stream_iter); @@ -1426,7 +1176,7 @@ next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter */ trace_is_ended = true; } else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { - goto end; + return stream_iter_status; } if (!trace_is_ended) { @@ -1442,7 +1192,8 @@ next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter * deterministic way. */ int ret = common_muxing_compare_messages( - stream_iter->current_msg, youngest_candidate_stream_iter->current_msg); + stream_iter->current_msg->libObjPtr(), + youngest_candidate_stream_iter->current_msg->libObjPtr()); if (ret < 0) { /* * The `youngest_candidate_stream_iter->current_msg` @@ -1453,26 +1204,27 @@ next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter youngest_candidate_stream_iter = stream_iter; } else if (ret == 0) { /* Unable to pick which one should go first. */ - BT_COMP_LOGW( + BT_CPPLOGW_SPEC( + lttng_live_msg_iter->logger, "Cannot deterministically pick next live stream message iterator because they have identical next messages: " - "stream-iter-addr=%p" - "stream-iter-addr=%p", - stream_iter, youngest_candidate_stream_iter); + "stream-iter-addr={}" + "youngest-candidate-stream-iter-addr={}", + fmt::ptr(stream_iter), fmt::ptr(youngest_candidate_stream_iter)); } } trace_idx++; } else { /* * trace_idx is not incremented since - * g_ptr_array_remove_index_fast replaces the + * vectorFastRemove replaces the * element at trace_idx with the array's last element. */ - g_ptr_array_remove_index_fast(session->traces, trace_idx); + bt2c::vectorFastRemove(session->traces, trace_idx); } } if (youngest_candidate_stream_iter) { *youngest_session_stream_iter = youngest_candidate_stream_iter; - stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK; + return LTTNG_LIVE_ITERATOR_STATUS_OK; } else { /* * The only cases where we don't have a candidate for this @@ -1483,11 +1235,9 @@ next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter * * In either cases, we return END. */ - BT_ASSERT(session->traces->len == 0); - stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END; + BT_ASSERT(session->traces.empty()); + return LTTNG_LIVE_ITERATOR_STATUS_END; } -end: - return stream_iter_status; } static inline void put_messages(bt_message_array_const msgs, uint64_t count) @@ -1499,380 +1249,366 @@ static inline void put_messages(bt_message_array_const msgs, uint64_t count) } } -BT_HIDDEN bt_message_iterator_class_next_method_status lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array_const msgs, uint64_t capacity, uint64_t *count) { - bt_message_iterator_class_next_method_status status; - enum lttng_live_viewer_status viewer_status; - struct lttng_live_msg_iter *lttng_live_msg_iter = - (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_it); - struct lttng_live_component *lttng_live = lttng_live_msg_iter->lttng_live_comp; - bt_self_component *self_comp = lttng_live_msg_iter->self_comp; - bt_logging_level log_level = lttng_live_msg_iter->log_level; - enum lttng_live_iterator_status stream_iter_status; - uint64_t session_idx; + try { + bt_message_iterator_class_next_method_status status; + enum lttng_live_viewer_status viewer_status; + struct lttng_live_msg_iter *lttng_live_msg_iter = + (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_it); + struct lttng_live_component *lttng_live = lttng_live_msg_iter->lttng_live_comp; + enum lttng_live_iterator_status stream_iter_status; + + *count = 0; - *count = 0; + BT_ASSERT_DBG(lttng_live_msg_iter); - BT_ASSERT_DBG(lttng_live_msg_iter); + if (G_UNLIKELY(lttng_live_msg_iter->was_interrupted)) { + /* + * The iterator was interrupted in a previous call to the + * `_next()` method. We currently do not support generating + * messages after such event. The babeltrace2 CLI should never + * be running the graph after being interrupted. So this check + * is to prevent other graph users from using this live + * iterator in an messed up internal state. + */ + BT_CPPLOGE_APPEND_CAUSE_SPEC( + lttng_live_msg_iter->logger, + "Message iterator was interrupted during a previous call to the `next()` and currently does not support continuing after such event."); + return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; + } - if (G_UNLIKELY(lttng_live_msg_iter->was_interrupted)) { /* - * The iterator was interrupted in a previous call to the - * `_next()` method. We currently do not support generating - * messages after such event. The babeltrace2 CLI should never - * be running the graph after being interrupted. So this check - * is to prevent other graph users from using this live - * iterator in an messed up internal state. + * Clear all the invalid message reference that might be left over in + * the output array. */ - status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; - BT_COMP_LOGE_APPEND_CAUSE( - self_comp, - "Message iterator was interrupted during a previous call to the `next()` and currently does not support continuing after such event."); - goto end; - } - - /* - * Clear all the invalid message reference that might be left over in - * the output array. - */ - memset(msgs, 0, capacity * sizeof(*msgs)); + memset(msgs, 0, capacity * sizeof(*msgs)); - /* - * If no session are exposed on the relay found at the url provided by - * the user, session count will be 0. In this case, we return status - * end to return gracefully. - */ - if (lttng_live_msg_iter->sessions->len == 0) { - if (lttng_live->params.sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) { - status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END; - goto end; - } else { - /* - * The are no more active session for this session - * name. Retry to create a viewer session for the - * requested session name. - */ - viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter); - if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { - if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { - status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Error creating LTTng live viewer session"); - } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { - status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN; - } else { - bt_common_abort(); + /* + * If no session are exposed on the relay found at the url provided by + * the user, session count will be 0. In this case, we return status + * end to return gracefully. + */ + if (lttng_live_msg_iter->sessions.empty()) { + if (lttng_live->params.sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) { + return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END; + } else { + /* + * The are no more active session for this session + * name. Retry to create a viewer session for the + * requested session name. + */ + viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { + BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, + "Error creating LTTng live viewer session"); + return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; + } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { + return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN; + } else { + bt_common_abort(); + } } - goto end; } } - } - if (lttng_live_msg_iter->active_stream_iter == 0) { - lttng_live_force_new_streams_and_metadata(lttng_live_msg_iter); - } + if (lttng_live_msg_iter->active_stream_iter == 0) { + lttng_live_force_new_streams_and_metadata(lttng_live_msg_iter); + } - /* - * Here the muxing of message is done. - * - * We need to iterate over all the streams of all the traces of all the - * viewer sessions in order to get the message with the smallest - * timestamp. In this case, a session is a viewer session and there is - * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or - * kernel). Each viewer session can have multiple traces, for example, - * 64bit UST viewer sessions could have multiple per-pid traces. - * - * We iterate over the streams of each traces to update and see what is - * their next message's timestamp. From those timestamps, we select the - * message with the smallest timestamp as the best candidate message - * for that trace and do the same thing across all the sessions. - * - * We then compare the timestamp of best candidate message of all the - * sessions to pick the message with the smallest timestamp and we - * return it. - */ - while (*count < capacity) { - struct lttng_live_stream_iterator *youngest_stream_iter = NULL, - *candidate_stream_iter = NULL; - int64_t youngest_msg_ts_ns = INT64_MAX; - - BT_ASSERT_DBG(lttng_live_msg_iter->sessions); - session_idx = 0; - while (session_idx < lttng_live_msg_iter->sessions->len) { - struct lttng_live_session *session = (lttng_live_session *) g_ptr_array_index( - lttng_live_msg_iter->sessions, session_idx); - - /* Find the best candidate message to send downstream. */ - stream_iter_status = next_stream_iterator_for_session(lttng_live_msg_iter, session, - &candidate_stream_iter); - - /* If we receive an END status, it means that either: - * - Those traces never had active streams (UST with no - * data produced yet), - * - All live stream iterators have ENDed.*/ - if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) { - if (session->closed && session->traces->len == 0) { - /* - * Remove the session from the list. - * session_idx is not modified since - * g_ptr_array_remove_index_fast - * replaces the the removed element with - * the array's last element. - */ - g_ptr_array_remove_index_fast(lttng_live_msg_iter->sessions, session_idx); - } else { - session_idx++; + /* + * Here the muxing of message is done. + * + * We need to iterate over all the streams of all the traces of all the + * viewer sessions in order to get the message with the smallest + * timestamp. In this case, a session is a viewer session and there is + * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or + * kernel). Each viewer session can have multiple traces, for example, + * 64bit UST viewer sessions could have multiple per-pid traces. + * + * We iterate over the streams of each traces to update and see what is + * their next message's timestamp. From those timestamps, we select the + * message with the smallest timestamp as the best candidate message + * for that trace and do the same thing across all the sessions. + * + * We then compare the timestamp of best candidate message of all the + * sessions to pick the message with the smallest timestamp and we + * return it. + */ + while (*count < capacity) { + struct lttng_live_stream_iterator *youngest_stream_iter = NULL, + *candidate_stream_iter = NULL; + int64_t youngest_msg_ts_ns = INT64_MAX; + + uint64_t session_idx = 0; + while (session_idx < lttng_live_msg_iter->sessions.size()) { + lttng_live_session *session = lttng_live_msg_iter->sessions[session_idx].get(); + + /* Find the best candidate message to send downstream. */ + stream_iter_status = next_stream_iterator_for_session(lttng_live_msg_iter, session, + &candidate_stream_iter); + + /* If we receive an END status, it means that either: + * - Those traces never had active streams (UST with no + * data produced yet), + * - All live stream iterators have ENDed. + */ + if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) { + if (session->closed && session->traces.empty()) { + /* + * Remove the session from the list. + * session_idx is not modified since + * g_ptr_array_remove_index_fast + * replaces the the removed element with + * the array's last element. + */ + bt2c::vectorFastRemove(lttng_live_msg_iter->sessions, session_idx); + } else { + session_idx++; + } + continue; } - continue; - } - if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { - goto return_status; - } + if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { + goto return_status; + } - if (G_UNLIKELY(youngest_stream_iter == NULL) || - candidate_stream_iter->current_msg_ts_ns < youngest_msg_ts_ns) { - youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns; - youngest_stream_iter = candidate_stream_iter; - } else if (candidate_stream_iter->current_msg_ts_ns == youngest_msg_ts_ns) { - /* - * The currently selected message to be sent - * downstream next has the exact same timestamp - * that of the current candidate message. We - * must break the tie in a predictable manner. - */ - BT_COMP_LOGD_STR( - "Two of the next message candidates have the same timestamps, pick one deterministically."); - /* - * Order the messages in an arbitrary but - * deterministic way. - */ - int ret = common_muxing_compare_messages(candidate_stream_iter->current_msg, - youngest_stream_iter->current_msg); - if (ret < 0) { - /* - * The `candidate_stream_iter->current_msg` - * should go first. Update the next - * iterator and the current timestamp. - */ + if (G_UNLIKELY(youngest_stream_iter == NULL) || + candidate_stream_iter->current_msg_ts_ns < youngest_msg_ts_ns) { youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns; youngest_stream_iter = candidate_stream_iter; - } else if (ret == 0) { - /* Unable to pick which one should go first. */ - BT_COMP_LOGW( - "Cannot deterministically pick next live stream message iterator because they have identical next messages: " - "next-stream-iter-addr=%p" - "candidate-stream-iter-addr=%p", - youngest_stream_iter, candidate_stream_iter); + } else if (candidate_stream_iter->current_msg_ts_ns == youngest_msg_ts_ns) { + /* + * The currently selected message to be sent + * downstream next has the exact same timestamp + * that of the current candidate message. We + * must break the tie in a predictable manner. + */ + BT_CPPLOGD_SPEC( + lttng_live_msg_iter->logger, + "Two of the next message candidates have the same timestamps, pick one deterministically."); + /* + * Order the messages in an arbitrary but + * deterministic way. + */ + int ret = common_muxing_compare_messages( + candidate_stream_iter->current_msg->libObjPtr(), + youngest_stream_iter->current_msg->libObjPtr()); + if (ret < 0) { + /* + * The `candidate_stream_iter->current_msg` + * should go first. Update the next + * iterator and the current timestamp. + */ + youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns; + youngest_stream_iter = candidate_stream_iter; + } else if (ret == 0) { + /* Unable to pick which one should go first. */ + BT_CPPLOGW_SPEC( + lttng_live_msg_iter->logger, + "Cannot deterministically pick next live stream message iterator because they have identical next messages: " + "next-stream-iter-addr={}" + "candidate-stream-iter-addr={}", + fmt::ptr(youngest_stream_iter), fmt::ptr(candidate_stream_iter)); + } } - } - session_idx++; - } + session_idx++; + } - if (!youngest_stream_iter) { - stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; - goto return_status; - } + if (!youngest_stream_iter) { + stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + goto return_status; + } - BT_ASSERT_DBG(youngest_stream_iter->current_msg); - /* Ensure monotonicity. */ - BT_ASSERT_DBG(lttng_live_msg_iter->last_msg_ts_ns <= - youngest_stream_iter->current_msg_ts_ns); + BT_ASSERT_DBG(youngest_stream_iter->current_msg); + /* Ensure monotonicity. */ + BT_ASSERT_DBG(lttng_live_msg_iter->last_msg_ts_ns <= + youngest_stream_iter->current_msg_ts_ns); - /* - * Insert the next message to the message batch. This will set - * stream iterator current messsage to NULL so that next time - * we fetch the next message of that stream iterator - */ - BT_MESSAGE_MOVE_REF(msgs[*count], youngest_stream_iter->current_msg); - (*count)++; + /* + * Insert the next message to the message batch. This will set + * stream iterator current message to NULL so that next time + * we fetch the next message of that stream iterator + */ + msgs[*count] = youngest_stream_iter->current_msg.release().libObjPtr(); + (*count)++; - /* Update the last timestamp in nanoseconds sent downstream. */ - lttng_live_msg_iter->last_msg_ts_ns = youngest_msg_ts_ns; - youngest_stream_iter->current_msg_ts_ns = INT64_MAX; + /* Update the last timestamp in nanoseconds sent downstream. */ + lttng_live_msg_iter->last_msg_ts_ns = youngest_msg_ts_ns; + youngest_stream_iter->current_msg_ts_ns = INT64_MAX; - stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK; - } + stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK; + } return_status: - switch (stream_iter_status) { - case LTTNG_LIVE_ITERATOR_STATUS_OK: - case LTTNG_LIVE_ITERATOR_STATUS_AGAIN: - /* - * If we gathered messages, return _OK even if the graph was - * interrupted. This allows for the components downstream to at - * least get the thoses messages. If the graph was indeed - * interrupted there should not be another _next() call as the - * application will tear down the graph. This component class - * doesn't support restarting after an interruption. - */ - if (*count > 0) { - status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK; - } else { - status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN; + switch (stream_iter_status) { + case LTTNG_LIVE_ITERATOR_STATUS_OK: + case LTTNG_LIVE_ITERATOR_STATUS_AGAIN: + /* + * If we gathered messages, return _OK even if the graph was + * interrupted. This allows for the components downstream to at + * least get the those messages. If the graph was indeed + * interrupted there should not be another _next() call as the + * application will tear down the graph. This component class + * doesn't support restarting after an interruption. + */ + if (*count > 0) { + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK; + } else { + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN; + } + break; + case LTTNG_LIVE_ITERATOR_STATUS_END: + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END; + break; + case LTTNG_LIVE_ITERATOR_STATUS_NOMEM: + BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, + "Memory error preparing the next batch of messages: " + "live-iter-status={}", + stream_iter_status); + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR; + break; + case LTTNG_LIVE_ITERATOR_STATUS_ERROR: + case LTTNG_LIVE_ITERATOR_STATUS_INVAL: + case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED: + BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, + "Error preparing the next batch of messages: " + "live-iter-status={}", + stream_iter_status); + + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; + /* Put all existing messages on error. */ + put_messages(msgs, *count); + break; + default: + bt_common_abort(); } - break; - case LTTNG_LIVE_ITERATOR_STATUS_END: - status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END; - break; - case LTTNG_LIVE_ITERATOR_STATUS_NOMEM: - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Memory error preparing the next batch of messages: " - "live-iter-status=%s", - lttng_live_iterator_status_string(stream_iter_status)); - status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR; - break; - case LTTNG_LIVE_ITERATOR_STATUS_ERROR: - case LTTNG_LIVE_ITERATOR_STATUS_INVAL: - case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED: - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Error preparing the next batch of messages: " - "live-iter-status=%s", - lttng_live_iterator_status_string(stream_iter_status)); - - status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; - /* Put all existing messages on error. */ - put_messages(msgs, *count); - break; - default: - bt_common_abort(); - } -end: - return status; + return status; + } catch (const std::bad_alloc&) { + return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR; + } catch (const bt2::Error&) { + return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; + } } -static struct lttng_live_msg_iter * +static lttng_live_msg_iter::UP lttng_live_msg_iter_create(struct lttng_live_component *lttng_live_comp, bt_self_message_iterator *self_msg_it) { - bt_self_component *self_comp = lttng_live_comp->self_comp; - bt_logging_level log_level = lttng_live_comp->log_level; - - struct lttng_live_msg_iter *lttng_live_msg_iter = g_new0(struct lttng_live_msg_iter, 1); - if (!lttng_live_msg_iter) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate lttng_live_msg_iter"); - goto end; - } - - lttng_live_msg_iter->log_level = lttng_live_comp->log_level; - lttng_live_msg_iter->self_comp = lttng_live_comp->self_comp; - lttng_live_msg_iter->lttng_live_comp = lttng_live_comp; - lttng_live_msg_iter->self_msg_iter = self_msg_it; - - lttng_live_msg_iter->active_stream_iter = 0; - lttng_live_msg_iter->last_msg_ts_ns = INT64_MIN; - lttng_live_msg_iter->was_interrupted = false; + auto msg_iter = bt2s::make_unique(lttng_live_comp->logger); - lttng_live_msg_iter->sessions = - g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_session); - BT_ASSERT(lttng_live_msg_iter->sessions); + msg_iter->self_comp = lttng_live_comp->self_comp; + msg_iter->lttng_live_comp = lttng_live_comp; + msg_iter->self_msg_iter = self_msg_it; + msg_iter->active_stream_iter = 0; + msg_iter->last_msg_ts_ns = INT64_MIN; + msg_iter->was_interrupted = false; -end: - return lttng_live_msg_iter; + return msg_iter; } -BT_HIDDEN bt_message_iterator_class_initialize_method_status lttng_live_msg_iter_init(bt_self_message_iterator *self_msg_it, - bt_self_message_iterator_configuration *config, - bt_self_component_port_output *self_port) + bt_self_message_iterator_configuration *, bt_self_component_port_output *) { - bt_message_iterator_class_initialize_method_status status; - struct lttng_live_component *lttng_live; - struct lttng_live_msg_iter *lttng_live_msg_iter; - enum lttng_live_viewer_status viewer_status; - bt_logging_level log_level; - bt_self_component *self_comp = bt_self_message_iterator_borrow_component(self_msg_it); + try { + struct lttng_live_component *lttng_live; + enum lttng_live_viewer_status viewer_status; + bt_self_component *self_comp = bt_self_message_iterator_borrow_component(self_msg_it); + + lttng_live = (lttng_live_component *) bt_self_component_get_data(self_comp); + + /* There can be only one downstream iterator at the same time. */ + BT_ASSERT(!lttng_live->has_msg_iter); + lttng_live->has_msg_iter = true; + + auto lttng_live_msg_iter = lttng_live_msg_iter_create(lttng_live, self_msg_it); + if (!lttng_live_msg_iter) { + BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live->logger, + "Failed to create lttng_live_msg_iter"); + return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; + } - lttng_live = (lttng_live_component *) bt_self_component_get_data(self_comp); - log_level = lttng_live->log_level; - self_comp = lttng_live->self_comp; + viewer_status = live_viewer_connection_create( + lttng_live->params.url.c_str(), false, lttng_live_msg_iter.get(), + lttng_live_msg_iter->logger, lttng_live_msg_iter->viewer_connection); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { + BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, + "Failed to create viewer connection"); + } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { + /* + * Interruption in the _iter_init() method is not + * supported. Return an error. + */ + BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, + "Interrupted while creating viewer connection"); + } - /* There can be only one downstream iterator at the same time. */ - BT_ASSERT(!lttng_live->has_msg_iter); - lttng_live->has_msg_iter = true; + return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR; + } - lttng_live_msg_iter = lttng_live_msg_iter_create(lttng_live, self_msg_it); - if (!lttng_live_msg_iter) { - status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; - BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to create lttng_live_msg_iter"); - goto error; - } + viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter.get()); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { + BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, + "Failed to create viewer session"); + } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { + /* + * Interruption in the _iter_init() method is not + * supported. Return an error. + */ + BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, + "Interrupted when creating viewer session"); + } - viewer_status = live_viewer_connection_create( - self_comp, NULL, log_level, lttng_live->params.url->str, false, lttng_live_msg_iter, - <tng_live_msg_iter->viewer_connection); - if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { - if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to create viewer connection"); - } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { - /* - * Interruption in the _iter_init() method is not - * supported. Return an error. - */ - BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Interrupted while creating viewer connection"); + return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR; } - goto error; - } - viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter); - if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { - if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to create viewer session"); - } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { - /* - * Interruption in the _iter_init() method is not - * supported. Return an error. - */ - BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Interrupted when creating viewer session"); + if (lttng_live_msg_iter->sessions.empty()) { + switch (lttng_live->params.sess_not_found_act) { + case SESSION_NOT_FOUND_ACTION_CONTINUE: + BT_CPPLOGI_SPEC( + lttng_live_msg_iter->logger, + "Unable to connect to the requested live viewer session. " + "Keep trying to connect because of {}=\"{}\" component parameter: url=\"{}\"", + SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR, + lttng_live->params.url); + break; + case SESSION_NOT_FOUND_ACTION_FAIL: + BT_CPPLOGE_APPEND_CAUSE_SPEC( + lttng_live_msg_iter->logger, + "Unable to connect to the requested live viewer session. " + "Fail the message iterator initialization because of {}=\"{}\" " + "component parameter: url =\"{}\"", + SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_FAIL_STR, + lttng_live->params.url); + return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR; + case SESSION_NOT_FOUND_ACTION_END: + BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger, + "Unable to connect to the requested live viewer session. " + "End gracefully at the first _next() call because of {}=\"{}\"" + " component parameter: url=\"{}\"", + SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_END_STR, + lttng_live->params.url); + break; + default: + bt_common_abort(); + } } - goto error; - } - if (lttng_live_msg_iter->sessions->len == 0) { - switch (lttng_live->params.sess_not_found_act) { - case SESSION_NOT_FOUND_ACTION_CONTINUE: - BT_COMP_LOGI( - "Unable to connect to the requested live viewer session. Keep trying to connect because of " - "%s=\"%s\" component parameter: url=\"%s\"", - SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR, - lttng_live->params.url->str); - break; - case SESSION_NOT_FOUND_ACTION_FAIL: - BT_COMP_LOGE_APPEND_CAUSE( - self_comp, - "Unable to connect to the requested live viewer session. Fail the message iterator initialization because of %s=\"%s\" " - "component parameter: url =\"%s\"", - SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_FAIL_STR, - lttng_live->params.url->str); - goto error; - case SESSION_NOT_FOUND_ACTION_END: - BT_COMP_LOGI( - "Unable to connect to the requested live viewer session. End gracefully at the first _next() " - "call because of %s=\"%s\" component parameter: " - "url=\"%s\"", - SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_END_STR, - lttng_live->params.url->str); - break; - default: - bt_common_abort(); - } + bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter.release()); + return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK; + } catch (const std::bad_alloc&) { + return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; + } catch (const bt2::Error&) { + return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR; } - - bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter); - status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK; - goto end; - -error: - status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR; - lttng_live_msg_iter_destroy(lttng_live_msg_iter); -end: - return status; } static struct bt_param_validation_map_value_entry_descr list_sessions_params[] = { @@ -1880,191 +1616,128 @@ static struct bt_param_validation_map_value_entry_descr list_sessions_params[] = bt_param_validation_value_descr::makeString()}, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END}; -static bt_component_class_query_method_status -lttng_live_query_list_sessions(const bt_value *params, const bt_value **result, - bt_self_component_class *self_comp_class, bt_logging_level log_level) +static bt2::Value::Shared lttng_live_query_list_sessions(const bt2::ConstMapValue params, + const bt2c::Logger& logger) { - bt_component_class_query_method_status status; - const bt_value *url_value = NULL; const char *url; - struct live_viewer_connection *viewer_connection = NULL; + live_viewer_connection::UP viewer_connection; enum lttng_live_viewer_status viewer_status; enum bt_param_validation_status validation_status; gchar *validate_error = NULL; - validation_status = bt_param_validation_validate(params, list_sessions_params, &validate_error); + validation_status = + bt_param_validation_validate(params.libObjPtr(), list_sessions_params, &validate_error); if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) { - status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR; - goto error; + throw bt2c::MemoryError {}; } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) { - status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; - BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "%s", validate_error); - goto error; + bt2c::GCharUP errorFreer {validate_error}; + + BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(logger, bt2::Error, "{}", validate_error); } - url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM); - url = bt_value_string_get(url_value); + url = params[URL_PARAM]->asString().value(); - viewer_status = live_viewer_connection_create(NULL, self_comp_class, log_level, url, true, NULL, - &viewer_connection); + viewer_status = live_viewer_connection_create(url, true, NULL, logger, viewer_connection); if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { - BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Failed to create viewer connection"); - status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; + BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(logger, bt2::Error, + "Failed to create viewer connection"); } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { - status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN; + throw bt2c::TryAgain {}; } else { bt_common_abort(); } - goto error; - } - - status = live_viewer_connection_list_sessions(viewer_connection, result); - if (status != BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK) { - BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Failed to list viewer sessions"); - goto error; - } - - goto end; - -error: - BT_VALUE_PUT_REF_AND_RESET(*result); - - if (status >= 0) { - status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; - } - -end: - if (viewer_connection) { - live_viewer_connection_destroy(viewer_connection); } - g_free(validate_error); - - return status; + return live_viewer_connection_list_sessions(viewer_connection.get()); } -static bt_component_class_query_method_status -lttng_live_query_support_info(const bt_value *params, const bt_value **result, - bt_self_component_class *self_comp_class, bt_logging_level log_level) +static bt2::Value::Shared lttng_live_query_support_info(const bt2::ConstMapValue params, + const bt2c::Logger& logger) { - bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK; - const bt_value *input_type_value; - const bt_value *input_value; - double weight = 0; - struct bt_common_lttng_live_url_parts parts = {0}; + struct bt_common_lttng_live_url_parts parts = {}; + bt_common_lttng_live_url_parts_deleter partsDeleter {parts}; /* Used by the logging macros */ __attribute__((unused)) bt_self_component *self_comp = NULL; - *result = NULL; - input_type_value = bt_value_map_borrow_entry_value_const(params, "type"); - if (!input_type_value) { - BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Missing expected `type` parameter."); - goto error; + const auto typeValue = params["type"]; + if (!typeValue) { + BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(logger, bt2::Error, + "Missing expected `type` parameter."); } - if (!bt_value_is_string(input_type_value)) { - BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "`type` parameter is not a string value."); - goto error; + if (!typeValue->isString()) { + BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(logger, bt2::Error, + "`type` parameter is not a string value."); } - if (strcmp(bt_value_string_get(input_type_value), "string") != 0) { + if (strcmp(typeValue->asString().value(), "string") != 0) { /* We don't handle file system paths */ - goto create_result; + return bt2::RealValue::create(); } - input_value = bt_value_map_borrow_entry_value_const(params, "input"); - if (!input_value) { - BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Missing expected `input` parameter."); - goto error; + const auto inputValue = params["input"]; + if (!inputValue) { + BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(logger, bt2::Error, + "Missing expected `input` parameter."); } - if (!bt_value_is_string(input_value)) { - BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, - "`input` parameter is not a string value."); - goto error; + if (!inputValue->isString()) { + BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(logger, bt2::Error, + "`input` parameter is not a string value."); } - parts = bt_common_parse_lttng_live_url(bt_value_string_get(input_value), NULL, 0); + parts = bt_common_parse_lttng_live_url(inputValue->asString().value(), NULL, 0); if (parts.session_name) { /* * Looks pretty much like an LTTng live URL: we got the * session name part, which forms a complete URL. */ - weight = .75; + return bt2::RealValue::create(.75); } -create_result: - *result = bt_value_real_create_init(weight); - if (!*result) { - status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR; - goto error; - } - - goto end; - -error: - if (status >= 0) { - status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; - } - - BT_ASSERT(!*result); - -end: - bt_common_destroy_lttng_live_url_parts(&parts); - return status; + return bt2::RealValue::create(); } -BT_HIDDEN bt_component_class_query_method_status lttng_live_query(bt_self_component_class_source *comp_class, bt_private_query_executor *priv_query_exec, const char *object, const bt_value *params, __attribute__((unused)) void *method_data, const bt_value **result) { - bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK; - bt_self_component *self_comp = NULL; - bt_self_component_class *self_comp_class = - bt_self_component_class_source_as_self_component_class(comp_class); - bt_logging_level log_level = bt_query_executor_get_logging_level( - bt_private_query_executor_as_query_executor_const(priv_query_exec)); - - if (strcmp(object, "sessions") == 0) { - status = lttng_live_query_list_sessions(params, result, self_comp_class, log_level); - } else if (strcmp(object, "babeltrace.support-info") == 0) { - status = lttng_live_query_support_info(params, result, self_comp_class, log_level); - } else { - BT_COMP_LOGI("Unknown query object `%s`", object); - status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT; - goto end; - } + try { + bt2c::Logger logger {bt2::SelfComponentClass {comp_class}, + bt2::PrivateQueryExecutor {priv_query_exec}, + "PLUGIN/SRC.CTF.LTTNG-LIVE/QUERY"}; + const bt2::ConstMapValue paramsObj(params); + bt2::Value::Shared resultObj; + + if (strcmp(object, "sessions") == 0) { + resultObj = lttng_live_query_list_sessions(paramsObj, logger); + } else if (strcmp(object, "babeltrace.support-info") == 0) { + resultObj = lttng_live_query_support_info(paramsObj, logger); + } else { + BT_CPPLOGI_SPEC(logger, "Unknown query object `{}`", object); + return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT; + } -end: - return status; -} + *result = resultObj.release().libObjPtr(); -static void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live) -{ - if (!lttng_live) { - return; + return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK; + } catch (const bt2c::TryAgain&) { + return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR; + } catch (const std::bad_alloc&) { + return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR; + } catch (const bt2::Error&) { + return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; } - if (lttng_live->params.url) { - g_string_free(lttng_live->params.url, TRUE); - } - g_free(lttng_live); } -BT_HIDDEN void lttng_live_component_finalize(bt_self_component_source *component) { - lttng_live_component *data = (lttng_live_component *) bt_self_component_get_data( - bt_self_component_source_as_self_component(component)); - - if (!data) { - return; - } - lttng_live_component_destroy_data(data); + lttng_live_component::UP {static_cast( + bt_self_component_get_data(bt_self_component_source_as_self_component(component)))}; } static enum session_not_found_action @@ -2102,101 +1775,76 @@ static struct bt_param_validation_map_value_entry_descr params_descr[] = { BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END}; static bt_component_class_initialize_method_status -lttng_live_component_create(const bt_value *params, bt_logging_level log_level, - bt_self_component *self_comp, struct lttng_live_component **component) +lttng_live_component_create(const bt_value *params, bt_self_component_source *self_comp, + lttng_live_component::UP& component) { - struct lttng_live_component *lttng_live = NULL; const bt_value *inputs_value; const bt_value *url_value; const bt_value *value; - const char *url; enum bt_param_validation_status validation_status; gchar *validation_error = NULL; - bt_component_class_initialize_method_status status; + bt2c::Logger logger {bt2::SelfSourceComponent {self_comp}, "PLUGIN/SRC.CTF.LTTNG-LIVE/COMP"}; validation_status = bt_param_validation_validate(params, params_descr, &validation_error); if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) { - status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; - goto error; + return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, "%s", validation_error); - status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR; - goto error; + bt2c::GCharUP errorFreer {validation_error}; + BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "{}", validation_error); + return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR; } - lttng_live = g_new0(struct lttng_live_component, 1); - if (!lttng_live) { - status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; - goto end; - } - lttng_live->log_level = log_level; - lttng_live->self_comp = self_comp; + auto lttng_live = bt2s::make_unique(std::move(logger)); + lttng_live->self_comp = bt_self_component_source_as_self_component(self_comp); lttng_live->max_query_size = MAX_QUERY_SIZE; lttng_live->has_msg_iter = false; inputs_value = bt_value_map_borrow_entry_value_const(params, INPUTS_PARAM); url_value = bt_value_array_borrow_element_by_index_const(inputs_value, 0); - url = bt_value_string_get(url_value); - - lttng_live->params.url = g_string_new(url); - if (!lttng_live->params.url) { - status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; - goto error; - } + lttng_live->params.url = bt_value_string_get(url_value); value = bt_value_map_borrow_entry_value_const(params, SESS_NOT_FOUND_ACTION_PARAM); if (value) { lttng_live->params.sess_not_found_act = parse_session_not_found_action_param(value); } else { - BT_COMP_LOGI("Optional `%s` parameter is missing: " - "defaulting to `%s`.", - SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR); + BT_CPPLOGI_SPEC(lttng_live->logger, + "Optional `{}` parameter is missing: defaulting to `{}`.", + SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR); lttng_live->params.sess_not_found_act = SESSION_NOT_FOUND_ACTION_CONTINUE; } - status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK; - goto end; - -error: - lttng_live_component_destroy_data(lttng_live); - lttng_live = NULL; -end: - g_free(validation_error); - - *component = lttng_live; - return status; + component = std::move(lttng_live); + return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK; } -BT_HIDDEN bt_component_class_initialize_method_status lttng_live_component_init(bt_self_component_source *self_comp_src, - bt_self_component_source_configuration *config, const bt_value *params, - __attribute__((unused)) void *init_method_data) + bt_self_component_source_configuration *, const bt_value *params, void *) { - struct lttng_live_component *lttng_live; - bt_component_class_initialize_method_status ret; - bt_self_component *self_comp = bt_self_component_source_as_self_component(self_comp_src); - bt_logging_level log_level = - bt_component_get_logging_level(bt_self_component_as_component(self_comp)); - bt_self_component_add_port_status add_port_status; - - ret = lttng_live_component_create(params, log_level, self_comp, <tng_live); - if (ret != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) { - goto error; - } + try { + lttng_live_component::UP lttng_live; + bt_component_class_initialize_method_status ret; + bt_self_component *self_comp = bt_self_component_source_as_self_component(self_comp_src); + bt_self_component_add_port_status add_port_status; + + ret = lttng_live_component_create(params, self_comp_src, lttng_live); + if (ret != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) { + return ret; + } - add_port_status = bt_self_component_source_add_output_port(self_comp_src, "out", NULL, NULL); - if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) { - ret = (bt_component_class_initialize_method_status) add_port_status; - goto end; - } + add_port_status = + bt_self_component_source_add_output_port(self_comp_src, "out", NULL, NULL); + if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) { + ret = (bt_component_class_initialize_method_status) add_port_status; + return ret; + } - bt_self_component_set_data(self_comp, lttng_live); - goto end; + bt_self_component_set_data(self_comp, lttng_live.release()); -error: - lttng_live_component_destroy_data(lttng_live); - lttng_live = NULL; -end: - return ret; + return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK; + } catch (const std::bad_alloc&) { + return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; + } catch (const bt2::Error&) { + return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR; + } }