X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fplugins%2Fctf%2Flttng-live%2Flttng-live.c;h=9352fd12c0e69ab2a9885ee14db574c68accd6cd;hb=51c1eaeb368b7a4d6a3990bb30dc7e3a8d6e1768;hp=0944230759937cf16f7e46d420e07a57ec2bb2b8;hpb=fb25b9e364c8eab9fe5e37947831e233086c7218;p=babeltrace.git diff --git a/src/plugins/ctf/lttng-live/lttng-live.c b/src/plugins/ctf/lttng-live/lttng-live.c index 09442307..9352fd12 100644 --- a/src/plugins/ctf/lttng-live/lttng-live.c +++ b/src/plugins/ctf/lttng-live/lttng-live.c @@ -31,7 +31,7 @@ #define BT_COMP_LOG_SELF_COMP self_comp #define BT_LOG_OUTPUT_LEVEL log_level #define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE" -#include "plugins/comp-logging.h" +#include "logging/comp-logging.h" #include #include @@ -42,6 +42,8 @@ #include "compat/compiler.h" #include +#include "plugins/common/muxing/muxing.h" + #include "data-stream.h" #include "metadata.h" #include "lttng-live.h" @@ -109,18 +111,17 @@ const char *print_state(struct lttng_live_stream_iterator *s) } while (0); BT_HIDDEN -bool lttng_live_graph_is_canceled(struct lttng_live_component *lttng_live) +bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter) { - const bt_component *component; bool ret; - if (!lttng_live) { + if (!msg_iter) { ret = false; goto end; } - component = bt_self_component_as_component(lttng_live->self_comp); - ret = bt_component_graph_is_canceled(component); + ret = bt_self_message_iterator_is_interrupted( + msg_iter->self_msg_iter); end: return ret; @@ -259,7 +260,6 @@ end: static void lttng_live_destroy_session(struct lttng_live_session *session) { - struct lttng_live_component *live_comp; bt_logging_level log_level; bt_self_component *self_comp; @@ -272,9 +272,9 @@ void lttng_live_destroy_session(struct lttng_live_session *session) BT_COMP_LOGD("Destroy lttng live session"); if (session->id != -1ULL) { if (lttng_live_detach_session(session)) { - live_comp = session->lttng_live_msg_iter->lttng_live_comp; if (session->lttng_live_msg_iter && - !lttng_live_graph_is_canceled(live_comp)) { + !lttng_live_graph_is_canceled( + session->lttng_live_msg_iter)) { /* Old relayd cannot detach sessions. */ BT_COMP_LOGD("Unable to detach lttng live session %" PRIu64, session->id); @@ -442,7 +442,7 @@ enum lttng_live_iterator_status lttng_live_get_session( ret = lttng_live_attach_session(session); if (ret) { if (lttng_live_msg_iter && lttng_live_graph_is_canceled( - lttng_live_msg_iter->lttng_live_comp)) { + lttng_live_msg_iter)) { status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; } else { status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; @@ -633,7 +633,6 @@ int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter, const bt_clock_class *clock_class = NULL; const bt_clock_snapshot *clock_snapshot = NULL; int ret = 0; - bt_message_stream_activity_clock_snapshot_state sa_cs_state; bt_logging_level log_level = lttng_live_msg_iter->log_level; bt_self_component *self_comp = lttng_live_msg_iter->self_comp; @@ -690,32 +689,6 @@ int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter, clock_snapshot = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const( msg); break; - case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING: - clock_class = - bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const( - msg); - BT_ASSERT(clock_class); - - sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const( - msg, &clock_snapshot); - if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) { - goto no_clock_snapshot; - } - - break; - case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END: - clock_class = - bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const( - msg); - BT_ASSERT(clock_class); - - sa_cs_state = bt_message_stream_activity_end_borrow_default_clock_snapshot_const( - msg, &clock_snapshot); - if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) { - goto no_clock_snapshot; - } - - break; case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY: clock_snapshot = bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const( @@ -740,12 +713,6 @@ int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter, goto end; -no_clock_snapshot: - BT_COMP_LOGD_STR("Message's default clock snapshot is missing: " - "using the last message timestamp."); - *ts_ns = last_msg_ts_ns; - goto end; - error: ret = -1; @@ -828,6 +795,34 @@ end: return ret; } +static +enum lttng_live_iterator_status lttng_live_iterator_close_stream( + struct lttng_live_msg_iter *lttng_live_msg_iter, + struct lttng_live_stream_iterator *stream_iter, + bt_message **curr_msg) +{ + enum lttng_live_iterator_status live_status = + LTTNG_LIVE_ITERATOR_STATUS_OK; + /* + * The viewer has hung up on us so we are closing the stream. The + * `bt_msg_iter` should simply realize that it needs to close the + * stream properly by emitting the necessary stream end message. + */ + enum bt_msg_iter_status status = + bt_msg_iter_get_next_message(stream_iter->msg_iter, + lttng_live_msg_iter->self_msg_iter, curr_msg); + + if (status == BT_MSG_ITER_STATUS_ERROR) { + live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; + } + + BT_ASSERT(status == BT_MSG_ITER_STATUS_OK); + +end: + return live_status; +} + /* * helper function: * handle_no_data_streams() @@ -886,6 +881,17 @@ enum lttng_live_iterator_status lttng_live_iterator_next_on_stream( bt_self_component *self_comp = lttng_live_msg_iter->self_comp; enum lttng_live_iterator_status live_status; + if (stream_iter->has_stream_hung_up) { + /* + * The stream has hung up and the stream was properly closed + * during the last call to the current function. Return _END + * status now so that this stream iterator is removed for the + * stream iterator list. + */ + live_status = LTTNG_LIVE_ITERATOR_STATUS_END; + goto end; + } + retry: print_stream_state(stream_iter); live_status = lttng_live_iterator_handle_new_streams_and_metadata( @@ -895,13 +901,22 @@ retry: } live_status = lttng_live_iterator_next_handle_one_no_data_stream( lttng_live_msg_iter, stream_iter); + if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { + if (live_status == LTTNG_LIVE_ITERATOR_STATUS_END) { + /* + * We overwrite `live_status` since `curr_msg` is + * likely set to a valid message in this function. + */ + live_status = lttng_live_iterator_close_stream( + lttng_live_msg_iter, stream_iter, curr_msg); + } goto end; } live_status = lttng_live_iterator_next_handle_one_quiescent_stream( lttng_live_msg_iter, stream_iter, curr_msg); if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { - BT_ASSERT(*curr_msg == NULL); + BT_ASSERT(!*curr_msg); goto end; } if (*curr_msg) { @@ -910,7 +925,7 @@ retry: live_status = lttng_live_iterator_next_handle_one_active_data_stream( lttng_live_msg_iter, stream_iter, curr_msg); if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { - BT_ASSERT(*curr_msg == NULL); + BT_ASSERT(!*curr_msg); } end: @@ -949,6 +964,16 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( g_ptr_array_index(live_trace->stream_iterators, stream_iter_idx); + /* + * Since we may remove elements from the GPtrArray as we + * iterate over it, it's possible to see the same element more + * than once. + */ + if (stream_iter == curr_candidate_stream_iter) { + stream_iter_idx++; + continue; + } + /* * Find if there is are now current message for this stream * iterator get it. @@ -1006,18 +1031,50 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( } } - if (!stream_iter_is_ended && - stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) { - /* - * Update the current best candidate message for the - * stream iterator of thise live trace to be forwarded - * downstream. - */ - curr_candidate_msg_ts = stream_iter->current_msg_ts_ns; - curr_candidate_stream_iter = stream_iter; - } + BT_ASSERT(stream_iter != curr_candidate_stream_iter); - if (stream_iter_is_ended) { + if (!stream_iter_is_ended) { + if (G_UNLIKELY(curr_candidate_stream_iter == NULL) || + stream_iter->current_msg_ts_ns < curr_candidate_msg_ts) { + /* + * Update the current best candidate message + * for the stream iterator of this live trace + * to be forwarded downstream. + */ + curr_candidate_msg_ts = stream_iter->current_msg_ts_ns; + curr_candidate_stream_iter = stream_iter; + } else if (stream_iter->current_msg_ts_ns == curr_candidate_msg_ts) { + /* + * Order the messages in an arbitrary but + * deterministic way. + */ + BT_ASSERT(stream_iter != curr_candidate_stream_iter); + int ret = common_muxing_compare_messages( + stream_iter->current_msg, + curr_candidate_stream_iter->current_msg); + if (ret < 0) { + /* + * The `curr_candidate_stream_iter->current_msg` + * should go first. Update the next + * iterator and the current timestamp. + */ + curr_candidate_msg_ts = stream_iter->current_msg_ts_ns; + curr_candidate_stream_iter = stream_iter; + } else if (ret == 0) { + /* + * Unable to pick which one should go + * first. + */ + BT_COMP_LOGW("Cannot deterministically pick next live stream message iterator because they have identical next messages: " + "stream-iter-addr=%p" + "stream-iter-addr=%p", + stream_iter, + curr_candidate_stream_iter); + } + } + + stream_iter_idx++; + } else { /* * The live stream iterator is ENDed. We remove that * iterator from the list and we restart the iteration @@ -1027,8 +1084,6 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( g_ptr_array_remove_index_fast(live_trace->stream_iterators, stream_iter_idx); stream_iter_idx = 0; - } else { - stream_iter_idx++; } } @@ -1054,6 +1109,8 @@ enum lttng_live_iterator_status next_stream_iterator_for_session( struct lttng_live_session *session, struct lttng_live_stream_iterator **candidate_session_stream_iter) { + bt_self_component *self_comp = lttng_live_msg_iter->self_comp; + bt_logging_level log_level = lttng_live_msg_iter->log_level; enum lttng_live_iterator_status stream_iter_status; uint64_t trace_idx = 0; int64_t curr_candidate_msg_ts = INT64_MAX; @@ -1098,9 +1155,32 @@ enum lttng_live_iterator_status next_stream_iterator_for_session( if (!trace_is_ended) { BT_ASSERT(stream_iter); - if (stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) { + if (G_UNLIKELY(curr_candidate_stream_iter == NULL) || + stream_iter->current_msg_ts_ns < curr_candidate_msg_ts) { curr_candidate_msg_ts = stream_iter->current_msg_ts_ns; curr_candidate_stream_iter = stream_iter; + } else if (stream_iter->current_msg_ts_ns == curr_candidate_msg_ts) { + /* + * Order the messages in an arbitrary but + * deterministic way. + */ + int ret = common_muxing_compare_messages( + stream_iter->current_msg, + curr_candidate_stream_iter->current_msg); + if (ret < 0) { + /* + * The `curr_candidate_stream_iter->current_msg` + * should go first. Update the next iterator + * and the current timestamp. + */ + curr_candidate_msg_ts = stream_iter->current_msg_ts_ns; + curr_candidate_stream_iter = stream_iter; + } else if (ret == 0) { + /* Unable to pick which one should go first. */ + BT_COMP_LOGW("Cannot deterministically pick next live stream message iterator because they have identical next messages: " + "stream-iter-addr=%p" "stream-iter-addr=%p", + stream_iter, curr_candidate_stream_iter); + } } trace_idx++; } else { @@ -1149,6 +1229,8 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next( bt_self_message_iterator_get_data(self_msg_it); struct lttng_live_component *lttng_live = lttng_live_msg_iter->lttng_live_comp; + bt_self_component *self_comp = lttng_live_msg_iter->self_comp; + bt_logging_level log_level = lttng_live_msg_iter->log_level; enum lttng_live_iterator_status stream_iter_status; uint64_t session_idx; @@ -1260,9 +1342,39 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next( goto end; } - if (candidate_stream_iter->current_msg_ts_ns <= next_msg_ts_ns) { + if (G_UNLIKELY(next_stream_iter == NULL) || + candidate_stream_iter->current_msg_ts_ns <= next_msg_ts_ns) { next_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns; next_stream_iter = candidate_stream_iter; + } else if (candidate_stream_iter->current_msg_ts_ns == next_msg_ts_ns) { + /* + * The currently selected message to be sent + * downstream next has the exact same timestamp + * that of the current candidate message. We + * must break the tie in a predictable manner. + */ + BT_COMP_LOGD_STR("Two of the next message candidates have the same timestamps, pick one deterministically."); + /* + * Order the messages in an arbitrary but + * deterministic way. + */ + int ret = common_muxing_compare_messages( + candidate_stream_iter->current_msg, + next_stream_iter->current_msg); + if (ret < 0) { + /* + * The `candidate_stream_iter->current_msg` + * should go first. Update the next + * iterator and the current timestamp. + */ + next_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns; + next_stream_iter = candidate_stream_iter; + } else if (ret == 0) { + /* Unable to pick which one should go first. */ + BT_COMP_LOGW("Cannot deterministically pick next live stream message iterator because they have identical next messages: " + "next-stream-iter-addr=%p" "candidate-stream-iter-addr=%p", + next_stream_iter, candidate_stream_iter); + } } session_idx++; @@ -1372,7 +1484,7 @@ bt_component_class_message_iterator_init_method_status lttng_live_msg_iter_init( lttng_live_msg_iter->viewer_connection = live_viewer_connection_create(lttng_live->params.url->str, false, - lttng_live_msg_iter); + lttng_live_msg_iter, log_level); if (!lttng_live_msg_iter->viewer_connection) { goto error; } @@ -1407,6 +1519,17 @@ bt_component_class_message_iterator_init_method_status lttng_live_msg_iter_init( SESS_NOT_FOUND_ACTION_END_STR, lttng_live->params.url->str); break; + case SESSION_NOT_FOUND_ACTION_UNKNOWN: + default: + /* Fallthrough */ + BT_COMP_LOGE("Unknown action for session not found" + "error. Fail the message iterator" + "initialization because of %s=\"%s\" " + "component parameter: url =\"%s\"", + SESS_NOT_FOUND_ACTION_PARAM, + SESS_NOT_FOUND_ACTION_FAIL_STR, + lttng_live->params.url->str); + break; } } @@ -1434,21 +1557,22 @@ bt_component_class_query_method_status lttng_live_query_list_sessions( url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM); if (!url_value) { - BT_COMP_LOGW("Mandatory `%s` parameter missing", URL_PARAM); - status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_INVALID_PARAMS; + BT_COMP_LOGE("Mandatory `%s` parameter missing", URL_PARAM); + status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; goto error; } if (!bt_value_is_string(url_value)) { - BT_COMP_LOGW("`%s` parameter is required to be a string value", + BT_COMP_LOGE("`%s` parameter is required to be a string value", URL_PARAM); - status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_INVALID_PARAMS; + status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; goto error; } url = bt_value_string_get(url_value); - viewer_connection = live_viewer_connection_create(url, true, NULL); + viewer_connection = live_viewer_connection_create(url, true, NULL, + log_level); if (!viewer_connection) { goto error; } @@ -1478,20 +1602,24 @@ end: BT_HIDDEN bt_component_class_query_method_status lttng_live_query( bt_self_component_class_source *comp_class, - const bt_query_executor *query_exec, + bt_private_query_executor *priv_query_exec, const char *object, const bt_value *params, - bt_logging_level log_level, const bt_value **result) + __attribute__((unused)) void *method_data, + const bt_value **result) { bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK; bt_self_component *self_comp = NULL; + bt_logging_level log_level = bt_query_executor_get_logging_level( + bt_private_query_executor_as_query_executor_const( + priv_query_exec)); if (strcmp(object, "sessions") == 0) { status = lttng_live_query_list_sessions(params, result, log_level); } else { - BT_COMP_LOGW("Unknown query object `%s`", object); - status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_INVALID_OBJECT; + BT_COMP_LOGI("Unknown query object `%s`", object); + status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT; goto end; } @@ -1537,7 +1665,7 @@ enum session_not_found_action parse_session_not_found_action_param( } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0) { action = SESSION_NOT_FOUND_ACTION_END; } else { - action = -1; + action = SESSION_NOT_FOUND_ACTION_UNKNOWN; } return action; @@ -1577,7 +1705,7 @@ struct lttng_live_component *lttng_live_component_create(const bt_value *params, if (value && bt_value_is_string(value)) { lttng_live->params.sess_not_found_act = parse_session_not_found_action_param(value); - if (lttng_live->params.sess_not_found_act == -1) { + if (lttng_live->params.sess_not_found_act == SESSION_NOT_FOUND_ACTION_UNKNOWN) { BT_COMP_LOGE("Unexpected value for `%s` parameter: " "value=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM, bt_value_string_get(value)); @@ -1622,11 +1750,6 @@ bt_component_class_init_method_status lttng_live_component_init( goto error; } - if (lttng_live_graph_is_canceled(lttng_live)) { - ret = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR; - goto error; - } - add_port_status = bt_self_component_source_add_output_port( self_comp_src, "out", NULL, NULL); switch (add_port_status) {