utils.muxer: fix behaviour with ports connected during next/init ops
authorPhilippe Proulx <eeppeliteloop@gmail.com>
Tue, 18 Apr 2017 19:02:26 +0000 (15:02 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Sun, 28 May 2017 16:57:41 +0000 (12:57 -0400)
Signed-off-by: Philippe Proulx <eeppeliteloop@gmail.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
plugins/utils/muxer/muxer.c

index 265d653efa32d1d46af20f358898fb490c2230a0..e790ab01ca06710ea56ade50dd72eeeeeb0d3dc9 100644 (file)
@@ -53,20 +53,37 @@ struct muxer_comp {
 };
 
 struct muxer_upstream_notif_iter {
-       /* Owned by this */
+       /* Owned by this, NULL if ended */
        struct bt_notification_iterator *notif_iter;
 
        /* Owned by this*/
        struct bt_private_port *priv_port;
 };
 
-
 struct muxer_notif_iter {
-       /* Array of struct muxer_upstream_notif_iter * (owned by this) */
+       /*
+        * Array of struct muxer_upstream_notif_iter * (owned by this).
+        *
+        * NOTE: This array is searched in linearly to find the youngest
+        * current notification. Keep this until benchmarks confirm that
+        * another data structure is faster than this for our typical
+        * use cases.
+        */
        GPtrArray *muxer_upstream_notif_iters;
 
        /* Array of struct muxer_upstream_notif_iter * (weak refs) */
-       GPtrArray *muxer_upstream_notif_iters_retry;
+       GList *muxer_upstream_notif_iters_to_retry;
+
+       /*
+        * List of "recently" connected input ports (owned by this) to
+        * handle by this muxer notification iterator.
+        * muxer_port_connected() adds entries to this list, and the
+        * entries are removed when a notification iterator is created
+        * on the port's connection and put into
+        * muxer_upstream_notif_iters above by
+        * muxer_notif_iter_handle_newly_connected_ports().
+        */
+       GList *newly_connected_priv_ports;
 
        /* Next thing to return by the "next" method */
        struct bt_notification_iterator_next_return next_next_return;
@@ -76,6 +93,19 @@ struct muxer_notif_iter {
        int64_t last_returned_ts_ns;
 };
 
+static
+void destroy_muxer_upstream_notif_iter(
+               struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
+{
+       if (!muxer_upstream_notif_iter) {
+               return;
+       }
+
+       bt_put(muxer_upstream_notif_iter->notif_iter);
+       bt_put(muxer_upstream_notif_iter->priv_port);
+       g_free(muxer_upstream_notif_iter);
+}
+
 static
 struct muxer_upstream_notif_iter *muxer_notif_iter_add_upstream_notif_iter(
                struct muxer_notif_iter *muxer_notif_iter,
@@ -103,7 +133,7 @@ bool muxer_notif_iter_has_upstream_notif_iter_to_retry(
                struct muxer_notif_iter *muxer_notif_iter)
 {
        assert(muxer_notif_iter);
-       return muxer_notif_iter->muxer_upstream_notif_iters_retry->len > 0;
+       return muxer_notif_iter->muxer_upstream_notif_iters_to_retry != NULL;
 }
 
 static
@@ -113,44 +143,10 @@ void muxer_notif_iter_add_upstream_notif_iter_to_retry(
 {
        assert(muxer_notif_iter);
        assert(muxer_upstream_notif_iter);
-       g_ptr_array_add(muxer_notif_iter->muxer_upstream_notif_iters_retry,
-               muxer_upstream_notif_iter);
-}
-
-static
-void destroy_muxer_upstream_notif_iter(
-               struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
-{
-       if (!muxer_upstream_notif_iter) {
-               return;
-       }
-
-       bt_put(muxer_upstream_notif_iter->notif_iter);
-       bt_put(muxer_upstream_notif_iter->priv_port);
-       g_free(muxer_upstream_notif_iter);
-}
-
-static
-bool muxer_notif_iter_has_upstream_notif_iter_on_port(
-               struct muxer_notif_iter *muxer_notif_iter,
-               struct bt_private_port *priv_port)
-{
-       size_t i;
-       bool exists = false;
-
-       for (i = 0; i < muxer_notif_iter->muxer_upstream_notif_iters->len; i++) {
-               struct muxer_upstream_notif_iter *muxer_upstream_notif_iter =
-                       g_ptr_array_index(
-                               muxer_notif_iter->muxer_upstream_notif_iters, i);
-
-               if (muxer_upstream_notif_iter->priv_port == priv_port) {
-                       exists = true;
-                       goto end;
-               }
-       }
-
-end:
-       return exists;
+       muxer_notif_iter->muxer_upstream_notif_iters_to_retry =
+               g_list_append(
+                       muxer_notif_iter->muxer_upstream_notif_iters_to_retry,
+                       muxer_upstream_notif_iter);
 }
 
 static
@@ -330,6 +326,9 @@ struct bt_notification_iterator *create_notif_iter_on_input_port(
                goto end;
        }
 
+       // TODO: Advance the iterator to >= the time of the latest
+       //       returned notification by the muxer notification
+       //       iterator which creates it.
        notif_iter = bt_private_connection_create_notification_iterator(
                priv_conn);
        if (!notif_iter) {
@@ -343,6 +342,155 @@ end:
        return notif_iter;
 }
 
+static
+int muxer_upstream_notif_iter_next(struct muxer_notif_iter *muxer_notif_iter,
+               struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
+{
+       int ret = 0;
+       enum bt_notification_iterator_status next_status;
+
+       next_status = bt_notification_iterator_next(
+               muxer_upstream_notif_iter->notif_iter);
+
+       switch (next_status) {
+       case BT_NOTIFICATION_ITERATOR_STATUS_OK:
+               /* Everything okay */
+               break;
+       case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
+               muxer_notif_iter_add_upstream_notif_iter_to_retry(
+                       muxer_notif_iter, muxer_upstream_notif_iter);
+               break;
+       case BT_NOTIFICATION_ITERATOR_STATUS_END:
+               /*
+                * Notification iterator reached the end: release it. It
+                * won't be considered again to find the youngest
+                * notification.
+                */
+               BT_PUT(muxer_upstream_notif_iter->notif_iter);
+               goto end;
+       default:
+               /* Error or unsupported status code */
+               ret = next_status;
+       }
+
+end:
+       return ret;
+}
+
+static
+int muxer_notif_iter_handle_newly_connected_ports(struct muxer_comp *muxer_comp,
+               struct muxer_notif_iter *muxer_notif_iter)
+{
+       struct bt_component *comp = NULL;
+       int ret = 0;
+
+       comp = bt_component_from_private_component(muxer_comp->priv_comp);
+       assert(comp);
+
+       /*
+        * Here we create one upstream notification iterator for each
+        * newly connected port. The list of newly connected ports to
+        * handle here is updated by muxer_port_connected().
+        *
+        * An initial "next" operation is performed on each new upstream
+        * notification iterator. The possible return values of this
+        * initial "next" operation are:
+        *
+        * * BT_NOTIFICATION_ITERATOR_STATUS_OK: Perfect, we have a
+        *   current notification.
+        *
+        * * BT_NOTIFICATION_ITERATOR_STATUS_AGAIN: No notification so
+        *   far, but the muxer upstream notification iterator is added
+        *   to the list of upstream notification iterators to retry
+        *   before finding the next youngest notification.
+        *
+        * * BT_NOTIFICATION_ITERATOR_STATUS_END: No notification, and
+        *   we immediately release the upstream notification iterator
+        *   because it's useless.
+        *
+        * A possible side effect of this initial "next" operation, on
+        * each notification iterator, is the connection of a new port.
+        * In this case the list of newly connected ports is updated and
+        * this loop continues.
+        *
+        * Once this loop finishes successfully, the set of upstream
+        * notification iterators is considered _stable_, that is, it is
+        * safe, if no notification iterators must be retried, to select
+        * the youngest notification amongst them to be returned by the
+        * next "next" method call.
+        */
+       while (true) {
+               GList *node = muxer_notif_iter->newly_connected_priv_ports;
+               struct bt_private_port *priv_port;
+               struct bt_port *port;
+               struct bt_notification_iterator *upstream_notif_iter = NULL;
+               struct muxer_upstream_notif_iter *muxer_upstream_notif_iter;
+
+               if (!node) {
+                       break;
+               }
+
+               priv_port = node->data;
+               port = bt_port_from_private_port(priv_port);
+               assert(port);
+
+               if (!bt_port_is_connected(port)) {
+                       /*
+                        * Looks like this port is not connected
+                        * anymore: we can't create an upstream
+                        * notification iterator on its connection in
+                        * this case.
+                        */
+                       goto remove_node;
+               }
+
+               BT_PUT(port);
+               upstream_notif_iter = create_notif_iter_on_input_port(priv_port,
+                       &ret);
+               if (ret) {
+                       assert(!upstream_notif_iter);
+                       bt_put(priv_port);
+                       goto error;
+               }
+
+               muxer_upstream_notif_iter =
+                       muxer_notif_iter_add_upstream_notif_iter(
+                               muxer_notif_iter, upstream_notif_iter,
+                               priv_port);
+               BT_PUT(priv_port);
+               BT_PUT(upstream_notif_iter);
+               if (!muxer_upstream_notif_iter) {
+                       goto error;
+               }
+
+               ret = muxer_upstream_notif_iter_next(muxer_notif_iter,
+                       muxer_upstream_notif_iter);
+               if (ret) {
+                       goto error;
+               }
+
+remove_node:
+               bt_put(upstream_notif_iter);
+               bt_put(port);
+               bt_put(priv_port);
+               muxer_notif_iter->newly_connected_priv_ports =
+                       g_list_delete_link(
+                               muxer_notif_iter->newly_connected_priv_ports,
+                               node);
+       }
+
+       goto end;
+
+error:
+       if (ret == 0) {
+               ret = -1;
+       }
+
+end:
+       bt_put(comp);
+       return ret;
+}
+
 static
 int get_notif_ts_ns(struct muxer_comp *muxer_comp,
                struct bt_notification *notif, int64_t last_returned_ts_ns,
@@ -400,16 +548,14 @@ int get_notif_ts_ns(struct muxer_comp *muxer_comp,
        }
 
        if (!bt_ctf_clock_class_get_is_absolute(clock_class)) {
+               // TODO: Allow this with an explicit parameter
                goto error;
        }
 
        switch (bt_notification_get_type(notif)) {
        case BT_NOTIFICATION_TYPE_EVENT:
                event = bt_notification_event_get_event(notif);
-               if (!event) {
-                       goto error;
-               }
-
+               assert(event);
                clock_value = bt_ctf_event_get_clock_value(event,
                        clock_class);
                break;
@@ -443,6 +589,23 @@ end:
        return ret;
 }
 
+/*
+ * This function finds the youngest available notification amongst the
+ * non-ended upstream notification iterators and returns the upstream
+ * notification iterator which has it, or
+ * BT_NOTIFICATION_ITERATOR_STATUS_END if there's no available
+ * notification.
+ *
+ * This function does NOT:
+ *
+ * * Update any upstream notification iterator.
+ * * Check for newly connected ports.
+ * * Check the upstream notification iterators to retry.
+ *
+ * On sucess, this function sets *muxer_upstream_notif_iter to the
+ * upstream notification iterator of which the current notification is
+ * the youngest, and sets *ts_ns to its time.
+ */
 static
 enum bt_notification_iterator_status
 muxer_notif_iter_youngest_upstream_notif_iter(
@@ -469,7 +632,7 @@ muxer_notif_iter_youngest_upstream_notif_iter(
                int64_t notif_ts_ns;
 
                if (!cur_muxer_upstream_notif_iter->notif_iter) {
-                       /* This upstream notification iterator is done */
+                       /* This upstream notification iterator is ended */
                        continue;
                }
 
@@ -507,14 +670,32 @@ int muxer_notif_iter_set_next_next_return(struct muxer_comp *muxer_comp,
                struct muxer_notif_iter *muxer_notif_iter)
 {
        struct muxer_upstream_notif_iter *muxer_upstream_notif_iter;
-       struct bt_notification *notif = NULL;
        enum bt_notification_iterator_status notif_iter_status;
        int ret = 0;
 
-       if (muxer_notif_iter_has_upstream_notif_iter_to_retry(muxer_notif_iter)) {
+       /*
+        * Previous operations might have connected ports. They must be
+        * considered when finding the youngest notification because
+        * their upstream notification iterator does not exist yet.
+        */
+       ret = muxer_notif_iter_handle_newly_connected_ports(muxer_comp,
+               muxer_notif_iter);
+       if (ret) {
+               muxer_notif_iter->next_next_return.status =
+                       BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+               BT_PUT(muxer_notif_iter->next_next_return.notification);
+               goto end;
+       }
+
+       assert(!muxer_notif_iter->newly_connected_priv_ports);
+
+       if (muxer_notif_iter_has_upstream_notif_iter_to_retry(
+                       muxer_notif_iter)) {
                /*
                 * At least one upstream notification iterator to retry:
-                * try again later.
+                * try again later, because we cannot find the youngest
+                * notification if we don't have the current
+                * notification of each upstream notification iterator.
                 */
                muxer_notif_iter->next_next_return.status =
                        BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
@@ -523,8 +704,13 @@ int muxer_notif_iter_set_next_next_return(struct muxer_comp *muxer_comp,
        }
 
        /*
-        * Pick the current youngest notification and advance this
-        * upstream notification iterator.
+        * At this point we know that all our connected ports have an
+        * upstream notification iterator, and that all those iterators
+        * have a current notification (stable state). It is safe to
+        * find the youngest notification. It is possible that calling
+        * "next" on its iterator will connect new ports. This will be
+        * handled by the next call to
+        * muxer_notif_iter_set_next_next_return().
         */
        notif_iter_status =
                muxer_notif_iter_youngest_upstream_notif_iter(muxer_comp,
@@ -543,35 +729,20 @@ int muxer_notif_iter_set_next_next_return(struct muxer_comp *muxer_comp,
                goto end;
        }
 
-       assert(muxer_upstream_notif_iter);
-       notif = bt_notification_iterator_get_notification(
-               muxer_upstream_notif_iter->notif_iter);
-       assert(notif);
+       assert(notif_iter_status == BT_NOTIFICATION_ITERATOR_STATUS_OK);
+       BT_PUT(muxer_notif_iter->next_next_return.notification);
+       muxer_notif_iter->next_next_return.notification =
+               bt_notification_iterator_get_notification(
+                       muxer_upstream_notif_iter->notif_iter);
+       assert(muxer_notif_iter->next_next_return.notification);
        muxer_notif_iter->next_next_return.status =
                BT_NOTIFICATION_ITERATOR_STATUS_OK;
-       BT_MOVE(muxer_notif_iter->next_next_return.notification, notif);
-       notif_iter_status = bt_notification_iterator_next(
-               muxer_upstream_notif_iter->notif_iter);
-       if (notif_iter_status < 0) {
-               ret = -1;
+       ret = muxer_upstream_notif_iter_next(muxer_notif_iter,
+               muxer_upstream_notif_iter);
+       if (ret) {
                goto end;
        }
 
-       if (notif_iter_status == BT_NOTIFICATION_ITERATOR_STATUS_END) {
-               /* This upstream notification iterator is done */
-               BT_PUT(muxer_upstream_notif_iter->notif_iter);
-               goto ensure_monotonic;
-       }
-
-       assert(notif_iter_status == BT_NOTIFICATION_ITERATOR_STATUS_OK ||
-               notif_iter_status == BT_NOTIFICATION_ITERATOR_STATUS_AGAIN);
-
-       if (notif_iter_status == BT_NOTIFICATION_ITERATOR_STATUS_AGAIN) {
-               muxer_notif_iter_add_upstream_notif_iter_to_retry(
-                       muxer_notif_iter, muxer_upstream_notif_iter);
-       }
-
-ensure_monotonic:
        /*
         * Here we have the next "next" return value. It won't change
         * until it is returned by the next call to our "next" method.
@@ -588,8 +759,8 @@ ensure_monotonic:
        /*
         * We are now sure that the next "next" return value will not
         * change until it is returned by this muxer notification
-        * iterator. It is now safe to set the last returned time
-        * to this one.
+        * iterator (unless there's a fatal error). It is now safe to
+        * set the last returned time to this one.
         */
        muxer_notif_iter->last_returned_ts_ns =
                muxer_notif_iter->next_next_return_ts_ns;
@@ -599,95 +770,79 @@ end:
 }
 
 static
-int muxer_notif_iter_update_upstream_notif_iters(struct muxer_comp *muxer_comp,
+void destroy_muxer_notif_iter(struct muxer_notif_iter *muxer_notif_iter)
+{
+       GList *node;
+
+       if (!muxer_notif_iter) {
+               return;
+       }
+
+       if (muxer_notif_iter->muxer_upstream_notif_iters) {
+               g_ptr_array_free(
+                       muxer_notif_iter->muxer_upstream_notif_iters, TRUE);
+       }
+
+       if (muxer_notif_iter->muxer_upstream_notif_iters_to_retry) {
+               g_list_free(muxer_notif_iter->muxer_upstream_notif_iters_to_retry);
+       }
+
+       for (node = muxer_notif_iter->newly_connected_priv_ports;
+                       node; node = g_list_next(node)) {
+               bt_put(node->data);
+       }
+
+       g_list_free(muxer_notif_iter->newly_connected_priv_ports);
+       g_free(muxer_notif_iter);
+}
+
+static
+int muxer_notif_iter_init_newly_connected_ports(struct muxer_comp *muxer_comp,
                struct muxer_notif_iter *muxer_notif_iter)
 {
-       struct bt_component *comp = NULL;
-       int ret = 0;
+       struct bt_component *comp;
        uint64_t count;
-       size_t i;
+       uint64_t i;
+       int ret = 0;
 
+       /*
+        * Add the connected input ports to this muxer notification
+        * iterator's list of newly connected ports. They will be
+        * handled by muxer_notif_iter_handle_newly_connected_ports().
+        */
        comp = bt_component_from_private_component(muxer_comp->priv_comp);
        assert(comp);
        ret = bt_component_filter_get_input_port_count(comp, &count);
-       assert(ret == 0);
+       if (ret) {
+               goto end;
+       }
 
        for (i = 0; i < count; i++) {
                struct bt_private_port *priv_port =
                        bt_private_component_filter_get_input_private_port_at_index(
                                muxer_comp->priv_comp, i);
                struct bt_port *port;
-               struct bt_notification_iterator *upstream_notif_iter;
-               enum bt_notification_iterator_status next_status;
-               struct muxer_upstream_notif_iter *muxer_upstream_notif_iter;
 
                assert(priv_port);
-
-               if (muxer_notif_iter_has_upstream_notif_iter_on_port(
-                               muxer_notif_iter, priv_port)) {
-                       bt_put(priv_port);
-                       continue;
-               }
-
                port = bt_port_from_private_port(priv_port);
+               assert(port);
 
                if (!bt_port_is_connected(port)) {
-                       bt_put(port);
                        bt_put(priv_port);
+                       bt_put(port);
                        continue;
                }
 
                bt_put(port);
-               upstream_notif_iter = create_notif_iter_on_input_port(priv_port,
-                       &ret);
-               if (ret) {
-                       assert(!upstream_notif_iter);
-                       bt_put(priv_port);
-                       goto error;
-               }
-
-               next_status = bt_notification_iterator_next(
-                       upstream_notif_iter);
-               if (next_status < 0) {
-                       bt_put(priv_port);
-                       bt_put(upstream_notif_iter);
-                       ret = next_status;
-                       goto error;
-               }
-
-               if (next_status == BT_NOTIFICATION_ITERATOR_STATUS_END) {
-                       /* Already the end: do not even keep it */
-                       bt_put(priv_port);
-                       bt_put(upstream_notif_iter);
-                       continue;
-               }
-
-               assert(next_status == BT_NOTIFICATION_ITERATOR_STATUS_OK ||
-                       next_status == BT_NOTIFICATION_ITERATOR_STATUS_AGAIN);
-               muxer_upstream_notif_iter =
-                       muxer_notif_iter_add_upstream_notif_iter(
-                               muxer_notif_iter, upstream_notif_iter,
+               muxer_notif_iter->newly_connected_priv_ports =
+                       g_list_append(
+                               muxer_notif_iter->newly_connected_priv_ports,
                                priv_port);
-               if (!muxer_upstream_notif_iter) {
+               if (!muxer_notif_iter->newly_connected_priv_ports) {
                        bt_put(priv_port);
-                       bt_put(upstream_notif_iter);
-                       goto error;
-               }
-
-               if (next_status == BT_NOTIFICATION_ITERATOR_STATUS_AGAIN) {
-                       muxer_notif_iter_add_upstream_notif_iter_to_retry(
-                               muxer_notif_iter, muxer_upstream_notif_iter);
+                       ret = -1;
+                       goto end;
                }
-
-               bt_put(priv_port);
-               bt_put(upstream_notif_iter);
-       }
-
-       goto end;
-
-error:
-       if (ret >= 0) {
-               ret = -1;
        }
 
 end:
@@ -695,27 +850,6 @@ end:
        return ret;
 }
 
-static
-void destroy_muxer_notif_iter(struct muxer_notif_iter *muxer_notif_iter)
-{
-       if (!muxer_notif_iter) {
-               return;
-       }
-
-       if (muxer_notif_iter->muxer_upstream_notif_iters) {
-               g_ptr_array_free(
-                       muxer_notif_iter->muxer_upstream_notif_iters, TRUE);
-       }
-
-       if (muxer_notif_iter->muxer_upstream_notif_iters_retry) {
-               g_ptr_array_free(
-                       muxer_notif_iter->muxer_upstream_notif_iters_retry,
-                       TRUE);
-       }
-
-       g_free(muxer_notif_iter);
-}
-
 BT_HIDDEN
 enum bt_notification_iterator_status muxer_notif_iter_init(
                struct bt_private_notification_iterator *priv_notif_iter,
@@ -738,6 +872,12 @@ enum bt_notification_iterator_status muxer_notif_iter_init(
                goto error;
        }
 
+       ret = muxer_notif_iter_init_newly_connected_ports(muxer_comp,
+               muxer_notif_iter);
+       if (ret) {
+               goto error;
+       }
+
        muxer_notif_iter->last_returned_ts_ns = INT64_MIN;
        muxer_notif_iter->muxer_upstream_notif_iters =
                g_ptr_array_new_with_free_func(
@@ -746,28 +886,7 @@ enum bt_notification_iterator_status muxer_notif_iter_init(
                goto error;
        }
 
-       muxer_notif_iter->muxer_upstream_notif_iters_retry = g_ptr_array_new();
-       if (!muxer_notif_iter->muxer_upstream_notif_iters_retry) {
-               goto error;
-       }
-
-       /*
-        * Initial upstream notification iterator update: this creates
-        * one upstream notification iterator for each connected port
-        * without an upstream notification iterator (for this muxer
-        * notification iterator).
-        *
-        * At this point the next "next" return value is not set yet.
-        */
-       ret = muxer_notif_iter_update_upstream_notif_iters(muxer_comp,
-               muxer_notif_iter);
-       if (ret) {
-               goto error;
-       }
-
-       /*
-        * Set the initial "next" return value.
-        */
+       /* Set the initial "next" return value */
        ret = muxer_notif_iter_set_next_next_return(muxer_comp,
                muxer_notif_iter);
        if (ret) {
@@ -826,7 +945,7 @@ struct bt_notification_iterator_next_return muxer_notif_iter_next(
                bt_private_notification_iterator_get_user_data(priv_notif_iter);
        struct bt_private_component *priv_comp = NULL;
        struct muxer_comp *muxer_comp = NULL;
-       size_t i;
+       GList *retry_node;
        int ret;
 
        assert(muxer_notif_iter);
@@ -847,32 +966,31 @@ struct bt_notification_iterator_next_return muxer_notif_iter_next(
         * is in "end" state, we set it to NULL in this array. Then
         * we remove all the NULL values from this array.
         */
-       for (i = 0; i < muxer_notif_iter->muxer_upstream_notif_iters_retry->len; i++) {
+       retry_node = muxer_notif_iter->muxer_upstream_notif_iters_to_retry;
+       while (retry_node) {
                struct muxer_upstream_notif_iter *muxer_upstream_notif_iter =
-                       g_ptr_array_index(muxer_notif_iter->muxer_upstream_notif_iters_retry, i);
+                       retry_node->data;
                enum bt_notification_iterator_status status;
+               GList *next_retry_node = g_list_next(retry_node);
 
                assert(muxer_upstream_notif_iter->notif_iter);
                status = bt_notification_iterator_next(
                        muxer_upstream_notif_iter->notif_iter);
                if (status < 0) {
-                       /*
-                        * Technically we have a next "next" return
-                        * value which is ready for this call, but we're
-                        * failing within this call, so discard this
-                        * buffer.
-                        */
                        goto error;
                }
 
                if (status == BT_NOTIFICATION_ITERATOR_STATUS_END) {
                        /*
                         * This upstream notification iterator is done.
-                        * Set it to NULL so that it's removed later.
+                        * Put the iterator and remove node from list.
                         */
-                       g_ptr_array_index(muxer_notif_iter->muxer_upstream_notif_iters_retry,
-                               i) = NULL;
                        BT_PUT(muxer_upstream_notif_iter->notif_iter);
+                       muxer_notif_iter->muxer_upstream_notif_iters_to_retry =
+                               g_list_delete_link(
+                                       muxer_notif_iter->muxer_upstream_notif_iters_to_retry,
+                                       retry_node);
+                       retry_node = next_retry_node;
                        continue;
                }
 
@@ -882,20 +1000,16 @@ struct bt_notification_iterator_next_return muxer_notif_iter_next(
                if (status == BT_NOTIFICATION_ITERATOR_STATUS_OK) {
                        /*
                         * This upstream notification iterator now has.
-                        * a notification. Remove it from this array.
+                        * a notification. Remove it from this list.
                         */
-                       g_ptr_array_index(muxer_notif_iter->muxer_upstream_notif_iters_retry,
-                               i) = NULL;
-                       continue;
+                       muxer_notif_iter->muxer_upstream_notif_iters_to_retry =
+                               g_list_delete_link(
+                                       muxer_notif_iter->muxer_upstream_notif_iters_to_retry,
+                                       retry_node);
                }
-       }
 
-       /*
-        * Remove NULL values from the array of upstream notification
-        * iterators to retry.
-        */
-       while (g_ptr_array_remove_fast(
-               muxer_notif_iter->muxer_upstream_notif_iters_retry, NULL));
+               retry_node = next_retry_node;
+       }
 
        /* Take our next "next" next return value */
        next_ret = muxer_notif_iter->next_next_return;
@@ -913,6 +1027,11 @@ struct bt_notification_iterator_next_return muxer_notif_iter_next(
        goto end;
 
 error:
+       /*
+        * Technically we already have a next "next" return value which
+        * is ready for this call, but we're failing within this call,
+        * so discard this buffer and return the error ASAP.
+        */
        BT_PUT(next_ret.notification);
        next_ret.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
 
@@ -932,7 +1051,6 @@ void muxer_port_connected(
        struct muxer_comp *muxer_comp =
                bt_private_component_get_user_data(priv_comp);
        size_t i;
-       int ret;
 
        assert(self_port);
        assert(muxer_comp);
@@ -954,16 +1072,20 @@ void muxer_port_connected(
                        g_ptr_array_index(muxer_comp->muxer_notif_iters, i);
 
                /*
-                * Here we update the list of upstream notification
-                * iterators, but we do NOT call
-                * muxer_notif_iter_set_next_next_return() because we
-                * already have a next "next" return value at this point
-                * (right after the muxer notification iterator
-                * initialization, and always after).
+                * Add this port to the list of newly connected ports
+                * for this muxer notification iterator. We append at
+                * the end of this list while
+                * muxer_notif_iter_handle_newly_connected_ports()
+                * removes the nodes from the beginning.
+                *
+                * The list node owns the private port.
                 */
-               ret = muxer_notif_iter_update_upstream_notif_iters(muxer_comp,
-                       muxer_notif_iter);
-               if (ret) {
+               muxer_notif_iter->newly_connected_priv_ports =
+                       g_list_append(
+                               muxer_notif_iter->newly_connected_priv_ports,
+                               bt_get(self_private_port));
+               if (!muxer_notif_iter->newly_connected_priv_ports) {
+                       bt_put(self_private_port);
                        muxer_comp->error = true;
                        goto end;
                }
@@ -984,6 +1106,16 @@ void muxer_port_disconnected(struct bt_private_component *priv_comp,
        assert(port);
        assert(muxer_comp);
 
+       /*
+        * There's nothing special to do when a port is disconnected
+        * because this component deals with upstream notification
+        * iterators which were already created thanks to connected
+        * ports. The fact that the port is disconnected does not cancel
+        * the upstream notification iterators created using its
+        * connection: they still exist. The only way to remove an
+        * upstream notification iterator is for its "next" operation to
+        * return BT_NOTIFICATION_ITERATOR_STATUS_END.
+        */
        if (bt_port_get_type(port) == BT_PORT_TYPE_INPUT) {
                /* One more available input port */
                muxer_comp->available_input_ports++;
This page took 0.0348349999999999 seconds and 4 git commands to generate.