Fix: flt.utils.muxer: Potential memory leak
[babeltrace.git] / plugins / utils / muxer / muxer.c
index cc6dc4602d649092eb19a75890346c52da23c3ca..47d790b3d9a14e59387e7d611c984077680d608e 100644 (file)
 #include <stdlib.h>
 #include <string.h>
 
+#include "muxer.h"
+
 #define ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME       "assume-absolute-clock-classes"
 
 struct muxer_comp {
-       /*
-        * Array of struct
-        * bt_self_message_iterator *
-        * (weak refs)
-        */
-       GPtrArray *muxer_msg_iters;
-
        /* Weak ref */
        bt_self_component_filter *self_comp;
 
@@ -68,6 +63,7 @@ struct muxer_upstream_msg_iter {
 
 enum muxer_msg_iter_clock_class_expectation {
        MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY = 0,
+       MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE,
        MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE,
        MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID,
        MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID,
@@ -82,18 +78,16 @@ struct muxer_msg_iter {
         * another data structure is faster than this for our typical
         * use cases.
         */
-       GPtrArray *muxer_upstream_msg_iters;
+       GPtrArray *active_muxer_upstream_msg_iters;
 
        /*
-        * List of "recently" connected input ports (weak) to
-        * handle by this muxer message iterator.
-        * muxer_port_connected() adds entries to this list, and the
-        * entries are removed when a message iterator is created
-        * on the port's connection and put into
-        * muxer_upstream_msg_iters above by
-        * muxer_msg_iter_handle_newly_connected_ports().
+        * Array of struct muxer_upstream_msg_iter * (owned by this).
+        *
+        * We move ended message iterators from
+        * `active_muxer_upstream_msg_iters` to this array so as to be
+        * able to restore them when seeking.
         */
-       GList *newly_connected_self_ports;
+       GPtrArray *ended_muxer_upstream_msg_iters;
 
        /* Last time returned in a message */
        int64_t last_returned_ts_ns;
@@ -109,6 +103,16 @@ struct muxer_msg_iter {
        unsigned char expected_clock_class_uuid[BABELTRACE_UUID_LEN];
 };
 
+static
+void empty_message_queue(struct muxer_upstream_msg_iter *upstream_msg_iter)
+{
+       const bt_message *msg;
+
+       while ((msg = g_queue_pop_head(upstream_msg_iter->msgs))) {
+               bt_message_put_ref(msg);
+       }
+}
+
 static
 void destroy_muxer_upstream_msg_iter(
                struct muxer_upstream_msg_iter *muxer_upstream_msg_iter)
@@ -122,16 +126,11 @@ void destroy_muxer_upstream_msg_iter(
                muxer_upstream_msg_iter,
                muxer_upstream_msg_iter->msg_iter,
                muxer_upstream_msg_iter->msgs->length);
-       bt_self_component_port_input_message_iterator_put_ref(muxer_upstream_msg_iter->msg_iter);
+       bt_self_component_port_input_message_iterator_put_ref(
+               muxer_upstream_msg_iter->msg_iter);
 
        if (muxer_upstream_msg_iter->msgs) {
-               const bt_message *msg;
-
-               while ((msg = g_queue_pop_head(
-                               muxer_upstream_msg_iter->msgs))) {
-                       bt_message_put_ref(msg);
-               }
-
+               empty_message_queue(muxer_upstream_msg_iter);
                g_queue_free(muxer_upstream_msg_iter->msgs);
        }
 
@@ -148,7 +147,7 @@ struct muxer_upstream_msg_iter *muxer_msg_iter_add_upstream_msg_iter(
 
        if (!muxer_upstream_msg_iter) {
                BT_LOGE_STR("Failed to allocate one muxer's upstream message iterator wrapper.");
-               goto end;
+               goto error;
        }
 
        muxer_upstream_msg_iter->msg_iter = self_msg_iter;
@@ -156,22 +155,28 @@ struct muxer_upstream_msg_iter *muxer_msg_iter_add_upstream_msg_iter(
        muxer_upstream_msg_iter->msgs = g_queue_new();
        if (!muxer_upstream_msg_iter->msgs) {
                BT_LOGE_STR("Failed to allocate a GQueue.");
-               goto end;
+               goto error;
        }
 
-       g_ptr_array_add(muxer_msg_iter->muxer_upstream_msg_iters,
+       g_ptr_array_add(muxer_msg_iter->active_muxer_upstream_msg_iters,
                muxer_upstream_msg_iter);
        BT_LOGD("Added muxer's upstream message iterator wrapper: "
                "addr=%p, muxer-msg-iter-addr=%p, msg-iter-addr=%p",
                muxer_upstream_msg_iter, muxer_msg_iter,
                self_msg_iter);
 
+       goto end;
+
+error:
+       g_free(muxer_upstream_msg_iter);
+       muxer_upstream_msg_iter = NULL;
+
 end:
        return muxer_upstream_msg_iter;
 }
 
 static
-bt_self_component_status ensure_available_input_port(
+bt_self_component_status add_available_input_port(
                bt_self_component_filter *self_comp)
 {
        struct muxer_comp *muxer_comp = bt_self_component_get_data(
@@ -180,11 +185,6 @@ bt_self_component_status ensure_available_input_port(
        GString *port_name = NULL;
 
        BT_ASSERT(muxer_comp);
-
-       if (muxer_comp->available_input_ports >= 1) {
-               goto end;
-       }
-
        port_name = g_string_new("in");
        if (!port_name) {
                BT_LOGE_STR("Failed to allocate a GString.");
@@ -208,6 +208,7 @@ bt_self_component_status ensure_available_input_port(
        BT_LOGD("Added one input port to muxer component: "
                "port-name=\"%s\", comp-addr=%p",
                port_name->str, self_comp);
+
 end:
        if (port_name) {
                g_string_free(port_name, TRUE);
@@ -231,15 +232,6 @@ void destroy_muxer_comp(struct muxer_comp *muxer_comp)
                return;
        }
 
-       BT_LOGD("Destroying muxer component: muxer-comp-addr=%p, "
-               "muxer-msg-iter-count=%u", muxer_comp,
-               muxer_comp->muxer_msg_iters ?
-                       muxer_comp->muxer_msg_iters->len : 0);
-
-       if (muxer_comp->muxer_msg_iters) {
-               g_ptr_array_free(muxer_comp->muxer_msg_iters, TRUE);
-       }
-
        g_free(muxer_comp);
 }
 
@@ -328,7 +320,7 @@ end:
 BT_HIDDEN
 bt_self_component_status muxer_init(
                bt_self_component_filter *self_comp,
-               bt_value *params, void *init_data)
+               const bt_value *params, void *init_data)
 {
        int ret;
        bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
@@ -350,17 +342,11 @@ bt_self_component_status muxer_init(
                goto error;
        }
 
-       muxer_comp->muxer_msg_iters = g_ptr_array_new();
-       if (!muxer_comp->muxer_msg_iters) {
-               BT_LOGE_STR("Failed to allocate a GPtrArray.");
-               goto error;
-       }
-
        muxer_comp->self_comp = self_comp;
        bt_self_component_set_data(
                bt_self_component_filter_as_self_component(self_comp),
                muxer_comp);
-       status = ensure_available_input_port(self_comp);
+       status = add_available_input_port(self_comp);
        if (status != BT_SELF_COMPONENT_STATUS_OK) {
                BT_LOGE("Cannot ensure that at least one muxer component's input port is available: "
                        "muxer-comp-addr=%p, status=%s",
@@ -411,8 +397,7 @@ void muxer_finalize(bt_self_component_filter *self_comp)
 
 static
 bt_self_component_port_input_message_iterator *
-create_msg_iter_on_input_port(
-               bt_self_component_port_input *self_port, int *ret)
+create_msg_iter_on_input_port(bt_self_component_port_input *self_port)
 {
        const bt_port *port = bt_self_component_port_as_port(
                bt_self_component_port_input_as_self_component_port(
@@ -420,8 +405,6 @@ create_msg_iter_on_input_port(
        bt_self_component_port_input_message_iterator *msg_iter =
                NULL;
 
-       BT_ASSERT(ret);
-       *ret = 0;
        BT_ASSERT(port);
        BT_ASSERT(bt_port_is_connected(port));
 
@@ -434,7 +417,6 @@ create_msg_iter_on_input_port(
                BT_LOGE("Cannot create upstream message iterator on input port: "
                        "port-addr=%p, port-name=\"%s\"",
                        port, bt_port_get_name(port));
-               *ret = -1;
                goto end;
        }
 
@@ -447,10 +429,12 @@ end:
 }
 
 static
-bt_message_iterator_status muxer_upstream_msg_iter_next(
-               struct muxer_upstream_msg_iter *muxer_upstream_msg_iter)
+bt_self_message_iterator_status muxer_upstream_msg_iter_next(
+               struct muxer_upstream_msg_iter *muxer_upstream_msg_iter,
+               bool *is_ended)
 {
-       bt_message_iterator_status status;
+       bt_self_message_iterator_status status;
+       bt_message_iterator_status input_port_iter_status;
        bt_message_array_const msgs;
        uint64_t i;
        uint64_t count;
@@ -459,12 +443,12 @@ bt_message_iterator_status muxer_upstream_msg_iter_next(
                "muxer-upstream-msg-iter-wrap-addr=%p, msg-iter-addr=%p",
                muxer_upstream_msg_iter,
                muxer_upstream_msg_iter->msg_iter);
-       status = bt_self_component_port_input_message_iterator_next(
+       input_port_iter_status = bt_self_component_port_input_message_iterator_next(
                muxer_upstream_msg_iter->msg_iter, &msgs, &count);
        BT_LOGV("Upstream message iterator's \"next\" method returned: "
-               "status=%s", bt_message_iterator_status_string(status));
+               "status=%s", bt_message_iterator_status_string(input_port_iter_status));
 
-       switch (status) {
+       switch (input_port_iter_status) {
        case BT_MESSAGE_ITERATOR_STATUS_OK:
                /*
                 * Message iterator's current message is
@@ -483,6 +467,7 @@ bt_message_iterator_status muxer_upstream_msg_iter_next(
                        g_queue_push_tail(muxer_upstream_msg_iter->msgs,
                                (void *) msgs[i]);
                }
+               status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
                break;
        case BT_MESSAGE_ITERATOR_STATUS_AGAIN:
                /*
@@ -490,145 +475,106 @@ bt_message_iterator_status muxer_upstream_msg_iter_next(
                 * valid anymore. Return
                 * BT_MESSAGE_ITERATOR_STATUS_AGAIN immediately.
                 */
+               status = BT_SELF_MESSAGE_ITERATOR_STATUS_AGAIN;
                break;
        case BT_MESSAGE_ITERATOR_STATUS_END:    /* Fall-through. */
-       case BT_MESSAGE_ITERATOR_STATUS_CANCELED:
                /*
                 * Message iterator reached the end: release it. It
                 * won't be considered again to find the youngest
                 * message.
                 */
-               BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(muxer_upstream_msg_iter->msg_iter);
-               status = BT_MESSAGE_ITERATOR_STATUS_OK;
+               *is_ended = true;
+               status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
                break;
        default:
                /* Error or unsupported status code */
                BT_LOGE("Error or unsupported status code: "
-                       "status-code=%d", status);
-               status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
+                       "status-code=%d", input_port_iter_status);
+               status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
                break;
        }
 
        return status;
 }
 
-static
-int muxer_msg_iter_handle_newly_connected_ports(
-               struct muxer_msg_iter *muxer_msg_iter)
-{
-       int ret = 0;
-
-       BT_LOGV("Handling newly connected ports: "
-               "muxer-msg-iter-addr=%p", muxer_msg_iter);
-
-       /*
-        * Here we create one upstream message iterator for each
-        * newly connected port. We do NOT perform an initial "next" on
-        * those new upstream message iterators: they are
-        * invalidated, to be validated later. The list of newly
-        * connected ports to handle here is updated by
-        * muxer_port_connected().
-        */
-       while (true) {
-               GList *node = muxer_msg_iter->newly_connected_self_ports;
-               bt_self_component_port_input *self_port;
-               const bt_port *port;
-               bt_self_component_port_input_message_iterator *
-                       upstream_msg_iter = NULL;
-               struct muxer_upstream_msg_iter *muxer_upstream_msg_iter;
-
-               if (!node) {
-                       break;
-               }
-
-               self_port = node->data;
-               port = bt_self_component_port_as_port(
-                       bt_self_component_port_input_as_self_component_port(
-                               (self_port)));
-               BT_ASSERT(port);
-
-               if (!bt_port_is_connected(port)) {
-                       /*
-                        * Looks like this port is not connected
-                        * anymore: we can't create an upstream
-                        * message iterator on its (non-existing)
-                        * connection in this case.
-                        */
-                       goto remove_node;
-               }
-
-               upstream_msg_iter = create_msg_iter_on_input_port(
-                       self_port, &ret);
-               if (ret) {
-                       /* create_msg_iter_on_input_port() logs errors */
-                       BT_ASSERT(!upstream_msg_iter);
-                       goto error;
-               }
-
-               muxer_upstream_msg_iter =
-                       muxer_msg_iter_add_upstream_msg_iter(
-                               muxer_msg_iter, upstream_msg_iter);
-               BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(upstream_msg_iter);
-               if (!muxer_upstream_msg_iter) {
-                       /*
-                        * muxer_msg_iter_add_upstream_msg_iter()
-                        * logs errors.
-                        */
-                       goto error;
-               }
-
-remove_node:
-               bt_self_component_port_input_message_iterator_put_ref(upstream_msg_iter);
-               muxer_msg_iter->newly_connected_self_ports =
-                       g_list_delete_link(
-                               muxer_msg_iter->newly_connected_self_ports,
-                               node);
-       }
-
-       goto end;
-
-error:
-       if (ret >= 0) {
-               ret = -1;
-       }
-
-end:
-       return ret;
-}
-
 static
 int get_msg_ts_ns(struct muxer_comp *muxer_comp,
                struct muxer_msg_iter *muxer_msg_iter,
                const bt_message *msg, int64_t last_returned_ts_ns,
                int64_t *ts_ns)
 {
-       const bt_clock_class *clock_class = NULL;
        const bt_clock_snapshot *clock_snapshot = NULL;
-       const bt_event *event = NULL;
        int ret = 0;
-       const unsigned char *cc_uuid;
-       const char *cc_name;
        bt_clock_snapshot_state cs_state = BT_CLOCK_SNAPSHOT_STATE_KNOWN;
+       bt_message_stream_activity_clock_snapshot_state sa_cs_state;
 
        BT_ASSERT(msg);
        BT_ASSERT(ts_ns);
-
        BT_LOGV("Getting message's timestamp: "
                "muxer-msg-iter-addr=%p, msg-addr=%p, "
                "last-returned-ts=%" PRId64,
                muxer_msg_iter, msg, last_returned_ts_ns);
 
+       if (unlikely(muxer_msg_iter->clock_class_expectation ==
+                       MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE)) {
+               *ts_ns = last_returned_ts_ns;
+               goto end;
+       }
+
        switch (bt_message_get_type(msg)) {
        case BT_MESSAGE_TYPE_EVENT:
-               event = bt_message_event_borrow_event_const(msg);
-               BT_ASSERT(event);
-               cs_state = bt_event_borrow_default_clock_snapshot_const(event,
-                       &clock_snapshot);
+               BT_ASSERT(bt_message_event_borrow_stream_class_default_clock_class_const(
+                               msg));
+               cs_state = bt_message_event_borrow_default_clock_snapshot_const(
+                       msg, &clock_snapshot);
+               break;
+       case BT_MESSAGE_TYPE_PACKET_BEGINNING:
+               BT_ASSERT(bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
+                               msg));
+               cs_state = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
+                       msg, &clock_snapshot);
+               break;
+       case BT_MESSAGE_TYPE_PACKET_END:
+               BT_ASSERT(bt_message_packet_end_borrow_stream_class_default_clock_class_const(
+                               msg));
+               cs_state = bt_message_packet_end_borrow_default_clock_snapshot_const(
+                       msg, &clock_snapshot);
+               break;
+       case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
+               BT_ASSERT(bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
+                               msg));
+               cs_state = bt_message_discarded_events_borrow_default_beginning_clock_snapshot_const(
+                       msg, &clock_snapshot);
                break;
+       case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
+               BT_ASSERT(bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
+                               msg));
+               cs_state = bt_message_discarded_packets_borrow_default_beginning_clock_snapshot_const(
+                       msg, &clock_snapshot);
+               break;
+       case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
+               BT_ASSERT(bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const(
+                               msg));
+               sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const(
+                       msg, &clock_snapshot);
+               if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
+                       goto no_clock_snapshot;
+               }
+
+               break;
+       case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
+               BT_ASSERT(bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const(
+                               msg));
+               sa_cs_state = bt_message_stream_activity_end_borrow_default_clock_snapshot_const(
+                       msg, &clock_snapshot);
+               if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
+                       goto no_clock_snapshot;
+               }
 
-       case BT_MESSAGE_TYPE_INACTIVITY:
+               break;
+       case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
                cs_state =
-                       bt_message_inactivity_borrow_default_clock_snapshot_const(
+                       bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
                                msg, &clock_snapshot);
                break;
        default:
@@ -638,25 +584,46 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp,
                goto end;
        }
 
-       if (cs_state != BT_CLOCK_SNAPSHOT_STATE_KNOWN) {
-               BT_LOGE_STR("Unsupported unknown clock snapshot.");
-               ret = -1;
-               goto end;
+       BT_ASSERT(cs_state == BT_CLOCK_SNAPSHOT_STATE_KNOWN);
+       ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
+       if (ret) {
+               BT_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: "
+                       "clock-snapshot-addr=%p", clock_snapshot);
+               goto error;
        }
 
-       /*
-        * If the clock snapshot is missing, then we consider that this
-        * message has no time. In this case it's always the
-        * youngest.
-        */
-       if (!clock_snapshot) {
-               BT_LOGV_STR("Message's default clock snapshot is missing: "
-                       "using the last returned timestamp.");
-               *ts_ns = last_returned_ts_ns;
-               goto end;
+       goto end;
+
+no_clock_snapshot:
+       BT_LOGV_STR("Message's default clock snapshot is missing: "
+               "using the last returned timestamp.");
+       *ts_ns = last_returned_ts_ns;
+       goto end;
+
+error:
+       ret = -1;
+
+end:
+       if (ret == 0) {
+               BT_LOGV("Found message's timestamp: "
+                       "muxer-msg-iter-addr=%p, msg-addr=%p, "
+                       "last-returned-ts=%" PRId64 ", ts=%" PRId64,
+                       muxer_msg_iter, msg, last_returned_ts_ns,
+                       *ts_ns);
        }
 
-       clock_class = bt_clock_snapshot_borrow_clock_class_const(clock_snapshot);
+       return ret;
+}
+
+static inline
+int validate_clock_class(struct muxer_msg_iter *muxer_msg_iter,
+               struct muxer_comp *muxer_comp,
+               const bt_clock_class *clock_class)
+{
+       int ret = 0;
+       const unsigned char *cc_uuid;
+       const char *cc_name;
+
        BT_ASSERT(clock_class);
        cc_uuid = bt_clock_class_get_uuid(clock_class);
        cc_name = bt_clock_class_get_name(clock_class);
@@ -670,7 +637,7 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp,
                 * the iterator without a true
                 * `assume-absolute-clock-classes` parameter.
                 */
-               if (bt_clock_class_is_absolute(clock_class)) {
+               if (bt_clock_class_origin_is_unix_epoch(clock_class)) {
                        /* Expect absolute clock classes */
                        muxer_msg_iter->clock_class_expectation =
                                MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE;
@@ -698,7 +665,7 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp,
        if (!muxer_comp->assume_absolute_clock_classes) {
                switch (muxer_msg_iter->clock_class_expectation) {
                case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE:
-                       if (!bt_clock_class_is_absolute(clock_class)) {
+                       if (!bt_clock_class_origin_is_unix_epoch(clock_class)) {
                                BT_LOGE("Expecting an absolute clock class, "
                                        "but got a non-absolute one: "
                                        "clock-class-addr=%p, clock-class-name=\"%s\"",
@@ -707,7 +674,7 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp,
                        }
                        break;
                case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID:
-                       if (bt_clock_class_is_absolute(clock_class)) {
+                       if (bt_clock_class_origin_is_unix_epoch(clock_class)) {
                                BT_LOGE("Expecting a non-absolute clock class with no UUID, "
                                        "but got an absolute one: "
                                        "clock-class-addr=%p, clock-class-name=\"%s\"",
@@ -741,7 +708,7 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp,
                        }
                        break;
                case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID:
-                       if (bt_clock_class_is_absolute(clock_class)) {
+                       if (bt_clock_class_origin_is_unix_epoch(clock_class)) {
                                BT_LOGE("Expecting a non-absolute clock class with a specific UUID, "
                                        "but got an absolute one: "
                                        "clock-class-addr=%p, clock-class-name=\"%s\"",
@@ -800,6 +767,11 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp,
                                goto error;
                        }
                        break;
+               case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE:
+                       BT_LOGE("Expecting no clock class, but got one: "
+                               "clock-class-addr=%p, clock-class-name=\"%s\"",
+                               clock_class, cc_name);
+                       goto error;
                default:
                        /* Unexpected */
                        BT_LOGF("Unexpected clock class expectation: "
@@ -809,27 +781,56 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp,
                }
        }
 
-       ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
-       if (ret) {
-               BT_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: "
-                       "clock-snapshot-addr=%p", clock_snapshot);
-               goto error;
-       }
-
        goto end;
 
 error:
        ret = -1;
 
 end:
-       if (ret == 0) {
-               BT_LOGV("Found message's timestamp: "
-                       "muxer-msg-iter-addr=%p, msg-addr=%p, "
-                       "last-returned-ts=%" PRId64 ", ts=%" PRId64,
-                       muxer_msg_iter, msg, last_returned_ts_ns,
-                       *ts_ns);
+       return ret;
+}
+
+static inline
+int validate_new_stream_clock_class(struct muxer_msg_iter *muxer_msg_iter,
+               struct muxer_comp *muxer_comp, const bt_stream *stream)
+{
+       int ret = 0;
+       const bt_stream_class *stream_class =
+               bt_stream_borrow_class_const(stream);
+       const bt_clock_class *clock_class =
+               bt_stream_class_borrow_default_clock_class_const(stream_class);
+
+       if (!clock_class) {
+               if (muxer_msg_iter->clock_class_expectation ==
+                       MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY) {
+                       /* Expect no clock class */
+                       muxer_msg_iter->clock_class_expectation =
+                               MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE;
+               } else {
+                       BT_LOGE("Expecting stream class with a default clock class: "
+                               "stream-class-addr=%p, stream-class-name=\"%s\", "
+                               "stream-class-id=%" PRIu64,
+                               stream_class, bt_stream_class_get_name(stream_class),
+                               bt_stream_class_get_id(stream_class));
+                       ret = -1;
+               }
+
+               goto end;
+       }
+
+       if (!bt_stream_class_default_clock_is_always_known(stream_class)) {
+               BT_LOGE("Stream's default clock is not always known: "
+                       "stream-class-addr=%p, stream-class-name=\"%s\", "
+                       "stream-class-id=%" PRIu64,
+                       stream_class, bt_stream_class_get_name(stream_class),
+                       bt_stream_class_get_id(stream_class));
+               ret = -1;
+               goto end;
        }
 
+       ret = validate_clock_class(muxer_msg_iter, muxer_comp, clock_class);
+
+end:
        return ret;
 }
 
@@ -843,7 +844,6 @@ end:
  * This function does NOT:
  *
  * * Update any upstream message iterator.
- * * Check for newly connected ports.
  * * Check the upstream message iterators to retry.
  *
  * On sucess, this function sets *muxer_upstream_msg_iter to the
@@ -851,7 +851,7 @@ end:
  * the youngest, and sets *ts_ns to its time.
  */
 static
-bt_message_iterator_status
+bt_self_message_iterator_status
 muxer_msg_iter_youngest_upstream_msg_iter(
                struct muxer_comp *muxer_comp,
                struct muxer_msg_iter *muxer_msg_iter,
@@ -861,18 +861,21 @@ muxer_msg_iter_youngest_upstream_msg_iter(
        size_t i;
        int ret;
        int64_t youngest_ts_ns = INT64_MAX;
-       bt_message_iterator_status status =
-               BT_MESSAGE_ITERATOR_STATUS_OK;
+       bt_self_message_iterator_status status =
+               BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
 
        BT_ASSERT(muxer_comp);
        BT_ASSERT(muxer_msg_iter);
        BT_ASSERT(muxer_upstream_msg_iter);
        *muxer_upstream_msg_iter = NULL;
 
-       for (i = 0; i < muxer_msg_iter->muxer_upstream_msg_iters->len; i++) {
+       for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len;
+                       i++) {
                const bt_message *msg;
                struct muxer_upstream_msg_iter *cur_muxer_upstream_msg_iter =
-                       g_ptr_array_index(muxer_msg_iter->muxer_upstream_msg_iters, i);
+                       g_ptr_array_index(
+                               muxer_msg_iter->active_muxer_upstream_msg_iters,
+                               i);
                int64_t msg_ts_ns;
 
                if (!cur_muxer_upstream_msg_iter->msg_iter) {
@@ -886,12 +889,53 @@ muxer_msg_iter_youngest_upstream_msg_iter(
                BT_ASSERT(cur_muxer_upstream_msg_iter->msgs->length > 0);
                msg = g_queue_peek_head(cur_muxer_upstream_msg_iter->msgs);
                BT_ASSERT(msg);
+
+               if (unlikely(bt_message_get_type(msg) ==
+                               BT_MESSAGE_TYPE_STREAM_BEGINNING)) {
+                       ret = validate_new_stream_clock_class(
+                               muxer_msg_iter, muxer_comp,
+                               bt_message_stream_beginning_borrow_stream_const(
+                                       msg));
+                       if (ret) {
+                               /*
+                                * validate_new_stream_clock_class() logs
+                                * errors.
+                                */
+                               status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+                               goto end;
+                       }
+               } else if (unlikely(bt_message_get_type(msg) ==
+                               BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY)) {
+                       const bt_clock_snapshot *cs;
+                       bt_clock_snapshot_state cs_state;
+
+                       cs_state = bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
+                               msg, &cs);
+
+                       if (cs_state != BT_CLOCK_SNAPSHOT_STATE_KNOWN) {
+                               BT_LOGE("Message iterator inactivity message's "
+                                       "default clock snapshot is unknown: "
+                                       "msg-addr=%p",
+                                       msg);
+                               status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+                               goto end;
+                       }
+
+                       ret = validate_clock_class(muxer_msg_iter, muxer_comp,
+                               bt_clock_snapshot_borrow_clock_class_const(cs));
+                       if (ret) {
+                               /* validate_clock_class() logs errors */
+                               status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+                               goto end;
+                       }
+               }
+
                ret = get_msg_ts_ns(muxer_comp, muxer_msg_iter, msg,
                        muxer_msg_iter->last_returned_ts_ns, &msg_ts_ns);
                if (ret) {
                        /* get_msg_ts_ns() logs errors */
                        *muxer_upstream_msg_iter = NULL;
-                       status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
+                       status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
                        goto end;
                }
 
@@ -904,7 +948,7 @@ muxer_msg_iter_youngest_upstream_msg_iter(
        }
 
        if (!*muxer_upstream_msg_iter) {
-               status = BT_MESSAGE_ITERATOR_STATUS_END;
+               status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
                *ts_ns = INT64_MIN;
        }
 
@@ -913,11 +957,12 @@ end:
 }
 
 static
-bt_message_iterator_status validate_muxer_upstream_msg_iter(
-       struct muxer_upstream_msg_iter *muxer_upstream_msg_iter)
+bt_self_message_iterator_status validate_muxer_upstream_msg_iter(
+       struct muxer_upstream_msg_iter *muxer_upstream_msg_iter,
+       bool *is_ended)
 {
-       bt_message_iterator_status status =
-               BT_MESSAGE_ITERATOR_STATUS_OK;
+       bt_self_message_iterator_status status =
+               BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
 
        BT_LOGV("Validating muxer's upstream message iterator wrapper: "
                "muxer-upstream-msg-iter-wrap-addr=%p",
@@ -933,32 +978,35 @@ bt_message_iterator_status validate_muxer_upstream_msg_iter(
        }
 
        /* muxer_upstream_msg_iter_next() logs details/errors */
-       status = muxer_upstream_msg_iter_next(muxer_upstream_msg_iter);
+       status = muxer_upstream_msg_iter_next(muxer_upstream_msg_iter,
+               is_ended);
 
 end:
        return status;
 }
 
 static
-bt_message_iterator_status validate_muxer_upstream_msg_iters(
+bt_self_message_iterator_status validate_muxer_upstream_msg_iters(
                struct muxer_msg_iter *muxer_msg_iter)
 {
-       bt_message_iterator_status status =
-               BT_MESSAGE_ITERATOR_STATUS_OK;
+       bt_self_message_iterator_status status =
+               BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
        size_t i;
 
        BT_LOGV("Validating muxer's upstream message iterator wrappers: "
                "muxer-msg-iter-addr=%p", muxer_msg_iter);
 
-       for (i = 0; i < muxer_msg_iter->muxer_upstream_msg_iters->len; i++) {
+       for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len;
+                       i++) {
+               bool is_ended = false;
                struct muxer_upstream_msg_iter *muxer_upstream_msg_iter =
                        g_ptr_array_index(
-                               muxer_msg_iter->muxer_upstream_msg_iters,
+                               muxer_msg_iter->active_muxer_upstream_msg_iters,
                                i);
 
                status = validate_muxer_upstream_msg_iter(
-                       muxer_upstream_msg_iter);
-               if (status != BT_MESSAGE_ITERATOR_STATUS_OK) {
+                       muxer_upstream_msg_iter, &is_ended);
+               if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
                        if (status < 0) {
                                BT_LOGE("Cannot validate muxer's upstream message iterator wrapper: "
                                        "muxer-msg-iter-addr=%p, "
@@ -977,20 +1025,25 @@ bt_message_iterator_status validate_muxer_upstream_msg_iters(
                }
 
                /*
-                * Remove this muxer upstream message iterator
-                * if it's ended or canceled.
+                * Move this muxer upstream message iterator to the
+                * array of ended iterators if it's ended.
                 */
-               if (!muxer_upstream_msg_iter->msg_iter) {
+               if (unlikely(is_ended)) {
+                       BT_LOGV("Muxer's upstream message iterator wrapper: ended or canceled: "
+                               "muxer-msg-iter-addr=%p, "
+                               "muxer-upstream-msg-iter-wrap-addr=%p",
+                               muxer_msg_iter, muxer_upstream_msg_iter);
+                       g_ptr_array_add(
+                               muxer_msg_iter->ended_muxer_upstream_msg_iters,
+                               muxer_upstream_msg_iter);
+                       muxer_msg_iter->active_muxer_upstream_msg_iters->pdata[i] = NULL;
+
                        /*
                         * Use g_ptr_array_remove_fast() because the
                         * order of those elements is not important.
                         */
-                       BT_LOGV("Removing muxer's upstream message iterator wrapper: ended or canceled: "
-                               "muxer-msg-iter-addr=%p, "
-                               "muxer-upstream-msg-iter-wrap-addr=%p",
-                               muxer_msg_iter, muxer_upstream_msg_iter);
                        g_ptr_array_remove_index_fast(
-                               muxer_msg_iter->muxer_upstream_msg_iters,
+                               muxer_msg_iter->active_muxer_upstream_msg_iters,
                                i);
                        i--;
                }
@@ -1001,52 +1054,21 @@ end:
 }
 
 static inline
-bt_message_iterator_status muxer_msg_iter_do_next_one(
+bt_self_message_iterator_status muxer_msg_iter_do_next_one(
                struct muxer_comp *muxer_comp,
                struct muxer_msg_iter *muxer_msg_iter,
                const bt_message **msg)
 {
-       bt_message_iterator_status status =
-               BT_MESSAGE_ITERATOR_STATUS_OK;
+       bt_self_message_iterator_status status;
        struct muxer_upstream_msg_iter *muxer_upstream_msg_iter = NULL;
        int64_t next_return_ts;
 
-       while (true) {
-               int ret = muxer_msg_iter_handle_newly_connected_ports(
-                       muxer_msg_iter);
-
-               if (ret) {
-                       BT_LOGE("Cannot handle newly connected input ports for muxer's message iterator: "
-                               "muxer-comp-addr=%p, muxer-msg-iter-addr=%p, "
-                               "ret=%d",
-                               muxer_comp, muxer_msg_iter, ret);
-                       status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
-                       goto end;
-               }
-
-               status = validate_muxer_upstream_msg_iters(muxer_msg_iter);
-               if (status != BT_MESSAGE_ITERATOR_STATUS_OK) {
-                       /* validate_muxer_upstream_msg_iters() logs details */
-                       goto end;
-               }
-
-               /*
-                * At this point, we know that all the existing upstream
-                * message iterators are valid. However the
-                * operations to validate them (during
-                * validate_muxer_upstream_msg_iters()) may have
-                * connected new ports. If no ports were connected
-                * during this operation, exit the loop.
-                */
-               if (!muxer_msg_iter->newly_connected_self_ports) {
-                       BT_LOGV("Not breaking this loop: muxer's message iterator still has newly connected input ports to handle: "
-                               "muxer-comp-addr=%p", muxer_comp);
-                       break;
-               }
+       status = validate_muxer_upstream_msg_iters(muxer_msg_iter);
+       if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
+               /* validate_muxer_upstream_msg_iters() logs details */
+               goto end;
        }
 
-       BT_ASSERT(!muxer_msg_iter->newly_connected_self_ports);
-
        /*
         * At this point we know that all the existing upstream
         * message iterators are valid. We can find the one,
@@ -1056,16 +1078,15 @@ bt_message_iterator_status muxer_msg_iter_do_next_one(
        status = muxer_msg_iter_youngest_upstream_msg_iter(muxer_comp,
                        muxer_msg_iter, &muxer_upstream_msg_iter,
                        &next_return_ts);
-       if (status < 0 || status == BT_MESSAGE_ITERATOR_STATUS_END ||
-                       status == BT_MESSAGE_ITERATOR_STATUS_CANCELED) {
+       if (status < 0 || status == BT_SELF_MESSAGE_ITERATOR_STATUS_END) {
                if (status < 0) {
                        BT_LOGE("Cannot find the youngest upstream message iterator wrapper: "
                                "status=%s",
-                               bt_message_iterator_status_string(status));
+                               bt_common_self_message_iterator_status_string(status));
                } else {
                        BT_LOGV("Cannot find the youngest upstream message iterator wrapper: "
                                "status=%s",
-                               bt_message_iterator_status_string(status));
+                               bt_common_self_message_iterator_status_string(status));
                }
 
                goto end;
@@ -1077,7 +1098,7 @@ bt_message_iterator_status muxer_msg_iter_do_next_one(
                        "last-returned-ts=%" PRId64,
                        muxer_msg_iter, next_return_ts,
                        muxer_msg_iter->last_returned_ts_ns);
-               status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
+               status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
                goto end;
        }
 
@@ -1086,7 +1107,7 @@ bt_message_iterator_status muxer_msg_iter_do_next_one(
                "muxer-upstream-msg-iter-wrap-addr=%p, "
                "ts=%" PRId64,
                muxer_msg_iter, muxer_upstream_msg_iter, next_return_ts);
-       BT_ASSERT(status == BT_MESSAGE_ITERATOR_STATUS_OK);
+       BT_ASSERT(status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK);
        BT_ASSERT(muxer_upstream_msg_iter);
 
        /*
@@ -1102,20 +1123,20 @@ end:
 }
 
 static
-bt_message_iterator_status muxer_msg_iter_do_next(
+bt_self_message_iterator_status muxer_msg_iter_do_next(
                struct muxer_comp *muxer_comp,
                struct muxer_msg_iter *muxer_msg_iter,
                bt_message_array_const msgs, uint64_t capacity,
                uint64_t *count)
 {
-       bt_message_iterator_status status =
-               BT_MESSAGE_ITERATOR_STATUS_OK;
+       bt_self_message_iterator_status status =
+               BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
        uint64_t i = 0;
 
-       while (i < capacity && status == BT_MESSAGE_ITERATOR_STATUS_OK) {
+       while (i < capacity && status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
                status = muxer_msg_iter_do_next_one(muxer_comp,
                        muxer_msg_iter, &msgs[i]);
-               if (status == BT_MESSAGE_ITERATOR_STATUS_OK) {
+               if (status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
                        i++;
                }
        }
@@ -1134,7 +1155,7 @@ bt_message_iterator_status muxer_msg_iter_do_next(
                 * message, in which case we'll return it.
                 */
                *count = i;
-               status = BT_MESSAGE_ITERATOR_STATUS_OK;
+               status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
        }
 
        return status;
@@ -1150,29 +1171,29 @@ void destroy_muxer_msg_iter(struct muxer_msg_iter *muxer_msg_iter)
        BT_LOGD("Destroying muxer component's message iterator: "
                "muxer-msg-iter-addr=%p", muxer_msg_iter);
 
-       if (muxer_msg_iter->muxer_upstream_msg_iters) {
-               BT_LOGD_STR("Destroying muxer's upstream message iterator wrappers.");
+       if (muxer_msg_iter->active_muxer_upstream_msg_iters) {
+               BT_LOGD_STR("Destroying muxer's active upstream message iterator wrappers.");
                g_ptr_array_free(
-                       muxer_msg_iter->muxer_upstream_msg_iters, TRUE);
+                       muxer_msg_iter->active_muxer_upstream_msg_iters, TRUE);
+       }
+
+       if (muxer_msg_iter->ended_muxer_upstream_msg_iters) {
+               BT_LOGD_STR("Destroying muxer's ended upstream message iterator wrappers.");
+               g_ptr_array_free(
+                       muxer_msg_iter->ended_muxer_upstream_msg_iters, TRUE);
        }
 
-       g_list_free(muxer_msg_iter->newly_connected_self_ports);
        g_free(muxer_msg_iter);
 }
 
 static
-int muxer_msg_iter_init_newly_connected_ports(struct muxer_comp *muxer_comp,
+int muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp,
                struct muxer_msg_iter *muxer_msg_iter)
 {
        int64_t count;
        int64_t i;
        int ret = 0;
 
-       /*
-        * Add the connected input ports to this muxer message
-        * iterator's list of newly connected ports. They will be
-        * handled by muxer_msg_iter_handle_newly_connected_ports().
-        */
        count = bt_component_filter_get_input_port_count(
                bt_self_component_filter_as_component_filter(
                        muxer_comp->self_comp));
@@ -1184,6 +1205,8 @@ int muxer_msg_iter_init_newly_connected_ports(struct muxer_comp *muxer_comp,
        }
 
        for (i = 0; i < count; i++) {
+               bt_self_component_port_input_message_iterator *upstream_msg_iter;
+               struct muxer_upstream_msg_iter *muxer_upstream_msg_iter;
                bt_self_component_port_input *self_port =
                        bt_self_component_filter_borrow_input_port_by_index(
                                muxer_comp->self_comp, i);
@@ -1196,29 +1219,28 @@ int muxer_msg_iter_init_newly_connected_ports(struct muxer_comp *muxer_comp,
                BT_ASSERT(port);
 
                if (!bt_port_is_connected(port)) {
-                       BT_LOGD("Skipping input port: not connected: "
-                               "muxer-comp-addr=%p, port-addr=%p, port-name\"%s\"",
-                               muxer_comp, port, bt_port_get_name(port));
+                       /* Skip non-connected port */
                        continue;
                }
 
-               muxer_msg_iter->newly_connected_self_ports =
-                       g_list_append(
-                               muxer_msg_iter->newly_connected_self_ports,
-                               self_port);
-               if (!muxer_msg_iter->newly_connected_self_ports) {
-                       BT_LOGE("Cannot append port to muxer's message iterator list of newly connected input ports: "
-                               "port-addr=%p, port-name=\"%s\", "
-                               "muxer-msg-iter-addr=%p", port,
-                               bt_port_get_name(port), muxer_msg_iter);
+               upstream_msg_iter = create_msg_iter_on_input_port(self_port);
+               if (!upstream_msg_iter) {
+                       /* create_msg_iter_on_input_port() logs errors */
+                       BT_ASSERT(!upstream_msg_iter);
                        ret = -1;
                        goto end;
                }
 
-               BT_LOGD("Appended port to muxer's message iterator list of newly connected input ports: "
-                       "port-addr=%p, port-name=\"%s\", "
-                       "muxer-msg-iter-addr=%p", port,
-                       bt_port_get_name(port), muxer_msg_iter);
+               muxer_upstream_msg_iter =
+                       muxer_msg_iter_add_upstream_msg_iter(
+                               muxer_msg_iter, upstream_msg_iter);
+               bt_self_component_port_input_message_iterator_put_ref(
+                       upstream_msg_iter);
+               if (!muxer_upstream_msg_iter) {
+                       /* muxer_msg_iter_add_upstream_msg_iter() logs errors */
+                       ret = -1;
+                       goto end;
+               }
        }
 
 end:
@@ -1234,7 +1256,7 @@ bt_self_message_iterator_status muxer_msg_iter_init(
        struct muxer_comp *muxer_comp = NULL;
        struct muxer_msg_iter *muxer_msg_iter = NULL;
        bt_self_message_iterator_status status =
-               BT_MESSAGE_ITERATOR_STATUS_OK;
+               BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
        int ret;
 
        muxer_comp = bt_self_component_get_data(
@@ -1264,27 +1286,26 @@ bt_self_message_iterator_status muxer_msg_iter_init(
        }
 
        muxer_msg_iter->last_returned_ts_ns = INT64_MIN;
-       muxer_msg_iter->muxer_upstream_msg_iters =
+       muxer_msg_iter->active_muxer_upstream_msg_iters =
                g_ptr_array_new_with_free_func(
                        (GDestroyNotify) destroy_muxer_upstream_msg_iter);
-       if (!muxer_msg_iter->muxer_upstream_msg_iters) {
+       if (!muxer_msg_iter->active_muxer_upstream_msg_iters) {
                BT_LOGE_STR("Failed to allocate a GPtrArray.");
                goto error;
        }
 
-       /*
-        * Add the muxer message iterator to the component's array
-        * of muxer message iterators here because
-        * muxer_msg_iter_init_newly_connected_ports() can cause
-        * muxer_port_connected() to be called, which adds the newly
-        * connected port to each muxer message iterator's list of
-        * newly connected ports.
-        */
-       g_ptr_array_add(muxer_comp->muxer_msg_iters, muxer_msg_iter);
-       ret = muxer_msg_iter_init_newly_connected_ports(muxer_comp,
+       muxer_msg_iter->ended_muxer_upstream_msg_iters =
+               g_ptr_array_new_with_free_func(
+                       (GDestroyNotify) destroy_muxer_upstream_msg_iter);
+       if (!muxer_msg_iter->ended_muxer_upstream_msg_iters) {
+               BT_LOGE_STR("Failed to allocate a GPtrArray.");
+               goto error;
+       }
+
+       ret = muxer_msg_iter_init_upstream_iterators(muxer_comp,
                muxer_msg_iter);
        if (ret) {
-               BT_LOGE("Cannot initialize newly connected input ports for muxer component's message iterator: "
+               BT_LOGE("Cannot initialize connected input ports for muxer component's message iterator: "
                        "comp-addr=%p, muxer-comp-addr=%p, "
                        "muxer-msg-iter-addr=%p, msg-iter-addr=%p, ret=%d",
                        self_comp, muxer_comp, muxer_msg_iter,
@@ -1292,8 +1313,7 @@ bt_self_message_iterator_status muxer_msg_iter_init(
                goto error;
        }
 
-       bt_self_message_iterator_set_data(self_msg_iter,
-               muxer_msg_iter);
+       bt_self_message_iterator_set_data(self_msg_iter, muxer_msg_iter);
        BT_LOGD("Initialized muxer component's message iterator: "
                "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, "
                "msg-iter-addr=%p",
@@ -1301,16 +1321,9 @@ bt_self_message_iterator_status muxer_msg_iter_init(
        goto end;
 
 error:
-       if (g_ptr_array_index(muxer_comp->muxer_msg_iters,
-                       muxer_comp->muxer_msg_iters->len - 1) == muxer_msg_iter) {
-               g_ptr_array_remove_index(muxer_comp->muxer_msg_iters,
-                       muxer_comp->muxer_msg_iters->len - 1);
-       }
-
        destroy_muxer_msg_iter(muxer_msg_iter);
-       bt_self_message_iterator_set_data(self_msg_iter,
-               NULL);
-       status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
+       bt_self_message_iterator_set_data(self_msg_iter, NULL);
+       status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
 
 end:
        muxer_comp->initializing_muxer_msg_iter = false;
@@ -1318,8 +1331,7 @@ end:
 }
 
 BT_HIDDEN
-void muxer_msg_iter_finalize(
-               bt_self_message_iterator *self_msg_iter)
+void muxer_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
 {
        struct muxer_msg_iter *muxer_msg_iter =
                bt_self_message_iterator_get_data(self_msg_iter);
@@ -1335,20 +1347,18 @@ void muxer_msg_iter_finalize(
                "msg-iter-addr=%p",
                self_comp, muxer_comp, muxer_msg_iter, self_msg_iter);
 
-       if (muxer_comp) {
-               (void) g_ptr_array_remove_fast(muxer_comp->muxer_msg_iters,
-                       muxer_msg_iter);
+       if (muxer_msg_iter) {
                destroy_muxer_msg_iter(muxer_msg_iter);
        }
 }
 
 BT_HIDDEN
-bt_message_iterator_status muxer_msg_iter_next(
+bt_self_message_iterator_status muxer_msg_iter_next(
                bt_self_message_iterator *self_msg_iter,
                bt_message_array_const msgs, uint64_t capacity,
                uint64_t *count)
 {
-       bt_message_iterator_status status;
+       bt_self_message_iterator_status status;
        struct muxer_msg_iter *muxer_msg_iter =
                bt_self_message_iterator_get_data(self_msg_iter);
        bt_self_component *self_comp = NULL;
@@ -1372,11 +1382,11 @@ bt_message_iterator_status muxer_msg_iter_next(
                        "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, "
                        "msg-iter-addr=%p, status=%s",
                        self_comp, muxer_comp, muxer_msg_iter, self_msg_iter,
-                       bt_message_iterator_status_string(status));
+                       bt_common_self_message_iterator_status_string(status));
        } else {
                BT_LOGV("Returning from muxer component's message iterator's \"next\" method: "
                        "status=%s",
-                       bt_message_iterator_status_string(status));
+                       bt_common_self_message_iterator_status_string(status));
        }
 
        return status;
@@ -1388,98 +1398,126 @@ bt_self_component_status muxer_input_port_connected(
                bt_self_component_port_input *self_port,
                const bt_port_output *other_port)
 {
-       bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
-       const bt_port *port = bt_self_component_port_as_port(
-               bt_self_component_port_input_as_self_component_port(
-                       self_port));
-       struct muxer_comp *muxer_comp =
-               bt_self_component_get_data(
-                       bt_self_component_filter_as_self_component(
-                               self_comp));
-       size_t i;
-       int ret;
-
-       BT_ASSERT(port);
-       BT_ASSERT(muxer_comp);
-       BT_LOGD("Port connected: "
-               "comp-addr=%p, muxer-comp-addr=%p, "
-               "port-addr=%p, port-name=\"%s\", "
-               "other-port-addr=%p, other-port-name=\"%s\"",
-               self_comp, muxer_comp, self_port, bt_port_get_name(port),
-               other_port,
-               bt_port_get_name(bt_port_output_as_port_const(other_port)));
-
-       for (i = 0; i < muxer_comp->muxer_msg_iters->len; i++) {
-               struct muxer_msg_iter *muxer_msg_iter =
-                       g_ptr_array_index(muxer_comp->muxer_msg_iters, i);
+       bt_self_component_status status;
 
+       status = add_available_input_port(self_comp);
+       if (status) {
                /*
-                * Add this port to the list of newly connected ports
-                * for this muxer message iterator. We append at
-                * the end of this list while
-                * muxer_msg_iter_handle_newly_connected_ports()
-                * removes the nodes from the beginning.
+                * Only way to report an error later since this
+                * method does not return anything.
                 */
-               muxer_msg_iter->newly_connected_self_ports =
-                       g_list_append(
-                               muxer_msg_iter->newly_connected_self_ports,
-                               self_port);
-               if (!muxer_msg_iter->newly_connected_self_ports) {
-                       BT_LOGE("Cannot append port to muxer's message iterator list of newly connected input ports: "
-                               "port-addr=%p, port-name=\"%s\", "
-                               "muxer-msg-iter-addr=%p", self_port,
-                               bt_port_get_name(port), muxer_msg_iter);
-                       status = BT_SELF_COMPONENT_STATUS_ERROR;
+               BT_LOGE("Cannot add one muxer component's input port: "
+                       "status=%s",
+                       bt_self_component_status_string(status));
+               goto end;
+       }
+
+end:
+       return status;
+}
+
+static inline
+bt_bool muxer_upstream_msg_iters_can_all_seek_beginning(
+               GPtrArray *muxer_upstream_msg_iters)
+{
+       uint64_t i;
+       bt_bool ret = BT_TRUE;
+
+       for (i = 0; i < muxer_upstream_msg_iters->len; i++) {
+               struct muxer_upstream_msg_iter *upstream_msg_iter =
+                       muxer_upstream_msg_iters->pdata[i];
+
+               if (!bt_self_component_port_input_message_iterator_can_seek_beginning(
+                               upstream_msg_iter->msg_iter)) {
+                       ret = BT_FALSE;
                        goto end;
                }
+       }
+
+end:
+       return ret;
+}
+
+BT_HIDDEN
+bt_bool muxer_msg_iter_can_seek_beginning(
+               bt_self_message_iterator *self_msg_iter)
+{
+       struct muxer_msg_iter *muxer_msg_iter =
+               bt_self_message_iterator_get_data(self_msg_iter);
+       bt_bool ret = BT_TRUE;
 
-               BT_LOGD("Appended port to muxer's message iterator list of newly connected input ports: "
-                       "port-addr=%p, port-name=\"%s\", "
-                       "muxer-msg-iter-addr=%p", self_port,
-                       bt_port_get_name(port), muxer_msg_iter);
+       if (!muxer_upstream_msg_iters_can_all_seek_beginning(
+                       muxer_msg_iter->active_muxer_upstream_msg_iters)) {
+               ret = BT_FALSE;
+               goto end;
        }
 
-       /* One less available input port */
-       muxer_comp->available_input_ports--;
-       ret = ensure_available_input_port(self_comp);
-       if (ret) {
-               /*
-                * Only way to report an error later since this
-                * method does not return anything.
-                */
-               BT_LOGE("Cannot ensure that at least one muxer component's input port is available: "
-                       "muxer-comp-addr=%p, status=%s",
-                       muxer_comp, bt_self_component_status_string(ret));
-               status = BT_SELF_COMPONENT_STATUS_ERROR;
+       if (!muxer_upstream_msg_iters_can_all_seek_beginning(
+                       muxer_msg_iter->ended_muxer_upstream_msg_iters)) {
+               ret = BT_FALSE;
                goto end;
        }
 
 end:
-       return status;
+       return ret;
 }
 
 BT_HIDDEN
-void muxer_input_port_disconnected(
-               bt_self_component_filter *self_component,
-               bt_self_component_port_input *self_port)
+bt_self_message_iterator_status muxer_msg_iter_seek_beginning(
+               bt_self_message_iterator *self_msg_iter)
 {
-       struct muxer_comp *muxer_comp =
-               bt_self_component_get_data(
-                       bt_self_component_filter_as_self_component(
-                               self_component));
-       const bt_port *port =
-               bt_self_component_port_as_port(
-                       bt_self_component_port_input_as_self_component_port(
-                               self_port));
+       struct muxer_msg_iter *muxer_msg_iter =
+               bt_self_message_iterator_get_data(self_msg_iter);
+       bt_message_iterator_status status = BT_MESSAGE_ITERATOR_STATUS_OK;
+       uint64_t i;
 
-       BT_ASSERT(port);
-       BT_ASSERT(muxer_comp);
+       /* Seek all ended upstream iterators first */
+       for (i = 0; i < muxer_msg_iter->ended_muxer_upstream_msg_iters->len;
+                       i++) {
+               struct muxer_upstream_msg_iter *upstream_msg_iter =
+                       muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i];
 
-       /* One more available input port */
-       muxer_comp->available_input_ports++;
-       BT_LOGD("Leaving disconnected input port available for future connections: "
-               "comp-addr=%p, muxer-comp-addr=%p, port-addr=%p, "
-               "port-name=\"%s\", avail-input-port-count=%zu",
-               self_component, muxer_comp, port, bt_port_get_name(port),
-               muxer_comp->available_input_ports);
+               status = bt_self_component_port_input_message_iterator_seek_beginning(
+                       upstream_msg_iter->msg_iter);
+               if (status != BT_MESSAGE_ITERATOR_STATUS_OK) {
+                       goto end;
+               }
+
+               empty_message_queue(upstream_msg_iter);
+       }
+
+       /* Seek all previously active upstream iterators */
+       for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len;
+                       i++) {
+               struct muxer_upstream_msg_iter *upstream_msg_iter =
+                       muxer_msg_iter->active_muxer_upstream_msg_iters->pdata[i];
+
+               status = bt_self_component_port_input_message_iterator_seek_beginning(
+                       upstream_msg_iter->msg_iter);
+               if (status != BT_MESSAGE_ITERATOR_STATUS_OK) {
+                       goto end;
+               }
+
+               empty_message_queue(upstream_msg_iter);
+       }
+
+       /* Make them all active */
+       for (i = 0; i < muxer_msg_iter->ended_muxer_upstream_msg_iters->len;
+                       i++) {
+               struct muxer_upstream_msg_iter *upstream_msg_iter =
+                       muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i];
+
+               g_ptr_array_add(muxer_msg_iter->active_muxer_upstream_msg_iters,
+                       upstream_msg_iter);
+               muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i] = NULL;
+       }
+
+       g_ptr_array_remove_range(muxer_msg_iter->ended_muxer_upstream_msg_iters,
+               0, muxer_msg_iter->ended_muxer_upstream_msg_iters->len);
+       muxer_msg_iter->last_returned_ts_ns = INT64_MIN;
+       muxer_msg_iter->clock_class_expectation =
+               MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY;
+
+end:
+       return (bt_self_message_iterator_status) status;
 }
This page took 0.041367 seconds and 4 git commands to generate.