X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fplugins%2Fctf%2Flttng-live%2Flttng-live.c;h=bc8ea07d4d5e841774bc96596862b372d95ec408;hb=883ffb2f79116ad331926162546b3cc257c82a65;hp=17818348ba334ab7288fc184ce64d6b560eed55b;hpb=c4f23e30bf67d2523163614bc9461d84cbe1ae80;p=babeltrace.git diff --git a/src/plugins/ctf/lttng-live/lttng-live.c b/src/plugins/ctf/lttng-live/lttng-live.c index 17818348..bc8ea07d 100644 --- a/src/plugins/ctf/lttng-live/lttng-live.c +++ b/src/plugins/ctf/lttng-live/lttng-live.c @@ -1,31 +1,11 @@ /* - * lttng-live.c - * - * Babeltrace CTF LTTng-live Client Component + * SPDX-License-Identifier: MIT * * Copyright 2019 Francis Deslauriers * Copyright 2016 Jérémie Galarneau * Copyright 2016 Mathieu Desnoyers * - * Author: Jérémie Galarneau - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. + * Babeltrace CTF LTTng-live Client Component */ #define BT_COMP_LOG_SELF_COMP self_comp @@ -33,11 +13,12 @@ #define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE" #include "logging/comp-logging.h" -#include #include #include #include +#include + #include "common/assert.h" #include #include "compat/compiler.h" @@ -61,7 +42,8 @@ #define print_dbg(fmt, ...) BT_COMP_LOGD(fmt, ## __VA_ARGS__) static -const char *print_live_iterator_status(enum lttng_live_iterator_status status) +const char *lttng_live_iterator_status_string( + enum lttng_live_iterator_status status) { switch (status) { case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE: @@ -81,14 +63,14 @@ const char *print_live_iterator_status(enum lttng_live_iterator_status status) case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED: return "LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED"; default: - abort(); + bt_common_abort(); } } static -const char *print_state(struct lttng_live_stream_iterator *s) +const char *lttng_live_stream_state_string(enum lttng_live_stream_state state) { - switch (s->state) { + switch (state) { case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA: return "ACTIVE_NO_DATA"; case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA: @@ -104,11 +86,11 @@ const char *print_state(struct lttng_live_stream_iterator *s) } } -#define print_stream_state(live_stream_iter) \ +#define LTTNG_LIVE_LOGD_STREAM_ITER(live_stream_iter) \ do { \ - BT_COMP_LOGD("stream state %s last_inact_ts %" PRId64 \ - ", curr_inact_ts %" PRId64, \ - print_state(live_stream_iter), \ + BT_COMP_LOGD("Live stream iterator state=%s, last-inact-ts=%" PRId64 \ + ", curr-inact-ts %" PRId64, \ + lttng_live_stream_state_string(live_stream_iter->state), \ live_stream_iter->last_inactivity_ts, \ live_stream_iter->current_inactivity_ts); \ } while (0); @@ -131,7 +113,7 @@ end: } static -struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session, +struct lttng_live_trace *lttng_live_session_borrow_trace_by_id(struct lttng_live_session *session, uint64_t trace_id) { uint64_t trace_idx; @@ -156,7 +138,7 @@ void lttng_live_destroy_trace(struct lttng_live_trace *trace) bt_logging_level log_level = trace->log_level; bt_self_component *self_comp = trace->self_comp; - BT_COMP_LOGD("Destroy lttng_live_trace"); + BT_COMP_LOGD("Destroying live trace: trace-id=%"PRIu64, trace->id); BT_ASSERT(trace->stream_iterators); g_ptr_array_free(trace->stream_iterators, TRUE); @@ -176,8 +158,13 @@ struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *sess bt_logging_level log_level = session->log_level; bt_self_component *self_comp = session->self_comp; + BT_COMP_LOGD("Creating live trace: " + "session-id=%"PRIu64", trace-id=%"PRIu64, + session->id, trace_id); trace = g_new0(struct lttng_live_trace, 1); if (!trace) { + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Failed to allocate live trace"); goto error; } trace->log_level = session->log_level; @@ -189,10 +176,9 @@ struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *sess trace->stream_iterators = g_ptr_array_new_with_free_func( (GDestroyNotify) lttng_live_stream_iterator_destroy); BT_ASSERT(trace->stream_iterators); - trace->new_metadata_needed = true; + trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED; g_ptr_array_add(session->traces, trace); - BT_COMP_LOGI("Create trace"); goto end; error: g_free(trace); @@ -202,12 +188,12 @@ end: } BT_HIDDEN -struct lttng_live_trace *lttng_live_borrow_trace( +struct lttng_live_trace *lttng_live_session_borrow_or_create_trace_by_id( struct lttng_live_session *session, uint64_t trace_id) { struct lttng_live_trace *trace; - trace = lttng_live_find_trace(session, trace_id); + trace = lttng_live_session_borrow_trace_by_id(session, trace_id); if (trace) { goto end; } @@ -229,8 +215,14 @@ int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, bt_logging_level log_level = lttng_live_msg_iter->log_level; bt_self_component *self_comp = lttng_live_msg_iter->self_comp; + BT_COMP_LOGD("Adding live session: " + "session-id=%" PRIu64 ", hostname=\"%s\" session-name=\"%s\"", + session_id, hostname, session_name); + session = g_new0(struct lttng_live_session, 1); if (!session) { + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Failed to allocate live session"); goto error; } @@ -248,12 +240,9 @@ int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, session->session_name = g_string_new(session_name); BT_ASSERT(session->session_name); - BT_COMP_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s", - session->id, hostname, session_name); g_ptr_array_add(lttng_live_msg_iter->sessions, session); goto end; error: - BT_COMP_LOGE("Error adding session"); g_free(session); ret = -1; end: @@ -272,9 +261,11 @@ void lttng_live_destroy_session(struct lttng_live_session *session) log_level = session->log_level; self_comp = session->self_comp; - BT_COMP_LOGD("Destroy lttng live session"); + BT_COMP_LOGD("Destroying live session: " + "session-id=%"PRIu64", session-name=\"%s\"", + session->id, session->session_name->str); if (session->id != -1ULL) { - if (lttng_live_detach_session(session)) { + if (lttng_live_session_detach(session)) { if (!lttng_live_graph_is_canceled( session->lttng_live_msg_iter)) { /* Old relayd cannot detach sessions. */ @@ -312,7 +303,9 @@ void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE); } - BT_OBJECT_PUT_REF_AND_RESET(lttng_live_msg_iter->viewer_connection); + if (lttng_live_msg_iter->viewer_connection) { + live_viewer_connection_destroy(lttng_live_msg_iter->viewer_connection); + } BT_ASSERT(lttng_live_msg_iter->lttng_live_comp); BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter); @@ -352,11 +345,11 @@ enum lttng_live_iterator_status lttng_live_iterator_next_check_stream_state( case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA: /* Invalid state. */ BT_COMP_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\""); - abort(); + bt_common_abort(); case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA: /* Invalid state. */ BT_COMP_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\""); - abort(); + bt_common_abort(); case LTTNG_LIVE_STREAM_EOF: break; } @@ -382,7 +375,8 @@ enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stre enum lttng_live_stream_state orig_state = lttng_live_stream->state; struct packet_index index; - if (lttng_live_stream->trace->new_metadata_needed) { + if (lttng_live_stream->trace->metadata_stream_state == + LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) { ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; goto end; } @@ -407,7 +401,7 @@ enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stre if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA && last_inact_ts == curr_inact_ts) { ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; - print_stream_state(lttng_live_stream); + LTTNG_LIVE_LOGD_STREAM_ITER(lttng_live_stream); } else { ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; } @@ -435,56 +429,74 @@ enum lttng_live_iterator_status lttng_live_get_session( struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_session *session) { + bt_logging_level log_level = lttng_live_msg_iter->log_level; + bt_self_component *self_comp = lttng_live_msg_iter->self_comp; enum lttng_live_iterator_status status; uint64_t trace_idx; - int ret = 0; + + BT_COMP_LOGD("Updating all streams for session: " + "session-id=%"PRIu64", session-name=\"%s\"", + session->id, session->session_name->str); if (!session->attached) { - ret = lttng_live_attach_session(session); - if (ret) { + enum lttng_live_viewer_status attach_status = + lttng_live_session_attach(session, + lttng_live_msg_iter->self_msg_iter); + if (attach_status != LTTNG_LIVE_VIEWER_STATUS_OK) { if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) { + /* + * Clear any causes appended in + * `lttng_live_attach_session()` as we want to + * return gracefully since the graph was + * cancelled. + */ + bt_current_thread_clear_error(); status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; } else { status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Error attaching to LTTng live session"); } goto end; } } - status = lttng_live_get_new_streams(session); + status = lttng_live_session_get_new_streams(session, + lttng_live_msg_iter->self_msg_iter); if (status != LTTNG_LIVE_ITERATOR_STATUS_OK && status != LTTNG_LIVE_ITERATOR_STATUS_END) { goto end; } - for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) { + trace_idx = 0; + while (trace_idx < session->traces->len) { struct lttng_live_trace *trace = g_ptr_array_index(session->traces, trace_idx); status = lttng_live_metadata_update(trace); - if (status != LTTNG_LIVE_ITERATOR_STATUS_OK && - status != LTTNG_LIVE_ITERATOR_STATUS_END) { + switch (status) { + case LTTNG_LIVE_ITERATOR_STATUS_END: + case LTTNG_LIVE_ITERATOR_STATUS_OK: + trace_idx++; + break; + case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE: + case LTTNG_LIVE_ITERATOR_STATUS_AGAIN: + goto end; + default: + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Error updating trace metadata: " + "stream-iter-status=%s, trace-id=%"PRIu64, + lttng_live_iterator_status_string(status), + trace->id); goto end; } } - status = lttng_live_lazy_msg_init(session); + status = lttng_live_lazy_msg_init(session, + lttng_live_msg_iter->self_msg_iter); end: return status; } -BT_HIDDEN -void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter) -{ - uint64_t session_idx; - - for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; - session_idx++) { - struct lttng_live_session *session = - g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx); - session->new_streams_needed = true; - } -} - static void lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter) { @@ -499,7 +511,11 @@ void lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng trace_idx++) { struct lttng_live_trace *trace = g_ptr_array_index(session->traces, trace_idx); - trace->new_metadata_needed = true; + + BT_ASSERT(trace->metadata_stream_state != + LTTNG_LIVE_METADATA_STREAM_STATE_CLOSED); + + trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED; } } } @@ -509,12 +525,17 @@ enum lttng_live_iterator_status lttng_live_iterator_handle_new_streams_and_metadata( struct lttng_live_msg_iter *lttng_live_msg_iter) { - enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK; + enum lttng_live_iterator_status status; + enum lttng_live_viewer_status viewer_status; + bt_logging_level log_level = lttng_live_msg_iter->log_level; + bt_self_component *self_comp = lttng_live_msg_iter->self_comp; uint64_t session_idx = 0, nr_sessions_opened = 0; struct lttng_live_session *session; enum session_not_found_action sess_not_found_act = lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act; + BT_COMP_LOGD("Update data and metadata of all sessions: " + "live-msg-iter-addr=%p", lttng_live_msg_iter); /* * In a remotely distant future, we could add a "new * session" flag to the protocol, which would tell us that we @@ -523,15 +544,26 @@ lttng_live_iterator_handle_new_streams_and_metadata( */ if (lttng_live_msg_iter->sessions->len == 0) { if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) { - ret = LTTNG_LIVE_ITERATOR_STATUS_END; + BT_COMP_LOGD("No session found. Exiting in accordance with the `session-not-found-action` parameter"); + status = LTTNG_LIVE_ITERATOR_STATUS_END; goto end; } else { + BT_COMP_LOGD("No session found. Try creating a new one in accordance with the `session-not-found-action` parameter"); /* * Retry to create a viewer session for the requested * session name. */ - if (lttng_live_create_viewer_session(lttng_live_msg_iter)) { - ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { + status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Error creating LTTng live viewer session"); + } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { + status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + } else { + bt_common_abort(); + } goto end; } } @@ -541,12 +573,12 @@ lttng_live_iterator_handle_new_streams_and_metadata( session_idx++) { session = g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx); - ret = lttng_live_get_session(lttng_live_msg_iter, session); - switch (ret) { + status = lttng_live_get_session(lttng_live_msg_iter, session); + switch (status) { case LTTNG_LIVE_ITERATOR_STATUS_OK: break; case LTTNG_LIVE_ITERATOR_STATUS_END: - ret = LTTNG_LIVE_ITERATOR_STATUS_OK; + status = LTTNG_LIVE_ITERATOR_STATUS_OK; break; default: goto end; @@ -555,22 +587,27 @@ lttng_live_iterator_handle_new_streams_and_metadata( nr_sessions_opened++; } } -end: - if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK && - sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE && + + if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE && nr_sessions_opened == 0) { - ret = LTTNG_LIVE_ITERATOR_STATUS_END; + status = LTTNG_LIVE_ITERATOR_STATUS_END; + } else { + status = LTTNG_LIVE_ITERATOR_STATUS_OK; } - return ret; + +end: + return status; } static enum lttng_live_iterator_status emit_inactivity_message( struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_stream_iterator *stream_iter, - bt_message **message, uint64_t timestamp) + const bt_message **message, uint64_t timestamp) { enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK; + bt_logging_level log_level = lttng_live_msg_iter->log_level; + bt_self_component *self_comp = lttng_live_msg_iter->self_comp; bt_message *msg = NULL; BT_ASSERT(stream_iter->trace->clock_class); @@ -579,6 +616,8 @@ enum lttng_live_iterator_status emit_inactivity_message( lttng_live_msg_iter->self_msg_iter, stream_iter->trace->clock_class, timestamp); if (!msg) { + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Error emitting message iterator inactivity message"); goto error; } @@ -596,7 +635,7 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream( struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_stream_iterator *lttng_live_stream, - bt_message **message) + const bt_message **message) { enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK; @@ -681,7 +720,7 @@ int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter, msg); break; case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY: - clock_snapshot = bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const( + clock_snapshot = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const( msg); break; default: @@ -696,7 +735,8 @@ int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter, ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns); if (ret) { - BT_COMP_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: " + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Cannot get nanoseconds from Epoch of clock snapshot: " "clock-snapshot-addr=%p", clock_snapshot); goto error; } @@ -721,12 +761,12 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream( struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_stream_iterator *lttng_live_stream, - bt_message **message) + const bt_message **message) { enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK; bt_logging_level log_level = lttng_live_msg_iter->log_level; bt_self_component *self_comp = lttng_live_msg_iter->self_comp; - enum bt_msg_iter_status status; + enum ctf_msg_iter_status status; uint64_t session_idx, trace_idx; for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; @@ -742,7 +782,7 @@ enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_ trace_idx++) { struct lttng_live_trace *trace = g_ptr_array_index(session->traces, trace_idx); - if (trace->new_metadata_needed) { + if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) { ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; goto end; } @@ -751,19 +791,23 @@ enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_ if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) { ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Invalid state of live stream iterator" + "stream-iter-status=%s", + lttng_live_stream_state_string(lttng_live_stream->state)); goto end; } - status = bt_msg_iter_get_next_message(lttng_live_stream->msg_iter, - lttng_live_msg_iter->self_msg_iter, message); + status = ctf_msg_iter_get_next_message( + lttng_live_stream->msg_iter, message); switch (status) { - case BT_MSG_ITER_STATUS_EOF: + case CTF_MSG_ITER_STATUS_EOF: ret = LTTNG_LIVE_ITERATOR_STATUS_END; break; - case BT_MSG_ITER_STATUS_OK: + case CTF_MSG_ITER_STATUS_OK: ret = LTTNG_LIVE_ITERATOR_STATUS_OK; break; - case BT_MSG_ITER_STATUS_AGAIN: + case CTF_MSG_ITER_STATUS_AGAIN: /* * Continue immediately (end of packet). The next * get_index may return AGAIN to delay the following @@ -771,13 +815,14 @@ enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_ */ ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; break; - case BT_MSG_ITER_STATUS_INVAL: - /* No argument provided by the user, so don't return INVAL. */ - case BT_MSG_ITER_STATUS_ERROR: + case CTF_MSG_ITER_STATUS_ERROR: default: ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - BT_COMP_LOGW("CTF msg iterator return an error or failed msg_iter=%p", - lttng_live_stream->msg_iter); + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "CTF message iterator failed to get next message: " + "msg-iter=%p, msg-iter-status=%s", + lttng_live_stream->msg_iter, + ctf_msg_iter_status_string(status)); break; } @@ -789,25 +834,32 @@ static enum lttng_live_iterator_status lttng_live_iterator_close_stream( struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_stream_iterator *stream_iter, - bt_message **curr_msg) + const bt_message **curr_msg) { enum lttng_live_iterator_status live_status = LTTNG_LIVE_ITERATOR_STATUS_OK; + bt_logging_level log_level = lttng_live_msg_iter->log_level; + bt_self_component *self_comp = lttng_live_msg_iter->self_comp; /* * The viewer has hung up on us so we are closing the stream. The - * `bt_msg_iter` should simply realize that it needs to close the + * `ctf_msg_iter` should simply realize that it needs to close the * stream properly by emitting the necessary stream end message. */ - enum bt_msg_iter_status status = bt_msg_iter_get_next_message( - stream_iter->msg_iter, lttng_live_msg_iter->self_msg_iter, - curr_msg); + enum ctf_msg_iter_status status = ctf_msg_iter_get_next_message( + stream_iter->msg_iter, curr_msg); - if (status == BT_MSG_ITER_STATUS_ERROR) { + if (status == CTF_MSG_ITER_STATUS_ERROR) { + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Error getting the next message from CTF message iterator"); live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; goto end; + } else if (status == CTF_MSG_ITER_STATUS_EOF) { + BT_COMP_LOGI("Reached the end of the live stream iterator."); + live_status = LTTNG_LIVE_ITERATOR_STATUS_END; + goto end; } - BT_ASSERT(status == BT_MSG_ITER_STATUS_OK); + BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK); end: return live_status; @@ -865,12 +917,15 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_msg_on_stream( struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_stream_iterator *stream_iter, - bt_message **curr_msg) + const bt_message **curr_msg) { bt_logging_level log_level = lttng_live_msg_iter->log_level; bt_self_component *self_comp = lttng_live_msg_iter->self_comp; enum lttng_live_iterator_status live_status; + BT_COMP_LOGD("Finding the next message for stream iterator: " + "stream-name=\"%s\"", stream_iter->name->str); + if (stream_iter->has_stream_hung_up) { /* * The stream has hung up and the stream was properly closed @@ -883,7 +938,7 @@ enum lttng_live_iterator_status lttng_live_iterator_next_msg_on_stream( } retry: - print_stream_state(stream_iter); + LTTNG_LIVE_LOGD_STREAM_ITER(stream_iter); live_status = lttng_live_iterator_handle_new_streams_and_metadata( lttng_live_msg_iter); if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { @@ -926,6 +981,77 @@ end: return live_status; } +static +bool is_discarded_packet_or_event_message(const bt_message *msg) +{ + const enum bt_message_type msg_type = bt_message_get_type(msg); + + return msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS || + msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS; +} + +static +enum lttng_live_iterator_status adjust_discarded_packets_message( + bt_self_message_iterator *iter, + const bt_stream *stream, + const bt_message *msg_in, bt_message **msg_out, + uint64_t new_begin_ts) +{ + enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK; + enum bt_property_availability availability; + const bt_clock_snapshot *clock_snapshot; + uint64_t end_ts; + uint64_t count; + + clock_snapshot = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg_in); + end_ts = bt_clock_snapshot_get_value(clock_snapshot); + + availability = bt_message_discarded_packets_get_count(msg_in, &count); + BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE); + + *msg_out = bt_message_discarded_packets_create_with_default_clock_snapshots( + iter, stream, new_begin_ts, end_ts); + if (!*msg_out) { + status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM; + goto end; + } + + bt_message_discarded_packets_set_count(*msg_out, count); +end: + return status; +} + +static +enum lttng_live_iterator_status adjust_discarded_events_message( + bt_self_message_iterator *iter, + const bt_stream *stream, + const bt_message *msg_in, bt_message **msg_out, + uint64_t new_begin_ts) +{ + enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK; + enum bt_property_availability availability; + const bt_clock_snapshot *clock_snapshot; + uint64_t end_ts; + uint64_t count; + + clock_snapshot = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg_in); + end_ts = bt_clock_snapshot_get_value(clock_snapshot); + + availability = bt_message_discarded_events_get_count(msg_in, &count); + BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE); + + *msg_out = bt_message_discarded_events_create_with_default_clock_snapshots( + iter, stream, new_begin_ts, end_ts); + if (!*msg_out) { + status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM; + goto end; + } + + bt_message_discarded_events_set_count(*msg_out, count); +end: + return status; +} + static enum lttng_live_iterator_status next_stream_iterator_for_trace( struct lttng_live_msg_iter *lttng_live_msg_iter, @@ -941,6 +1067,9 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( BT_ASSERT_DBG(live_trace); BT_ASSERT_DBG(live_trace->stream_iterators); + + BT_COMP_LOGD("Finding the next stream iterator for trace: " + "trace-id=%"PRIu64, live_trace->id); /* * Update the current message of every stream iterators of this trace. * The current msg of every stream must have a timestamp equal or @@ -955,17 +1084,17 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( stream_iter_idx); /* - * Find if there is are now current message for this stream - * iterator get it. + * If there is no current message for this stream, go fetch + * one. */ while (!stream_iter->current_msg) { - bt_message *msg = NULL; + const bt_message *msg = NULL; int64_t curr_msg_ts_ns = INT64_MAX; stream_iter_status = lttng_live_iterator_next_msg_on_stream( lttng_live_msg_iter, stream_iter, &msg); BT_COMP_LOGD("live stream iterator returned status :%s", - print_live_iterator_status(stream_iter_status)); + lttng_live_iterator_status_string(stream_iter_status)); if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) { stream_iter_is_ended = true; break; @@ -987,19 +1116,73 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( /* * Check if the message of the current live stream - * iterator occured at the exact same time or after the + * iterator occurred at the exact same time or after the * last message returned by this component's message * iterator. If not, we return an error. */ if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) { stream_iter->current_msg = msg; stream_iter->current_msg_ts_ns = curr_msg_ts_ns; + } else if (stream_iter->last_inactivity_ts > curr_msg_ts_ns && + is_discarded_packet_or_event_message(msg)) { + /* + * The CTF message iterator emits Discarded + * Packets and Events with synthesized begin and + * end timestamps from the bounds of the last + * known packet and the newly decoded packet + * header. + * + * The CTF message iterator is not aware of + * stream inactivity beacons. Hence, we have + * to adjust the begin timestamp of those types + * of messages if a stream signaled its + * inactivity up until _after_ the last known + * packet's begin timestamp. + * + * Otherwise, the monotonicity guarantee would + * not be preserved. + */ + const enum bt_message_type msg_type = + bt_message_get_type(msg); + enum lttng_live_iterator_status adjust_status = + LTTNG_LIVE_ITERATOR_STATUS_OK; + bt_message *adjusted_message; + + switch (msg_type) { + case BT_MESSAGE_TYPE_DISCARDED_EVENTS: + adjust_status = adjust_discarded_events_message( + lttng_live_msg_iter->self_msg_iter, + stream_iter->stream, + msg, &adjusted_message, + stream_iter->last_inactivity_ts); + break; + case BT_MESSAGE_TYPE_DISCARDED_PACKETS: + adjust_status = adjust_discarded_packets_message( + lttng_live_msg_iter->self_msg_iter, + stream_iter->stream, + msg, &adjusted_message, + stream_iter->last_inactivity_ts); + break; + default: + bt_common_abort(); + } + + if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { + stream_iter_status = adjust_status; + goto end; + } + + BT_ASSERT_DBG(adjusted_message); + stream_iter->current_msg = adjusted_message; + stream_iter->current_msg_ts_ns = + stream_iter->last_inactivity_ts; } else { /* * We received a message in the past. To ensure * monotonicity, we can't send it forward. */ - BT_COMP_LOGE("Message's timestamp is less than " + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Message's timestamp is less than " "lttng-live's message iterator's last " "returned timestamp: " "lttng-live-msg-iter-addr=%p, ts=%" PRId64 ", " @@ -1099,6 +1282,8 @@ enum lttng_live_iterator_status next_stream_iterator_for_session( int64_t youngest_candidate_msg_ts = INT64_MAX; struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL; + BT_COMP_LOGD("Finding the next stream iterator for session: " + "session-id=%"PRIu64, session->id); /* * Make sure we are attached to the session and look for new streams * and metadata. @@ -1202,12 +1387,13 @@ void put_messages(bt_message_array_const msgs, uint64_t count) } BT_HIDDEN -bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next( +bt_message_iterator_class_next_method_status lttng_live_msg_iter_next( bt_self_message_iterator *self_msg_it, bt_message_array_const msgs, uint64_t capacity, uint64_t *count) { - bt_component_class_message_iterator_next_method_status status; + bt_message_iterator_class_next_method_status status; + enum lttng_live_viewer_status viewer_status; struct lttng_live_msg_iter *lttng_live_msg_iter = bt_self_message_iterator_get_data(self_msg_it); struct lttng_live_component *lttng_live = @@ -1221,6 +1407,21 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next( BT_ASSERT_DBG(lttng_live_msg_iter); + if (G_UNLIKELY(lttng_live_msg_iter->was_interrupted)) { + /* + * The iterator was interrupted in a previous call to the + * `_next()` method. We currently do not support generating + * messages after such event. The babeltrace2 CLI should never + * be running the graph after being interrupted. So this check + * is to prevent other graph users from using this live + * iterator in an messed up internal state. + */ + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Message iterator was interrupted during a previous call to the `next()` and currently does not support continuing after such event."); + goto end; + } + /* * Clear all the invalid message reference that might be left over in * the output array. @@ -1235,17 +1436,26 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next( if (lttng_live_msg_iter->sessions->len == 0) { if (lttng_live->params.sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) { - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END; - goto no_session; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END; + goto end; } else { /* * The are no more active session for this session * name. Retry to create a viewer session for the * requested session name. */ - if (lttng_live_create_viewer_session(lttng_live_msg_iter)) { - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; - goto no_session; + viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Error creating LTTng live viewer session"); + } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN; + } else { + bt_common_abort(); + } + goto end; } } } @@ -1313,7 +1523,7 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next( } if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { - goto end; + goto return_status; } if (G_UNLIKELY(youngest_stream_iter == NULL) || @@ -1356,7 +1566,7 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next( if (!youngest_stream_iter) { stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; - goto end; + goto return_status; } BT_ASSERT_DBG(youngest_stream_iter->current_msg); @@ -1378,135 +1588,193 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next( stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK; } -end: + +return_status: switch (stream_iter_status) { case LTTNG_LIVE_ITERATOR_STATUS_OK: case LTTNG_LIVE_ITERATOR_STATUS_AGAIN: + /* + * If we gathered messages, return _OK even if the graph was + * interrupted. This allows for the components downstream to at + * least get the thoses messages. If the graph was indeed + * interrupted there should not be another _next() call as the + * application will tear down the graph. This component class + * doesn't support restarting after an interruption. + */ if (*count > 0) { - /* - * We received a again status but we have some messages - * to send downstream. We send them and return OK for - * now. On the next call we return again if there are - * still no new message to send. - */ - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK; } else { - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_AGAIN; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN; } break; case LTTNG_LIVE_ITERATOR_STATUS_END: - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END; break; case LTTNG_LIVE_ITERATOR_STATUS_NOMEM: - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_MEMORY_ERROR; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Memory error preparing the next batch of messages: " + "live-iter-status=%s", + lttng_live_iterator_status_string(stream_iter_status)); + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR; break; case LTTNG_LIVE_ITERATOR_STATUS_ERROR: case LTTNG_LIVE_ITERATOR_STATUS_INVAL: case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED: - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Error preparing the next batch of messages: " + "live-iter-status=%s", + lttng_live_iterator_status_string(stream_iter_status)); + + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; /* Put all existing messages on error. */ put_messages(msgs, *count); break; default: - abort(); + bt_common_abort(); } -no_session: +end: return status; } +static +struct lttng_live_msg_iter *lttng_live_msg_iter_create( + struct lttng_live_component *lttng_live_comp, + bt_self_message_iterator *self_msg_it) +{ + bt_self_component *self_comp = lttng_live_comp->self_comp; + bt_logging_level log_level = lttng_live_comp->log_level; + + struct lttng_live_msg_iter *lttng_live_msg_iter = + g_new0(struct lttng_live_msg_iter, 1); + if (!lttng_live_msg_iter) { + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Failed to allocate lttng_live_msg_iter"); + goto end; + } + + lttng_live_msg_iter->log_level = lttng_live_comp->log_level; + lttng_live_msg_iter->self_comp = lttng_live_comp->self_comp; + lttng_live_msg_iter->lttng_live_comp = lttng_live_comp; + lttng_live_msg_iter->self_msg_iter = self_msg_it; + + lttng_live_msg_iter->active_stream_iter = 0; + lttng_live_msg_iter->last_msg_ts_ns = INT64_MIN; + lttng_live_msg_iter->was_interrupted = false; + + lttng_live_msg_iter->sessions = g_ptr_array_new_with_free_func( + (GDestroyNotify) lttng_live_destroy_session); + BT_ASSERT(lttng_live_msg_iter->sessions); + +end: + return lttng_live_msg_iter; + +} + BT_HIDDEN -bt_component_class_message_iterator_initialize_method_status lttng_live_msg_iter_init( +bt_message_iterator_class_initialize_method_status lttng_live_msg_iter_init( bt_self_message_iterator *self_msg_it, bt_self_message_iterator_configuration *config, - bt_self_component_source *self_comp_src, bt_self_component_port_output *self_port) { - bt_component_class_message_iterator_initialize_method_status ret = - BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_OK; - bt_self_component *self_comp = - bt_self_component_source_as_self_component(self_comp_src); + bt_message_iterator_class_initialize_method_status status; struct lttng_live_component *lttng_live; struct lttng_live_msg_iter *lttng_live_msg_iter; + enum lttng_live_viewer_status viewer_status; bt_logging_level log_level; - - BT_ASSERT(self_msg_it); + bt_self_component *self_comp = + bt_self_message_iterator_borrow_component(self_msg_it); lttng_live = bt_self_component_get_data(self_comp); log_level = lttng_live->log_level; self_comp = lttng_live->self_comp; + /* There can be only one downstream iterator at the same time. */ BT_ASSERT(!lttng_live->has_msg_iter); lttng_live->has_msg_iter = true; - lttng_live_msg_iter = g_new0(struct lttng_live_msg_iter, 1); + lttng_live_msg_iter = lttng_live_msg_iter_create(lttng_live, + self_msg_it); if (!lttng_live_msg_iter) { - ret = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_ERROR; - goto end; + status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Failed to create lttng_live_msg_iter"); + goto error; } - lttng_live_msg_iter->log_level = lttng_live->log_level; - lttng_live_msg_iter->self_comp = lttng_live->self_comp; - lttng_live_msg_iter->lttng_live_comp = lttng_live; - lttng_live_msg_iter->self_msg_iter = self_msg_it; - - lttng_live_msg_iter->active_stream_iter = 0; - lttng_live_msg_iter->last_msg_ts_ns = INT64_MIN; - lttng_live_msg_iter->sessions = g_ptr_array_new_with_free_func( - (GDestroyNotify) lttng_live_destroy_session); - BT_ASSERT(lttng_live_msg_iter->sessions); - - lttng_live_msg_iter->viewer_connection = - live_viewer_connection_create(lttng_live->params.url->str, false, - lttng_live_msg_iter, log_level); - if (!lttng_live_msg_iter->viewer_connection) { + viewer_status = live_viewer_connection_create(self_comp, NULL, + log_level, lttng_live->params.url->str, false, + lttng_live_msg_iter, <tng_live_msg_iter->viewer_connection); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Failed to create viewer connection"); + } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { + /* + * Interruption in the _iter_init() method is not + * supported. Return an error. + */ + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Interrupted while creating viewer connection"); + } goto error; } - if (lttng_live_create_viewer_session(lttng_live_msg_iter)) { + viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Failed to create viewer session"); + } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { + /* + * Interruption in the _iter_init() method is not + * supported. Return an error. + */ + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Interrupted when creating viewer session"); + } goto error; } + if (lttng_live_msg_iter->sessions->len == 0) { switch (lttng_live->params.sess_not_found_act) { case SESSION_NOT_FOUND_ACTION_CONTINUE: - BT_COMP_LOGI("Unable to connect to the requested live viewer " - "session. Keep trying to connect because of " + BT_COMP_LOGI("Unable to connect to the requested live viewer session. Keep trying to connect because of " "%s=\"%s\" component parameter: url=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR, lttng_live->params.url->str); break; case SESSION_NOT_FOUND_ACTION_FAIL: - BT_COMP_LOGE("Unable to connect to the requested live viewer " - "session. Fail the message iterator" - "initialization because of %s=\"%s\" " + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Unable to connect to the requested live viewer session. Fail the message iterator initialization because of %s=\"%s\" " "component parameter: url =\"%s\"", SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_FAIL_STR, lttng_live->params.url->str); goto error; case SESSION_NOT_FOUND_ACTION_END: - BT_COMP_LOGI("Unable to connect to the requested live viewer " - "session. End gracefully at the first _next() " + BT_COMP_LOGI("Unable to connect to the requested live viewer session. End gracefully at the first _next() " "call because of %s=\"%s\" component parameter: " "url=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_END_STR, lttng_live->params.url->str); break; default: - abort(); + bt_common_abort(); } } bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter); - + status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK; goto end; + error: - ret = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR; lttng_live_msg_iter_destroy(lttng_live_msg_iter); end: - return ret; + return status; } static struct bt_param_validation_map_value_entry_descr list_sessions_params[] = { @@ -1524,6 +1792,7 @@ bt_component_class_query_method_status lttng_live_query_list_sessions( const bt_value *url_value = NULL; const char *url; struct live_viewer_connection *viewer_connection = NULL; + enum lttng_live_viewer_status viewer_status; enum bt_param_validation_status validation_status; gchar *validate_error = NULL; @@ -1534,23 +1803,34 @@ bt_component_class_query_method_status lttng_live_query_list_sessions( goto error; } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) { status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; - BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "%s", validate_error); + BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "%s", + validate_error); goto error; } url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM); url = bt_value_string_get(url_value); - viewer_connection = live_viewer_connection_create(url, true, NULL, - log_level); - if (!viewer_connection) { - status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; + viewer_status = live_viewer_connection_create(NULL, self_comp_class, + log_level, url, true, NULL, &viewer_connection); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { + BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, + "Failed to create viewer connection"); + status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; + } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { + status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN; + } else { + bt_common_abort(); + } goto error; } status = live_viewer_connection_list_sessions(viewer_connection, result); if (status != BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK) { + BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, + "Failed to list viewer sessions"); goto error; } @@ -1576,6 +1856,7 @@ end: static bt_component_class_query_method_status lttng_live_query_support_info( const bt_value *params, const bt_value **result, + bt_self_component_class *self_comp_class, bt_logging_level log_level) { bt_component_class_query_method_status status = @@ -1592,12 +1873,14 @@ bt_component_class_query_method_status lttng_live_query_support_info( input_type_value = bt_value_map_borrow_entry_value_const(params, "type"); if (!input_type_value) { - BT_COMP_LOGE("Missing expected `type` parameter."); + BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, + "Missing expected `type` parameter."); goto error; } if (!bt_value_is_string(input_type_value)) { - BT_COMP_LOGE("`type` parameter is not a string value."); + BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, + "`type` parameter is not a string value."); goto error; } @@ -1608,12 +1891,14 @@ bt_component_class_query_method_status lttng_live_query_support_info( input_value = bt_value_map_borrow_entry_value_const(params, "input"); if (!input_value) { - BT_COMP_LOGE("Missing expected `input` parameter."); + BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, + "Missing expected `input` parameter."); goto error; } if (!bt_value_is_string(input_value)) { - BT_COMP_LOGE("`input` parameter is not a string value."); + BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, + "`input` parameter is not a string value."); goto error; } @@ -1670,7 +1955,7 @@ bt_component_class_query_method_status lttng_live_query( self_comp_class, log_level); } else if (strcmp(object, "babeltrace.support-info") == 0) { status = lttng_live_query_support_info(params, result, - log_level); + self_comp_class, log_level); } else { BT_COMP_LOGI("Unknown query object `%s`", object); status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT;