From 07afb6bc2e161cfcc6c99d82bdebdfff12de8d63 Mon Sep 17 00:00:00 2001 From: Francis Deslauriers Date: Mon, 12 Aug 2019 17:35:01 -0400 Subject: [PATCH] Fix: src.ctf.lttng-live: missing ordering within traces and streams Issue ===== Sorting of messages with the same timestamps are different from the messages sorted by the a `flt.utils.muxer` component. This is caused by the fact that we are not sorting messages within sessions and traces but only across the different session currently handled by the component. Indeed, we need to sort messages at across three locations in the code to ensure proper ordering: sessions, traces, and streams. Solution ======== Handle messages with the same timestamps at each of these three locations using the `common_muxing_compare_messages()` function. Signed-off-by: Francis Deslauriers Change-Id: Iadc66f3b6a12f34522f6364b3f47f15eefa87930 Reviewed-on: https://review.lttng.org/c/babeltrace/+/1888 Reviewed-by: Philippe Proulx Tested-by: jenkins --- src/plugins/ctf/lttng-live/lttng-live.c | 98 +++++++++++++++++++++---- 1 file changed, 82 insertions(+), 16 deletions(-) diff --git a/src/plugins/ctf/lttng-live/lttng-live.c b/src/plugins/ctf/lttng-live/lttng-live.c index 72d3e74e..9352fd12 100644 --- a/src/plugins/ctf/lttng-live/lttng-live.c +++ b/src/plugins/ctf/lttng-live/lttng-live.c @@ -964,6 +964,16 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( g_ptr_array_index(live_trace->stream_iterators, stream_iter_idx); + /* + * Since we may remove elements from the GPtrArray as we + * iterate over it, it's possible to see the same element more + * than once. + */ + if (stream_iter == curr_candidate_stream_iter) { + stream_iter_idx++; + continue; + } + /* * Find if there is are now current message for this stream * iterator get it. @@ -1021,18 +1031,50 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( } } - if (!stream_iter_is_ended && - stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) { - /* - * Update the current best candidate message for the - * stream iterator of thise live trace to be forwarded - * downstream. - */ - curr_candidate_msg_ts = stream_iter->current_msg_ts_ns; - curr_candidate_stream_iter = stream_iter; - } + BT_ASSERT(stream_iter != curr_candidate_stream_iter); - if (stream_iter_is_ended) { + if (!stream_iter_is_ended) { + if (G_UNLIKELY(curr_candidate_stream_iter == NULL) || + stream_iter->current_msg_ts_ns < curr_candidate_msg_ts) { + /* + * Update the current best candidate message + * for the stream iterator of this live trace + * to be forwarded downstream. + */ + curr_candidate_msg_ts = stream_iter->current_msg_ts_ns; + curr_candidate_stream_iter = stream_iter; + } else if (stream_iter->current_msg_ts_ns == curr_candidate_msg_ts) { + /* + * Order the messages in an arbitrary but + * deterministic way. + */ + BT_ASSERT(stream_iter != curr_candidate_stream_iter); + int ret = common_muxing_compare_messages( + stream_iter->current_msg, + curr_candidate_stream_iter->current_msg); + if (ret < 0) { + /* + * The `curr_candidate_stream_iter->current_msg` + * should go first. Update the next + * iterator and the current timestamp. + */ + curr_candidate_msg_ts = stream_iter->current_msg_ts_ns; + curr_candidate_stream_iter = stream_iter; + } else if (ret == 0) { + /* + * Unable to pick which one should go + * first. + */ + BT_COMP_LOGW("Cannot deterministically pick next live stream message iterator because they have identical next messages: " + "stream-iter-addr=%p" + "stream-iter-addr=%p", + stream_iter, + curr_candidate_stream_iter); + } + } + + stream_iter_idx++; + } else { /* * The live stream iterator is ENDed. We remove that * iterator from the list and we restart the iteration @@ -1042,8 +1084,6 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( g_ptr_array_remove_index_fast(live_trace->stream_iterators, stream_iter_idx); stream_iter_idx = 0; - } else { - stream_iter_idx++; } } @@ -1069,6 +1109,8 @@ enum lttng_live_iterator_status next_stream_iterator_for_session( struct lttng_live_session *session, struct lttng_live_stream_iterator **candidate_session_stream_iter) { + bt_self_component *self_comp = lttng_live_msg_iter->self_comp; + bt_logging_level log_level = lttng_live_msg_iter->log_level; enum lttng_live_iterator_status stream_iter_status; uint64_t trace_idx = 0; int64_t curr_candidate_msg_ts = INT64_MAX; @@ -1113,9 +1155,32 @@ enum lttng_live_iterator_status next_stream_iterator_for_session( if (!trace_is_ended) { BT_ASSERT(stream_iter); - if (stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) { + if (G_UNLIKELY(curr_candidate_stream_iter == NULL) || + stream_iter->current_msg_ts_ns < curr_candidate_msg_ts) { curr_candidate_msg_ts = stream_iter->current_msg_ts_ns; curr_candidate_stream_iter = stream_iter; + } else if (stream_iter->current_msg_ts_ns == curr_candidate_msg_ts) { + /* + * Order the messages in an arbitrary but + * deterministic way. + */ + int ret = common_muxing_compare_messages( + stream_iter->current_msg, + curr_candidate_stream_iter->current_msg); + if (ret < 0) { + /* + * The `curr_candidate_stream_iter->current_msg` + * should go first. Update the next iterator + * and the current timestamp. + */ + curr_candidate_msg_ts = stream_iter->current_msg_ts_ns; + curr_candidate_stream_iter = stream_iter; + } else if (ret == 0) { + /* Unable to pick which one should go first. */ + BT_COMP_LOGW("Cannot deterministically pick next live stream message iterator because they have identical next messages: " + "stream-iter-addr=%p" "stream-iter-addr=%p", + stream_iter, curr_candidate_stream_iter); + } } trace_idx++; } else { @@ -1291,9 +1356,10 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next( BT_COMP_LOGD_STR("Two of the next message candidates have the same timestamps, pick one deterministically."); /* * Order the messages in an arbitrary but - * determinitic way. + * deterministic way. */ - int ret = common_muxing_compare_messages(candidate_stream_iter->current_msg, + int ret = common_muxing_compare_messages( + candidate_stream_iter->current_msg, next_stream_iter->current_msg); if (ret < 0) { /* -- 2.34.1