Fix: muxer: handle CANCELED status
[babeltrace.git] / plugins / utils / muxer / muxer.c
index ba2fb3c0064be7c07a211e9fad2dc42b54af7d7f..e0553423a927ead2e2d0cf6a13bd06e24e349b82 100644 (file)
@@ -39,6 +39,7 @@
 #include <babeltrace/graph/private-port.h>
 #include <plugins-common.h>
 #include <glib.h>
+#include <stdbool.h>
 #include <assert.h>
 
 #define IGNORE_ABSOLUTE_PARAM_NAME     "ignore-absolute"
@@ -60,7 +61,7 @@ struct muxer_upstream_notif_iter {
        /* Owned by this, NULL if ended */
        struct bt_notification_iterator *notif_iter;
 
-       /* Owned by this*/
+       /* Weak */
        struct bt_private_port *priv_port;
 
        /*
@@ -89,7 +90,7 @@ struct muxer_notif_iter {
        GPtrArray *muxer_upstream_notif_iters;
 
        /*
-        * List of "recently" connected input ports (owned by this) to
+        * List of "recently" connected input ports (weak) to
         * handle by this muxer notification iterator.
         * muxer_port_connected() adds entries to this list, and the
         * entries are removed when a notification iterator is created
@@ -115,7 +116,6 @@ void destroy_muxer_upstream_notif_iter(
        }
 
        bt_put(muxer_upstream_notif_iter->notif_iter);
-       bt_put(muxer_upstream_notif_iter->priv_port);
        g_free(muxer_upstream_notif_iter);
 }
 
@@ -133,7 +133,7 @@ struct muxer_upstream_notif_iter *muxer_notif_iter_add_upstream_notif_iter(
        }
 
        muxer_upstream_notif_iter->notif_iter = bt_get(notif_iter);
-       muxer_upstream_notif_iter->priv_port = bt_get(priv_port);
+       muxer_upstream_notif_iter->priv_port = priv_port;
        muxer_upstream_notif_iter->is_valid = false;
        g_ptr_array_add(muxer_notif_iter->muxer_upstream_notif_iters,
                muxer_upstream_notif_iter);
@@ -179,7 +179,7 @@ end:
                g_string_free(port_name, TRUE);
        }
 
-       BT_PUT(priv_port);
+       bt_put(priv_port);
        return ret;
 }
 
@@ -246,6 +246,7 @@ int configure_muxer_comp(struct muxer_comp *muxer_comp, struct bt_value *params)
        struct bt_value *real_params = NULL;
        struct bt_value *ignore_absolute = NULL;
        int ret = 0;
+       bt_bool bool_val;
 
        default_params = get_default_params();
        if (!default_params) {
@@ -263,10 +264,12 @@ int configure_muxer_comp(struct muxer_comp *muxer_comp, struct bt_value *params)
                goto error;
        }
 
-       if (bt_value_bool_get(ignore_absolute, &muxer_comp->ignore_absolute)) {
+       if (bt_value_bool_get(ignore_absolute, &bool_val)) {
                goto error;
        }
 
+       muxer_comp->ignore_absolute = (bool) bool_val;
+
        goto end;
 
 error:
@@ -359,7 +362,7 @@ struct bt_notification_iterator *create_notif_iter_on_input_port(
        //       returned notification by the muxer notification
        //       iterator which creates it.
        notif_iter = bt_private_connection_create_notification_iterator(
-               priv_conn);
+               priv_conn, NULL);
        if (!notif_iter) {
                *ret = -1;
                goto end;
@@ -397,7 +400,8 @@ enum bt_notification_iterator_status muxer_upstream_notif_iter_next(
                 */
                muxer_upstream_notif_iter->is_valid = false;
                break;
-       case BT_NOTIFICATION_ITERATOR_STATUS_END:
+       case BT_NOTIFICATION_ITERATOR_STATUS_END:       /* Fall-through. */
+       case BT_NOTIFICATION_ITERATOR_STATUS_CANCELED:
                /*
                 * Notification iterator reached the end: release it. It
                 * won't be considered again to find the youngest
@@ -460,7 +464,6 @@ int muxer_notif_iter_handle_newly_connected_ports(
                        &ret);
                if (ret) {
                        assert(!upstream_notif_iter);
-                       bt_put(priv_port);
                        goto error;
                }
 
@@ -468,7 +471,6 @@ int muxer_notif_iter_handle_newly_connected_ports(
                        muxer_notif_iter_add_upstream_notif_iter(
                                muxer_notif_iter, upstream_notif_iter,
                                priv_port);
-               BT_PUT(priv_port);
                BT_PUT(upstream_notif_iter);
                if (!muxer_upstream_notif_iter) {
                        goto error;
@@ -477,7 +479,6 @@ int muxer_notif_iter_handle_newly_connected_ports(
 remove_node:
                bt_put(upstream_notif_iter);
                bt_put(port);
-               bt_put(priv_port);
                muxer_notif_iter->newly_connected_priv_ports =
                        g_list_delete_link(
                                muxer_notif_iter->newly_connected_priv_ports,
@@ -764,7 +765,8 @@ struct bt_notification_iterator_next_return muxer_notif_iter_do_next(
                        muxer_notif_iter, &muxer_upstream_notif_iter,
                        &next_return_ts);
        if (next_return.status < 0 ||
-                       next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_END) {
+                       next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_END ||
+                       next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_CANCELED) {
                goto end;
        }
 
@@ -794,8 +796,6 @@ end:
 static
 void destroy_muxer_notif_iter(struct muxer_notif_iter *muxer_notif_iter)
 {
-       GList *node;
-
        if (!muxer_notif_iter) {
                return;
        }
@@ -805,11 +805,6 @@ void destroy_muxer_notif_iter(struct muxer_notif_iter *muxer_notif_iter)
                        muxer_notif_iter->muxer_upstream_notif_iters, TRUE);
        }
 
-       for (node = muxer_notif_iter->newly_connected_priv_ports;
-                       node; node = g_list_next(node)) {
-               bt_put(node->data);
-       }
-
        g_list_free(muxer_notif_iter->newly_connected_priv_ports);
        g_free(muxer_notif_iter);
 }
@@ -852,12 +847,12 @@ int muxer_notif_iter_init_newly_connected_ports(struct muxer_comp *muxer_comp,
                }
 
                bt_put(port);
+               bt_put(priv_port);
                muxer_notif_iter->newly_connected_priv_ports =
                        g_list_append(
                                muxer_notif_iter->newly_connected_priv_ports,
                                priv_port);
                if (!muxer_notif_iter->newly_connected_priv_ports) {
-                       bt_put(priv_port);
                        ret = -1;
                        goto end;
                }
@@ -1013,24 +1008,13 @@ void muxer_port_connected(
        struct muxer_comp *muxer_comp =
                bt_private_component_get_user_data(priv_comp);
        size_t i;
+       int ret;
 
        assert(self_port);
        assert(muxer_comp);
 
-       if (bt_port_get_type(self_port) == BT_PORT_TYPE_INPUT) {
-               int ret;
-
-               /* One less available input port */
-               muxer_comp->available_input_ports--;
-               ret = ensure_available_input_port(priv_comp);
-               if (ret) {
-                       /*
-                        * Only way to report an error later since this
-                        * method does not return anything.
-                        */
-                       muxer_comp->error = true;
-                       goto end;
-               }
+       if (bt_port_get_type(self_port) == BT_PORT_TYPE_OUTPUT) {
+               goto end;
        }
 
        for (i = 0; i < muxer_comp->muxer_notif_iters->len; i++) {
@@ -1049,15 +1033,26 @@ void muxer_port_connected(
                muxer_notif_iter->newly_connected_priv_ports =
                        g_list_append(
                                muxer_notif_iter->newly_connected_priv_ports,
-                               bt_get(self_private_port));
+                               self_private_port);
                if (!muxer_notif_iter->newly_connected_priv_ports) {
                        /* Put reference taken by bt_get() above */
-                       bt_put(self_private_port);
                        muxer_comp->error = true;
                        goto end;
                }
        }
 
+       /* One less available input port */
+       muxer_comp->available_input_ports--;
+       ret = ensure_available_input_port(priv_comp);
+       if (ret) {
+               /*
+                * Only way to report an error later since this
+                * method does not return anything.
+                */
+               muxer_comp->error = true;
+               goto end;
+       }
+
 end:
        bt_put(self_port);
 }
This page took 0.025741 seconds and 4 git commands to generate.