Tests: flt.utils.muxer: run `black` on bt_plugin_muxer_test.py
[babeltrace.git] / src / plugins / utils / muxer / muxer.c
index 9510d07a5e45a865d33f7855fbdc02534f384268..6cf8ae6c65751a437214b8b6dc781f9fa14a59e8 100644 (file)
@@ -23,7 +23,7 @@
 #define BT_COMP_LOG_SELF_COMP (muxer_comp->self_comp)
 #define BT_LOG_OUTPUT_LEVEL (muxer_comp->log_level)
 #define BT_LOG_TAG "PLUGIN/FLT.UTILS.MUXER"
-#include "plugins/comp-logging.h"
+#include "logging/comp-logging.h"
 
 #include "common/macros.h"
 #include "common/uuid.h"
@@ -36,6 +36,8 @@
 #include <stdlib.h>
 #include <string.h>
 
+#include "plugins/common/muxing/muxing.h"
+
 #include "muxer.h"
 
 #define ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME       "assume-absolute-clock-classes"
@@ -73,6 +75,9 @@ enum muxer_msg_iter_clock_class_expectation {
 struct muxer_msg_iter {
        struct muxer_comp *muxer_comp;
 
+       /* Weak */
+       bt_self_message_iterator *self_msg_iter;
+
        /*
         * Array of struct muxer_upstream_msg_iter * (owned by this).
         *
@@ -427,6 +432,7 @@ void muxer_finalize(bt_self_component_filter *self_comp)
 static
 bt_self_component_port_input_message_iterator *
 create_msg_iter_on_input_port(struct muxer_comp *muxer_comp,
+               struct muxer_msg_iter *muxer_msg_iter,
                bt_self_component_port_input *self_port)
 {
        const bt_port *port = bt_self_component_port_as_port(
@@ -441,8 +447,8 @@ create_msg_iter_on_input_port(struct muxer_comp *muxer_comp,
        // TODO: Advance the iterator to >= the time of the latest
        //       returned message by the muxer message
        //       iterator which creates it.
-       msg_iter = bt_self_component_port_input_message_iterator_create(
-               self_port);
+       msg_iter = bt_self_component_port_input_message_iterator_create_from_message_iterator(
+               muxer_msg_iter->self_msg_iter, self_port);
        if (!msg_iter) {
                BT_COMP_LOGE("Cannot create upstream message iterator on input port: "
                        "port-addr=%p, port-name=\"%s\"",
@@ -939,11 +945,44 @@ muxer_msg_iter_youngest_upstream_msg_iter(
                        goto end;
                }
 
-               if (msg_ts_ns <= youngest_ts_ns) {
+               if (msg_ts_ns < youngest_ts_ns) {
                        *muxer_upstream_msg_iter =
                                cur_muxer_upstream_msg_iter;
                        youngest_ts_ns = msg_ts_ns;
                        *ts_ns = youngest_ts_ns;
+               } else if (msg_ts_ns == youngest_ts_ns) {
+                       /*
+                        * The currently selected message to be sent downstream
+                        * next has the exact same timestamp that of the
+                        * current candidate message. We must break the tie
+                        * in a predictable manner.
+                        */
+                       const bt_message *selected_msg = g_queue_peek_head(
+                               (*muxer_upstream_msg_iter)->msgs);
+                       BT_COMP_LOGD_STR("Two of the next message candidates have the same timestamps, pick one deterministically.");
+
+                       /*
+                        * Order the messages in an arbitrary but determinitic
+                        * way.
+                        */
+                       ret = common_muxing_compare_messages(msg, selected_msg);
+                       if (ret < 0) {
+                               /*
+                                * The `msg` should go first. Update the next
+                                * iterator and the current timestamp.
+                                */
+                               *muxer_upstream_msg_iter =
+                                       cur_muxer_upstream_msg_iter;
+                               youngest_ts_ns = msg_ts_ns;
+                               *ts_ns = youngest_ts_ns;
+                       } else if (ret == 0) {
+                               /* Unable to pick which one should go first. */
+                               BT_COMP_LOGW("Cannot deterministically pick next upstream message iterator because they have identical next messages: "
+                                       "muxer-upstream-msg-iter-wrap-addr=%p"
+                                       "cur-muxer-upstream-msg-iter-wrap-addr=%p",
+                                       *muxer_upstream_msg_iter,
+                                       cur_muxer_upstream_msg_iter);
+                       }
                }
        }
 
@@ -1231,7 +1270,7 @@ int muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp,
                }
 
                upstream_msg_iter = create_msg_iter_on_input_port(muxer_comp,
-                       self_port);
+                       muxer_msg_iter, self_port);
                if (!upstream_msg_iter) {
                        /* create_msg_iter_on_input_port() logs errors */
                        BT_ASSERT(!upstream_msg_iter);
@@ -1292,6 +1331,7 @@ bt_component_class_message_iterator_init_method_status muxer_msg_iter_init(
        }
 
        muxer_msg_iter->muxer_comp = muxer_comp;
+       muxer_msg_iter->self_msg_iter = self_msg_iter;
        muxer_msg_iter->last_returned_ts_ns = INT64_MIN;
        muxer_msg_iter->active_muxer_upstream_msg_iters =
                g_ptr_array_new_with_free_func(
This page took 0.045139 seconds and 4 git commands to generate.