X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fplugins%2Fctf%2Flttng-live%2Flttng-live.c;h=a79d02b10d3f7abb87c462c2dddf764cb6a04e94;hb=59225a3e0e13a9c674234755e55055d9ff68d635;hp=f13a5eabc301f9ac60a0f331c11b6e32af0ee269;hpb=550004b427608c43666ed59b552c37ea900ad6f8;p=babeltrace.git diff --git a/src/plugins/ctf/lttng-live/lttng-live.c b/src/plugins/ctf/lttng-live/lttng-live.c index f13a5eab..a79d02b1 100644 --- a/src/plugins/ctf/lttng-live/lttng-live.c +++ b/src/plugins/ctf/lttng-live/lttng-live.c @@ -42,12 +42,15 @@ #include "compat/compiler.h" #include +#include "plugins/common/muxing/muxing.h" + #include "data-stream.h" #include "metadata.h" #include "lttng-live.h" #define MAX_QUERY_SIZE (256*1024) #define URL_PARAM "url" +#define INPUTS_PARAM "inputs" #define SESS_NOT_FOUND_ACTION_PARAM "session-not-found-action" #define SESS_NOT_FOUND_ACTION_CONTINUE_STR "continue" #define SESS_NOT_FOUND_ACTION_FAIL_STR "fail" @@ -793,6 +796,34 @@ end: return ret; } +static +enum lttng_live_iterator_status lttng_live_iterator_close_stream( + struct lttng_live_msg_iter *lttng_live_msg_iter, + struct lttng_live_stream_iterator *stream_iter, + bt_message **curr_msg) +{ + enum lttng_live_iterator_status live_status = + LTTNG_LIVE_ITERATOR_STATUS_OK; + /* + * The viewer has hung up on us so we are closing the stream. The + * `bt_msg_iter` should simply realize that it needs to close the + * stream properly by emitting the necessary stream end message. + */ + enum bt_msg_iter_status status = + bt_msg_iter_get_next_message(stream_iter->msg_iter, + lttng_live_msg_iter->self_msg_iter, curr_msg); + + if (status == BT_MSG_ITER_STATUS_ERROR) { + live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; + } + + BT_ASSERT(status == BT_MSG_ITER_STATUS_OK); + +end: + return live_status; +} + /* * helper function: * handle_no_data_streams() @@ -842,7 +873,7 @@ end: * When disconnected from relayd: try to re-connect endlessly. */ static -enum lttng_live_iterator_status lttng_live_iterator_next_on_stream( +enum lttng_live_iterator_status lttng_live_iterator_next_msg_on_stream( struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_stream_iterator *stream_iter, bt_message **curr_msg) @@ -851,6 +882,17 @@ enum lttng_live_iterator_status lttng_live_iterator_next_on_stream( bt_self_component *self_comp = lttng_live_msg_iter->self_comp; enum lttng_live_iterator_status live_status; + if (stream_iter->has_stream_hung_up) { + /* + * The stream has hung up and the stream was properly closed + * during the last call to the current function. Return _END + * status now so that this stream iterator is removed for the + * stream iterator list. + */ + live_status = LTTNG_LIVE_ITERATOR_STATUS_END; + goto end; + } + retry: print_stream_state(stream_iter); live_status = lttng_live_iterator_handle_new_streams_and_metadata( @@ -860,7 +902,16 @@ retry: } live_status = lttng_live_iterator_next_handle_one_no_data_stream( lttng_live_msg_iter, stream_iter); + if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { + if (live_status == LTTNG_LIVE_ITERATOR_STATUS_END) { + /* + * We overwrite `live_status` since `curr_msg` is + * likely set to a valid message in this function. + */ + live_status = lttng_live_iterator_close_stream( + lttng_live_msg_iter, stream_iter, curr_msg); + } goto end; } live_status = lttng_live_iterator_next_handle_one_quiescent_stream( @@ -890,11 +941,11 @@ static enum lttng_live_iterator_status next_stream_iterator_for_trace( struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_trace *live_trace, - struct lttng_live_stream_iterator **candidate_stream_iter) + struct lttng_live_stream_iterator **youngest_trace_stream_iter) { - struct lttng_live_stream_iterator *curr_candidate_stream_iter = NULL; + struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL; enum lttng_live_iterator_status stream_iter_status;; - int64_t curr_candidate_msg_ts = INT64_MAX; + int64_t youngest_candidate_msg_ts = INT64_MAX; uint64_t stream_iter_idx; bt_logging_level log_level = lttng_live_msg_iter->log_level; bt_self_component *self_comp = lttng_live_msg_iter->self_comp; @@ -914,6 +965,16 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( g_ptr_array_index(live_trace->stream_iterators, stream_iter_idx); + /* + * Since we may remove elements from the GPtrArray as we + * iterate over it, it's possible to see the same element more + * than once. + */ + if (stream_iter == youngest_candidate_stream_iter) { + stream_iter_idx++; + continue; + } + /* * Find if there is are now current message for this stream * iterator get it. @@ -921,7 +982,7 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( while (!stream_iter->current_msg) { bt_message *msg = NULL; int64_t curr_msg_ts_ns = INT64_MAX; - stream_iter_status = lttng_live_iterator_next_on_stream( + stream_iter_status = lttng_live_iterator_next_msg_on_stream( lttng_live_msg_iter, stream_iter, &msg); BT_COMP_LOGD("live stream iterator returned status :%s", @@ -971,18 +1032,50 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( } } - if (!stream_iter_is_ended && - stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) { - /* - * Update the current best candidate message for the - * stream iterator of thise live trace to be forwarded - * downstream. - */ - curr_candidate_msg_ts = stream_iter->current_msg_ts_ns; - curr_candidate_stream_iter = stream_iter; - } + BT_ASSERT(stream_iter != youngest_candidate_stream_iter); - if (stream_iter_is_ended) { + if (!stream_iter_is_ended) { + if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) || + stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) { + /* + * Update the current best candidate message + * for the stream iterator of this live trace + * to be forwarded downstream. + */ + youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns; + youngest_candidate_stream_iter = stream_iter; + } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) { + /* + * Order the messages in an arbitrary but + * deterministic way. + */ + BT_ASSERT(stream_iter != youngest_candidate_stream_iter); + int ret = common_muxing_compare_messages( + stream_iter->current_msg, + youngest_candidate_stream_iter->current_msg); + if (ret < 0) { + /* + * The `youngest_candidate_stream_iter->current_msg` + * should go first. Update the next + * iterator and the current timestamp. + */ + youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns; + youngest_candidate_stream_iter = stream_iter; + } else if (ret == 0) { + /* + * Unable to pick which one should go + * first. + */ + BT_COMP_LOGW("Cannot deterministically pick next live stream message iterator because they have identical next messages: " + "stream-iter-addr=%p" + "stream-iter-addr=%p", + stream_iter, + youngest_candidate_stream_iter); + } + } + + stream_iter_idx++; + } else { /* * The live stream iterator is ENDed. We remove that * iterator from the list and we restart the iteration @@ -992,13 +1085,11 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( g_ptr_array_remove_index_fast(live_trace->stream_iterators, stream_iter_idx); stream_iter_idx = 0; - } else { - stream_iter_idx++; } } - if (curr_candidate_stream_iter) { - *candidate_stream_iter = curr_candidate_stream_iter; + if (youngest_candidate_stream_iter) { + *youngest_trace_stream_iter = youngest_candidate_stream_iter; stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK; } else { /* @@ -1017,12 +1108,14 @@ static enum lttng_live_iterator_status next_stream_iterator_for_session( struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_session *session, - struct lttng_live_stream_iterator **candidate_session_stream_iter) + struct lttng_live_stream_iterator **youngest_session_stream_iter) { + bt_self_component *self_comp = lttng_live_msg_iter->self_comp; + bt_logging_level log_level = lttng_live_msg_iter->log_level; enum lttng_live_iterator_status stream_iter_status; uint64_t trace_idx = 0; - int64_t curr_candidate_msg_ts = INT64_MAX; - struct lttng_live_stream_iterator *curr_candidate_stream_iter = NULL; + int64_t youngest_candidate_msg_ts = INT64_MAX; + struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL; /* * Make sure we are attached to the session and look for new streams @@ -1063,9 +1156,32 @@ enum lttng_live_iterator_status next_stream_iterator_for_session( if (!trace_is_ended) { BT_ASSERT(stream_iter); - if (stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) { - curr_candidate_msg_ts = stream_iter->current_msg_ts_ns; - curr_candidate_stream_iter = stream_iter; + if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) || + stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) { + youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns; + youngest_candidate_stream_iter = stream_iter; + } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) { + /* + * Order the messages in an arbitrary but + * deterministic way. + */ + int ret = common_muxing_compare_messages( + stream_iter->current_msg, + youngest_candidate_stream_iter->current_msg); + if (ret < 0) { + /* + * The `youngest_candidate_stream_iter->current_msg` + * should go first. Update the next iterator + * and the current timestamp. + */ + youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns; + youngest_candidate_stream_iter = stream_iter; + } else if (ret == 0) { + /* Unable to pick which one should go first. */ + BT_COMP_LOGW("Cannot deterministically pick next live stream message iterator because they have identical next messages: " + "stream-iter-addr=%p" "stream-iter-addr=%p", + stream_iter, youngest_candidate_stream_iter); + } } trace_idx++; } else { @@ -1073,8 +1189,8 @@ enum lttng_live_iterator_status next_stream_iterator_for_session( trace_idx = 0; } } - if (curr_candidate_stream_iter) { - *candidate_session_stream_iter = curr_candidate_stream_iter; + if (youngest_candidate_stream_iter) { + *youngest_session_stream_iter = youngest_candidate_stream_iter; stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK; } else { /* @@ -1114,6 +1230,8 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next( bt_self_message_iterator_get_data(self_msg_it); struct lttng_live_component *lttng_live = lttng_live_msg_iter->lttng_live_comp; + bt_self_component *self_comp = lttng_live_msg_iter->self_comp; + bt_logging_level log_level = lttng_live_msg_iter->log_level; enum lttng_live_iterator_status stream_iter_status; uint64_t session_idx; @@ -1174,9 +1292,9 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next( * return it. */ while (*count < capacity) { - struct lttng_live_stream_iterator *next_stream_iter = NULL, - *candidate_stream_iter = NULL; - int64_t next_msg_ts_ns = INT64_MAX; + struct lttng_live_stream_iterator *youngest_stream_iter = NULL, + *candidate_stream_iter = NULL; + int64_t youngest_msg_ts_ns = INT64_MAX; BT_ASSERT(lttng_live_msg_iter->sessions); session_idx = 0; @@ -1225,35 +1343,65 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next( goto end; } - if (candidate_stream_iter->current_msg_ts_ns <= next_msg_ts_ns) { - next_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns; - next_stream_iter = candidate_stream_iter; + if (G_UNLIKELY(youngest_stream_iter == NULL) || + candidate_stream_iter->current_msg_ts_ns < youngest_msg_ts_ns) { + youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns; + youngest_stream_iter = candidate_stream_iter; + } else if (candidate_stream_iter->current_msg_ts_ns == youngest_msg_ts_ns) { + /* + * The currently selected message to be sent + * downstream next has the exact same timestamp + * that of the current candidate message. We + * must break the tie in a predictable manner. + */ + BT_COMP_LOGD_STR("Two of the next message candidates have the same timestamps, pick one deterministically."); + /* + * Order the messages in an arbitrary but + * deterministic way. + */ + int ret = common_muxing_compare_messages( + candidate_stream_iter->current_msg, + youngest_stream_iter->current_msg); + if (ret < 0) { + /* + * The `candidate_stream_iter->current_msg` + * should go first. Update the next + * iterator and the current timestamp. + */ + youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns; + youngest_stream_iter = candidate_stream_iter; + } else if (ret == 0) { + /* Unable to pick which one should go first. */ + BT_COMP_LOGW("Cannot deterministically pick next live stream message iterator because they have identical next messages: " + "next-stream-iter-addr=%p" "candidate-stream-iter-addr=%p", + youngest_stream_iter, candidate_stream_iter); + } } session_idx++; } - if (!next_stream_iter) { + if (!youngest_stream_iter) { stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; goto end; } - BT_ASSERT(next_stream_iter->current_msg); + BT_ASSERT(youngest_stream_iter->current_msg); /* Ensure monotonicity. */ BT_ASSERT(lttng_live_msg_iter->last_msg_ts_ns <= - next_stream_iter->current_msg_ts_ns); + youngest_stream_iter->current_msg_ts_ns); /* * Insert the next message to the message batch. This will set * stream iterator current messsage to NULL so that next time * we fetch the next message of that stream iterator */ - BT_MESSAGE_MOVE_REF(msgs[*count], next_stream_iter->current_msg); + BT_MESSAGE_MOVE_REF(msgs[*count], youngest_stream_iter->current_msg); (*count)++; /* Update the last timestamp in nanoseconds sent downstream. */ - lttng_live_msg_iter->last_msg_ts_ns = next_msg_ts_ns; - next_stream_iter->current_msg_ts_ns = INT64_MAX; + lttng_live_msg_iter->last_msg_ts_ns = youngest_msg_ts_ns; + youngest_stream_iter->current_msg_ts_ns = INT64_MAX; stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK; } @@ -1452,11 +1600,87 @@ end: return status; } +static +bt_component_class_query_method_status lttng_live_query_support_info( + const bt_value *params, const bt_value **result, + bt_logging_level log_level) +{ + bt_component_class_query_method_status status = + BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK; + const bt_value *input_type_value; + const bt_value *input_value; + double weight = 0; + struct bt_common_lttng_live_url_parts parts = { 0 }; + + /* Used by the logging macros */ + __attribute__((unused)) bt_self_component *self_comp = NULL; + + *result = NULL; + input_type_value = bt_value_map_borrow_entry_value_const(params, + "type"); + if (!input_type_value) { + BT_COMP_LOGE("Missing expected `type` parameter."); + goto error; + } + + if (!bt_value_is_string(input_type_value)) { + BT_COMP_LOGE("`type` parameter is not a string value."); + goto error; + } + + if (strcmp(bt_value_string_get(input_type_value), "string") != 0) { + /* We don't handle file system paths */ + goto create_result; + } + + input_value = bt_value_map_borrow_entry_value_const(params, "input"); + if (!input_value) { + BT_COMP_LOGE("Missing expected `input` parameter."); + goto error; + } + + if (!bt_value_is_string(input_value)) { + BT_COMP_LOGE("`input` parameter is not a string value."); + goto error; + } + + parts = bt_common_parse_lttng_live_url(bt_value_string_get(input_value), + NULL, 0); + if (parts.session_name) { + /* + * Looks pretty much like an LTTng live URL: we got the + * session name part, which forms a complete URL. + */ + weight = .75; + } + +create_result: + *result = bt_value_real_create_init(weight); + if (!*result) { + status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR; + goto error; + } + + goto end; + +error: + if (status >= 0) { + status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; + } + + BT_ASSERT(!*result); + +end: + bt_common_destroy_lttng_live_url_parts(&parts); + return status; +} + BT_HIDDEN bt_component_class_query_method_status lttng_live_query( bt_self_component_class_source *comp_class, bt_private_query_executor *priv_query_exec, const char *object, const bt_value *params, + __attribute__((unused)) void *method_data, const bt_value **result) { bt_component_class_query_method_status status = @@ -1469,6 +1693,9 @@ bt_component_class_query_method_status lttng_live_query( if (strcmp(object, "sessions") == 0) { status = lttng_live_query_list_sessions(params, result, log_level); + } else if (strcmp(object, "babeltrace.support-info") == 0) { + status = lttng_live_query_support_info(params, result, + log_level); } else { BT_COMP_LOGI("Unknown query object `%s`", object); status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT; @@ -1527,7 +1754,9 @@ struct lttng_live_component *lttng_live_component_create(const bt_value *params, bt_logging_level log_level, bt_self_component *self_comp) { struct lttng_live_component *lttng_live; - const bt_value *value = NULL; + const bt_value *inputs_value; + const bt_value *url_value; + const bt_value *value; const char *url; lttng_live = g_new0(struct lttng_live_component, 1); @@ -1539,13 +1768,36 @@ struct lttng_live_component *lttng_live_component_create(const bt_value *params, lttng_live->max_query_size = MAX_QUERY_SIZE; lttng_live->has_msg_iter = false; - value = bt_value_map_borrow_entry_value_const(params, URL_PARAM); - if (!value || !bt_value_is_string(value)) { - BT_COMP_LOGW("Mandatory `%s` parameter missing or not a string", - URL_PARAM); + inputs_value = bt_value_map_borrow_entry_value_const(params, + INPUTS_PARAM); + if (!inputs_value) { + BT_COMP_LOGE("Mandatory `%s` parameter missing", INPUTS_PARAM); + goto error; + } + + if (!bt_value_is_array(inputs_value)) { + BT_COMP_LOGE("`%s` parameter is required to be an array value", + INPUTS_PARAM); + goto error; + } + + if (bt_value_array_get_length(inputs_value) != 1) { + BT_COMP_LOGE("`%s` parameter's length is required to be 1", + INPUTS_PARAM); goto error; } - url = bt_value_string_get(value); + + url_value = bt_value_array_borrow_element_by_index_const(inputs_value, + 0); + BT_ASSERT(url_value); + + if (!bt_value_is_string(url_value)) { + BT_COMP_LOGE("First element of `%s` parameter is required to be a string value (URL)", + INPUTS_PARAM); + goto error; + } + + url = bt_value_string_get(url_value); lttng_live->params.url = g_string_new(url); if (!lttng_live->params.url) { goto error; @@ -1553,8 +1805,13 @@ struct lttng_live_component *lttng_live_component_create(const bt_value *params, value = bt_value_map_borrow_entry_value_const(params, SESS_NOT_FOUND_ACTION_PARAM); + if (value) { + if (!bt_value_is_string(value)) { + BT_COMP_LOGE("`%s` parameter is required to be a string value", + SESS_NOT_FOUND_ACTION_PARAM); + goto error; + } - if (value && bt_value_is_string(value)) { lttng_live->params.sess_not_found_act = parse_session_not_found_action_param(value); if (lttng_live->params.sess_not_found_act == SESSION_NOT_FOUND_ACTION_UNKNOWN) { @@ -1564,9 +1821,8 @@ struct lttng_live_component *lttng_live_component_create(const bt_value *params, goto error; } } else { - BT_COMP_LOGW("Optional `%s` parameter is missing or " - "not a string value. Defaulting to %s=\"%s\".", - SESS_NOT_FOUND_ACTION_PARAM, + BT_COMP_LOGI("Optional `%s` parameter is missing: " + "defaulting to `%s`.", SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR); lttng_live->params.sess_not_found_act = @@ -1585,7 +1841,9 @@ end: BT_HIDDEN bt_component_class_init_method_status lttng_live_component_init( bt_self_component_source *self_comp_src, - const bt_value *params, __attribute__((unused)) void *init_method_data) + bt_self_component_source_configuration *config, + const bt_value *params, + __attribute__((unused)) void *init_method_data) { struct lttng_live_component *lttng_live; bt_component_class_init_method_status ret =