Fix: src.ctf.lttng-live: missing ordering within traces and streams
authorFrancis Deslauriers <francis.deslauriers@efficios.com>
Mon, 12 Aug 2019 21:35:01 +0000 (17:35 -0400)
committerPhilippe Proulx <eeppeliteloop@gmail.com>
Tue, 13 Aug 2019 01:13:35 +0000 (21:13 -0400)
Issue
=====
Sorting of messages with the same timestamps are different from the
messages sorted by the a `flt.utils.muxer` component. This is caused by
the fact that we are not sorting messages within sessions and traces
but only across the different session currently handled by the
component.

Indeed, we need to sort messages at across three locations in the code
to ensure proper ordering: sessions, traces, and streams.

Solution
========
Handle messages with the same timestamps at each of these three
locations using the `common_muxing_compare_messages()` function.

Signed-off-by: Francis Deslauriers <francis.deslauriers@efficios.com>
Change-Id: Iadc66f3b6a12f34522f6364b3f47f15eefa87930
Reviewed-on: https://review.lttng.org/c/babeltrace/+/1888
Reviewed-by: Philippe Proulx <eeppeliteloop@gmail.com>
Tested-by: jenkins <jenkins@lttng.org>
src/plugins/ctf/lttng-live/lttng-live.c

index 72d3e74ea05b8d98f6bc62e6ac01ac0d9d5bade3..9352fd12c0e69ab2a9885ee14db574c68accd6cd 100644 (file)
@@ -964,6 +964,16 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace(
                        g_ptr_array_index(live_trace->stream_iterators,
                                        stream_iter_idx);
 
+               /*
+                * Since we may remove elements from the GPtrArray as we
+                * iterate over it, it's possible to see the same element more
+                * than once.
+                */
+               if (stream_iter == curr_candidate_stream_iter) {
+                       stream_iter_idx++;
+                       continue;
+               }
+
                /*
                 * Find if there is are now current message for this stream
                 * iterator get it.
@@ -1021,18 +1031,50 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace(
                        }
                }
 
-               if (!stream_iter_is_ended &&
-                               stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) {
-                       /*
-                        * Update the current best candidate message for the
-                        * stream iterator of thise live trace to be forwarded
-                        * downstream.
-                        */
-                       curr_candidate_msg_ts = stream_iter->current_msg_ts_ns;
-                       curr_candidate_stream_iter = stream_iter;
-               }
+               BT_ASSERT(stream_iter != curr_candidate_stream_iter);
 
-               if (stream_iter_is_ended) {
+               if (!stream_iter_is_ended) {
+                       if (G_UNLIKELY(curr_candidate_stream_iter == NULL) ||
+                                       stream_iter->current_msg_ts_ns < curr_candidate_msg_ts) {
+                               /*
+                                * Update the current best candidate message
+                                * for the stream iterator of this live trace
+                                * to be forwarded downstream.
+                                */
+                               curr_candidate_msg_ts = stream_iter->current_msg_ts_ns;
+                               curr_candidate_stream_iter = stream_iter;
+                       } else if (stream_iter->current_msg_ts_ns == curr_candidate_msg_ts) {
+                               /*
+                                * Order the messages in an arbitrary but
+                                * deterministic way.
+                                */
+                               BT_ASSERT(stream_iter != curr_candidate_stream_iter);
+                               int ret = common_muxing_compare_messages(
+                                       stream_iter->current_msg,
+                                       curr_candidate_stream_iter->current_msg);
+                               if (ret < 0) {
+                                       /*
+                                        * The `curr_candidate_stream_iter->current_msg`
+                                        * should go first. Update the next
+                                        * iterator and the current timestamp.
+                                        */
+                                       curr_candidate_msg_ts = stream_iter->current_msg_ts_ns;
+                                       curr_candidate_stream_iter = stream_iter;
+                               } else if (ret == 0) {
+                                       /*
+                                        * Unable to pick which one should go
+                                        * first.
+                                        */
+                                       BT_COMP_LOGW("Cannot deterministically pick next live stream message iterator because they have identical next messages: "
+                                               "stream-iter-addr=%p"
+                                               "stream-iter-addr=%p",
+                                               stream_iter,
+                                               curr_candidate_stream_iter);
+                               }
+                       }
+
+                       stream_iter_idx++;
+               } else {
                        /*
                         * The live stream iterator is ENDed. We remove that
                         * iterator from the list and we restart the iteration
@@ -1042,8 +1084,6 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace(
                        g_ptr_array_remove_index_fast(live_trace->stream_iterators,
                                stream_iter_idx);
                        stream_iter_idx = 0;
-               } else {
-                       stream_iter_idx++;
                }
        }
 
@@ -1069,6 +1109,8 @@ enum lttng_live_iterator_status next_stream_iterator_for_session(
                struct lttng_live_session *session,
                struct lttng_live_stream_iterator **candidate_session_stream_iter)
 {
+       bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
+       bt_logging_level log_level = lttng_live_msg_iter->log_level;
        enum lttng_live_iterator_status stream_iter_status;
        uint64_t trace_idx = 0;
        int64_t curr_candidate_msg_ts = INT64_MAX;
@@ -1113,9 +1155,32 @@ enum lttng_live_iterator_status next_stream_iterator_for_session(
                if (!trace_is_ended) {
                        BT_ASSERT(stream_iter);
 
-                       if (stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) {
+                       if (G_UNLIKELY(curr_candidate_stream_iter == NULL) ||
+                                       stream_iter->current_msg_ts_ns < curr_candidate_msg_ts) {
                                curr_candidate_msg_ts = stream_iter->current_msg_ts_ns;
                                curr_candidate_stream_iter = stream_iter;
+                       } else if (stream_iter->current_msg_ts_ns == curr_candidate_msg_ts) {
+                               /*
+                                * Order the messages in an arbitrary but
+                                * deterministic way.
+                                */
+                               int ret = common_muxing_compare_messages(
+                                       stream_iter->current_msg,
+                                       curr_candidate_stream_iter->current_msg);
+                               if (ret < 0) {
+                                       /*
+                                        * The `curr_candidate_stream_iter->current_msg`
+                                        * should go first. Update the next iterator
+                                        * and the current timestamp.
+                                        */
+                                       curr_candidate_msg_ts = stream_iter->current_msg_ts_ns;
+                                       curr_candidate_stream_iter = stream_iter;
+                               } else if (ret == 0) {
+                                       /* Unable to pick which one should go first. */
+                                       BT_COMP_LOGW("Cannot deterministically pick next live stream message iterator because they have identical next messages: "
+                                               "stream-iter-addr=%p" "stream-iter-addr=%p",
+                                               stream_iter, curr_candidate_stream_iter);
+                               }
                        }
                        trace_idx++;
                } else {
@@ -1291,9 +1356,10 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next(
                                BT_COMP_LOGD_STR("Two of the next message candidates have the same timestamps, pick one deterministically.");
                                /*
                                 * Order the messages in an arbitrary but
-                                * determinitic way.
+                                * deterministic way.
                                 */
-                               int ret = common_muxing_compare_messages(candidate_stream_iter->current_msg,
+                               int ret = common_muxing_compare_messages(
+                                       candidate_stream_iter->current_msg,
                                        next_stream_iter->current_msg);
                                if (ret < 0) {
                                        /*
This page took 0.027937 seconds and 4 git commands to generate.