g_ptr_array_index(live_trace->stream_iterators,
stream_iter_idx);
+ /*
+ * Since we may remove elements from the GPtrArray as we
+ * iterate over it, it's possible to see the same element more
+ * than once.
+ */
+ if (stream_iter == curr_candidate_stream_iter) {
+ stream_iter_idx++;
+ continue;
+ }
+
/*
* Find if there is are now current message for this stream
* iterator get it.
}
}
- if (!stream_iter_is_ended &&
- stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) {
- /*
- * Update the current best candidate message for the
- * stream iterator of thise live trace to be forwarded
- * downstream.
- */
- curr_candidate_msg_ts = stream_iter->current_msg_ts_ns;
- curr_candidate_stream_iter = stream_iter;
- }
+ BT_ASSERT(stream_iter != curr_candidate_stream_iter);
- if (stream_iter_is_ended) {
+ if (!stream_iter_is_ended) {
+ if (G_UNLIKELY(curr_candidate_stream_iter == NULL) ||
+ stream_iter->current_msg_ts_ns < curr_candidate_msg_ts) {
+ /*
+ * Update the current best candidate message
+ * for the stream iterator of this live trace
+ * to be forwarded downstream.
+ */
+ curr_candidate_msg_ts = stream_iter->current_msg_ts_ns;
+ curr_candidate_stream_iter = stream_iter;
+ } else if (stream_iter->current_msg_ts_ns == curr_candidate_msg_ts) {
+ /*
+ * Order the messages in an arbitrary but
+ * deterministic way.
+ */
+ BT_ASSERT(stream_iter != curr_candidate_stream_iter);
+ int ret = common_muxing_compare_messages(
+ stream_iter->current_msg,
+ curr_candidate_stream_iter->current_msg);
+ if (ret < 0) {
+ /*
+ * The `curr_candidate_stream_iter->current_msg`
+ * should go first. Update the next
+ * iterator and the current timestamp.
+ */
+ curr_candidate_msg_ts = stream_iter->current_msg_ts_ns;
+ curr_candidate_stream_iter = stream_iter;
+ } else if (ret == 0) {
+ /*
+ * Unable to pick which one should go
+ * first.
+ */
+ BT_COMP_LOGW("Cannot deterministically pick next live stream message iterator because they have identical next messages: "
+ "stream-iter-addr=%p"
+ "stream-iter-addr=%p",
+ stream_iter,
+ curr_candidate_stream_iter);
+ }
+ }
+
+ stream_iter_idx++;
+ } else {
/*
* The live stream iterator is ENDed. We remove that
* iterator from the list and we restart the iteration
g_ptr_array_remove_index_fast(live_trace->stream_iterators,
stream_iter_idx);
stream_iter_idx = 0;
- } else {
- stream_iter_idx++;
}
}
struct lttng_live_session *session,
struct lttng_live_stream_iterator **candidate_session_stream_iter)
{
+ bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
+ bt_logging_level log_level = lttng_live_msg_iter->log_level;
enum lttng_live_iterator_status stream_iter_status;
uint64_t trace_idx = 0;
int64_t curr_candidate_msg_ts = INT64_MAX;
if (!trace_is_ended) {
BT_ASSERT(stream_iter);
- if (stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) {
+ if (G_UNLIKELY(curr_candidate_stream_iter == NULL) ||
+ stream_iter->current_msg_ts_ns < curr_candidate_msg_ts) {
curr_candidate_msg_ts = stream_iter->current_msg_ts_ns;
curr_candidate_stream_iter = stream_iter;
+ } else if (stream_iter->current_msg_ts_ns == curr_candidate_msg_ts) {
+ /*
+ * Order the messages in an arbitrary but
+ * deterministic way.
+ */
+ int ret = common_muxing_compare_messages(
+ stream_iter->current_msg,
+ curr_candidate_stream_iter->current_msg);
+ if (ret < 0) {
+ /*
+ * The `curr_candidate_stream_iter->current_msg`
+ * should go first. Update the next iterator
+ * and the current timestamp.
+ */
+ curr_candidate_msg_ts = stream_iter->current_msg_ts_ns;
+ curr_candidate_stream_iter = stream_iter;
+ } else if (ret == 0) {
+ /* Unable to pick which one should go first. */
+ BT_COMP_LOGW("Cannot deterministically pick next live stream message iterator because they have identical next messages: "
+ "stream-iter-addr=%p" "stream-iter-addr=%p",
+ stream_iter, curr_candidate_stream_iter);
+ }
}
trace_idx++;
} else {
BT_COMP_LOGD_STR("Two of the next message candidates have the same timestamps, pick one deterministically.");
/*
* Order the messages in an arbitrary but
- * determinitic way.
+ * deterministic way.
*/
- int ret = common_muxing_compare_messages(candidate_stream_iter->current_msg,
+ int ret = common_muxing_compare_messages(
+ candidate_stream_iter->current_msg,
next_stream_iter->current_msg);
if (ret < 0) {
/*