X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fplugins%2Fctf%2Flttng-live%2Flttng-live.c;h=9352fd12c0e69ab2a9885ee14db574c68accd6cd;hb=07afb6bc2e161cfcc6c99d82bdebdfff12de8d63;hp=72d3e74ea05b8d98f6bc62e6ac01ac0d9d5bade3;hpb=2080bf80f9306106500dd5f9e7d0493b43b9dd79;p=babeltrace.git diff --git a/src/plugins/ctf/lttng-live/lttng-live.c b/src/plugins/ctf/lttng-live/lttng-live.c index 72d3e74e..9352fd12 100644 --- a/src/plugins/ctf/lttng-live/lttng-live.c +++ b/src/plugins/ctf/lttng-live/lttng-live.c @@ -964,6 +964,16 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( g_ptr_array_index(live_trace->stream_iterators, stream_iter_idx); + /* + * Since we may remove elements from the GPtrArray as we + * iterate over it, it's possible to see the same element more + * than once. + */ + if (stream_iter == curr_candidate_stream_iter) { + stream_iter_idx++; + continue; + } + /* * Find if there is are now current message for this stream * iterator get it. @@ -1021,18 +1031,50 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( } } - if (!stream_iter_is_ended && - stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) { - /* - * Update the current best candidate message for the - * stream iterator of thise live trace to be forwarded - * downstream. - */ - curr_candidate_msg_ts = stream_iter->current_msg_ts_ns; - curr_candidate_stream_iter = stream_iter; - } + BT_ASSERT(stream_iter != curr_candidate_stream_iter); - if (stream_iter_is_ended) { + if (!stream_iter_is_ended) { + if (G_UNLIKELY(curr_candidate_stream_iter == NULL) || + stream_iter->current_msg_ts_ns < curr_candidate_msg_ts) { + /* + * Update the current best candidate message + * for the stream iterator of this live trace + * to be forwarded downstream. + */ + curr_candidate_msg_ts = stream_iter->current_msg_ts_ns; + curr_candidate_stream_iter = stream_iter; + } else if (stream_iter->current_msg_ts_ns == curr_candidate_msg_ts) { + /* + * Order the messages in an arbitrary but + * deterministic way. + */ + BT_ASSERT(stream_iter != curr_candidate_stream_iter); + int ret = common_muxing_compare_messages( + stream_iter->current_msg, + curr_candidate_stream_iter->current_msg); + if (ret < 0) { + /* + * The `curr_candidate_stream_iter->current_msg` + * should go first. Update the next + * iterator and the current timestamp. + */ + curr_candidate_msg_ts = stream_iter->current_msg_ts_ns; + curr_candidate_stream_iter = stream_iter; + } else if (ret == 0) { + /* + * Unable to pick which one should go + * first. + */ + BT_COMP_LOGW("Cannot deterministically pick next live stream message iterator because they have identical next messages: " + "stream-iter-addr=%p" + "stream-iter-addr=%p", + stream_iter, + curr_candidate_stream_iter); + } + } + + stream_iter_idx++; + } else { /* * The live stream iterator is ENDed. We remove that * iterator from the list and we restart the iteration @@ -1042,8 +1084,6 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( g_ptr_array_remove_index_fast(live_trace->stream_iterators, stream_iter_idx); stream_iter_idx = 0; - } else { - stream_iter_idx++; } } @@ -1069,6 +1109,8 @@ enum lttng_live_iterator_status next_stream_iterator_for_session( struct lttng_live_session *session, struct lttng_live_stream_iterator **candidate_session_stream_iter) { + bt_self_component *self_comp = lttng_live_msg_iter->self_comp; + bt_logging_level log_level = lttng_live_msg_iter->log_level; enum lttng_live_iterator_status stream_iter_status; uint64_t trace_idx = 0; int64_t curr_candidate_msg_ts = INT64_MAX; @@ -1113,9 +1155,32 @@ enum lttng_live_iterator_status next_stream_iterator_for_session( if (!trace_is_ended) { BT_ASSERT(stream_iter); - if (stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) { + if (G_UNLIKELY(curr_candidate_stream_iter == NULL) || + stream_iter->current_msg_ts_ns < curr_candidate_msg_ts) { curr_candidate_msg_ts = stream_iter->current_msg_ts_ns; curr_candidate_stream_iter = stream_iter; + } else if (stream_iter->current_msg_ts_ns == curr_candidate_msg_ts) { + /* + * Order the messages in an arbitrary but + * deterministic way. + */ + int ret = common_muxing_compare_messages( + stream_iter->current_msg, + curr_candidate_stream_iter->current_msg); + if (ret < 0) { + /* + * The `curr_candidate_stream_iter->current_msg` + * should go first. Update the next iterator + * and the current timestamp. + */ + curr_candidate_msg_ts = stream_iter->current_msg_ts_ns; + curr_candidate_stream_iter = stream_iter; + } else if (ret == 0) { + /* Unable to pick which one should go first. */ + BT_COMP_LOGW("Cannot deterministically pick next live stream message iterator because they have identical next messages: " + "stream-iter-addr=%p" "stream-iter-addr=%p", + stream_iter, curr_candidate_stream_iter); + } } trace_idx++; } else { @@ -1291,9 +1356,10 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next( BT_COMP_LOGD_STR("Two of the next message candidates have the same timestamps, pick one deterministically."); /* * Order the messages in an arbitrary but - * determinitic way. + * deterministic way. */ - int ret = common_muxing_compare_messages(candidate_stream_iter->current_msg, + int ret = common_muxing_compare_messages( + candidate_stream_iter->current_msg, next_stream_iter->current_msg); if (ret < 0) { /*