Fix: muxer: handle CANCELED status
[babeltrace.git] / plugins / utils / muxer / muxer.c
index e967093b25c02c40fef867a5e2887ae202522b2c..e0553423a927ead2e2d0cf6a13bd06e24e349b82 100644 (file)
 #include <babeltrace/graph/private-port.h>
 #include <plugins-common.h>
 #include <glib.h>
+#include <stdbool.h>
 #include <assert.h>
 
+#define IGNORE_ABSOLUTE_PARAM_NAME     "ignore-absolute"
+
 struct muxer_comp {
        /* Array of struct bt_private_notification_iterator * (weak refs) */
        GPtrArray *muxer_notif_iters;
@@ -51,13 +54,14 @@ struct muxer_comp {
        size_t available_input_ports;
        bool error;
        bool initializing_muxer_notif_iter;
+       bool ignore_absolute;
 };
 
 struct muxer_upstream_notif_iter {
        /* Owned by this, NULL if ended */
        struct bt_notification_iterator *notif_iter;
 
-       /* Owned by this*/
+       /* Weak */
        struct bt_private_port *priv_port;
 
        /*
@@ -86,7 +90,7 @@ struct muxer_notif_iter {
        GPtrArray *muxer_upstream_notif_iters;
 
        /*
-        * List of "recently" connected input ports (owned by this) to
+        * List of "recently" connected input ports (weak) to
         * handle by this muxer notification iterator.
         * muxer_port_connected() adds entries to this list, and the
         * entries are removed when a notification iterator is created
@@ -112,7 +116,6 @@ void destroy_muxer_upstream_notif_iter(
        }
 
        bt_put(muxer_upstream_notif_iter->notif_iter);
-       bt_put(muxer_upstream_notif_iter->priv_port);
        g_free(muxer_upstream_notif_iter);
 }
 
@@ -130,7 +133,7 @@ struct muxer_upstream_notif_iter *muxer_notif_iter_add_upstream_notif_iter(
        }
 
        muxer_upstream_notif_iter->notif_iter = bt_get(notif_iter);
-       muxer_upstream_notif_iter->priv_port = bt_get(priv_port);
+       muxer_upstream_notif_iter->priv_port = priv_port;
        muxer_upstream_notif_iter->is_valid = false;
        g_ptr_array_add(muxer_notif_iter->muxer_upstream_notif_iters,
                muxer_upstream_notif_iter);
@@ -176,36 +179,6 @@ end:
                g_string_free(port_name, TRUE);
        }
 
-       BT_PUT(priv_port);
-       return ret;
-}
-
-static
-int remove_default_ports(struct bt_private_component *priv_comp)
-{
-       struct bt_private_port *priv_port;
-       int ret = 0;
-
-       priv_port = bt_private_component_filter_get_default_input_private_port(
-               priv_comp);
-       if (priv_port) {
-               ret = bt_private_port_remove_from_component(priv_port);
-               if (ret) {
-                       goto end;
-               }
-       }
-
-       bt_put(priv_port);
-       priv_port = bt_private_component_filter_get_default_output_private_port(
-               priv_comp);
-       if (priv_port) {
-               ret = bt_private_port_remove_from_component(priv_port);
-               if (ret) {
-                       goto end;
-               }
-       }
-
-end:
        bt_put(priv_port);
        return ret;
 }
@@ -240,6 +213,75 @@ void destroy_muxer_comp(struct muxer_comp *muxer_comp)
        g_free(muxer_comp);
 }
 
+static
+struct bt_value *get_default_params(void)
+{
+       struct bt_value *params;
+       int ret;
+
+       params = bt_value_map_create();
+       if (!params) {
+               goto error;
+       }
+
+       ret = bt_value_map_insert_bool(params, IGNORE_ABSOLUTE_PARAM_NAME,
+               false);
+       if (ret) {
+               goto error;
+       }
+
+       goto end;
+
+error:
+       BT_PUT(params);
+
+end:
+       return params;
+}
+
+static
+int configure_muxer_comp(struct muxer_comp *muxer_comp, struct bt_value *params)
+{
+       struct bt_value *default_params = NULL;
+       struct bt_value *real_params = NULL;
+       struct bt_value *ignore_absolute = NULL;
+       int ret = 0;
+       bt_bool bool_val;
+
+       default_params = get_default_params();
+       if (!default_params) {
+               goto error;
+       }
+
+       real_params = bt_value_map_extend(default_params, params);
+       if (!real_params) {
+               goto error;
+       }
+
+       ignore_absolute = bt_value_map_get(real_params,
+               IGNORE_ABSOLUTE_PARAM_NAME);
+       if (!bt_value_is_bool(ignore_absolute)) {
+               goto error;
+       }
+
+       if (bt_value_bool_get(ignore_absolute, &bool_val)) {
+               goto error;
+       }
+
+       muxer_comp->ignore_absolute = (bool) bool_val;
+
+       goto end;
+
+error:
+       ret = -1;
+
+end:
+       bt_put(default_params);
+       bt_put(real_params);
+       bt_put(ignore_absolute);
+       return ret;
+}
+
 BT_HIDDEN
 enum bt_component_status muxer_init(
                struct bt_private_component *priv_comp,
@@ -253,6 +295,11 @@ enum bt_component_status muxer_init(
                goto error;
        }
 
+       ret = configure_muxer_comp(muxer_comp, params);
+       if (ret) {
+               goto error;
+       }
+
        muxer_comp->muxer_notif_iters = g_ptr_array_new();
        if (!muxer_comp->muxer_notif_iters) {
                goto error;
@@ -261,11 +308,6 @@ enum bt_component_status muxer_init(
        muxer_comp->priv_comp = priv_comp;
        ret = bt_private_component_set_user_data(priv_comp, muxer_comp);
        assert(ret == 0);
-       ret = remove_default_ports(priv_comp);
-       if (ret) {
-               goto error;
-       }
-
        ret = ensure_available_input_port(priv_comp);
        if (ret) {
                goto error;
@@ -320,7 +362,7 @@ struct bt_notification_iterator *create_notif_iter_on_input_port(
        //       returned notification by the muxer notification
        //       iterator which creates it.
        notif_iter = bt_private_connection_create_notification_iterator(
-               priv_conn);
+               priv_conn, NULL);
        if (!notif_iter) {
                *ret = -1;
                goto end;
@@ -358,7 +400,8 @@ enum bt_notification_iterator_status muxer_upstream_notif_iter_next(
                 */
                muxer_upstream_notif_iter->is_valid = false;
                break;
-       case BT_NOTIFICATION_ITERATOR_STATUS_END:
+       case BT_NOTIFICATION_ITERATOR_STATUS_END:       /* Fall-through. */
+       case BT_NOTIFICATION_ITERATOR_STATUS_CANCELED:
                /*
                 * Notification iterator reached the end: release it. It
                 * won't be considered again to find the youngest
@@ -421,7 +464,6 @@ int muxer_notif_iter_handle_newly_connected_ports(
                        &ret);
                if (ret) {
                        assert(!upstream_notif_iter);
-                       bt_put(priv_port);
                        goto error;
                }
 
@@ -429,7 +471,6 @@ int muxer_notif_iter_handle_newly_connected_ports(
                        muxer_notif_iter_add_upstream_notif_iter(
                                muxer_notif_iter, upstream_notif_iter,
                                priv_port);
-               BT_PUT(priv_port);
                BT_PUT(upstream_notif_iter);
                if (!muxer_upstream_notif_iter) {
                        goto error;
@@ -438,7 +479,6 @@ int muxer_notif_iter_handle_newly_connected_ports(
 remove_node:
                bt_put(upstream_notif_iter);
                bt_put(port);
-               bt_put(priv_port);
                muxer_notif_iter->newly_connected_priv_ports =
                        g_list_delete_link(
                                muxer_notif_iter->newly_connected_priv_ports,
@@ -509,8 +549,8 @@ int get_notif_ts_ns(struct muxer_comp *muxer_comp,
                goto error;
        }
 
-       if (!bt_ctf_clock_class_is_absolute(clock_class)) {
-               // TODO: Allow this with an explicit parameter
+       if (!muxer_comp->ignore_absolute &&
+                       !bt_ctf_clock_class_is_absolute(clock_class)) {
                goto error;
        }
 
@@ -725,7 +765,8 @@ struct bt_notification_iterator_next_return muxer_notif_iter_do_next(
                        muxer_notif_iter, &muxer_upstream_notif_iter,
                        &next_return_ts);
        if (next_return.status < 0 ||
-                       next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_END) {
+                       next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_END ||
+                       next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_CANCELED) {
                goto end;
        }
 
@@ -755,8 +796,6 @@ end:
 static
 void destroy_muxer_notif_iter(struct muxer_notif_iter *muxer_notif_iter)
 {
-       GList *node;
-
        if (!muxer_notif_iter) {
                return;
        }
@@ -766,11 +805,6 @@ void destroy_muxer_notif_iter(struct muxer_notif_iter *muxer_notif_iter)
                        muxer_notif_iter->muxer_upstream_notif_iters, TRUE);
        }
 
-       for (node = muxer_notif_iter->newly_connected_priv_ports;
-                       node; node = g_list_next(node)) {
-               bt_put(node->data);
-       }
-
        g_list_free(muxer_notif_iter->newly_connected_priv_ports);
        g_free(muxer_notif_iter);
 }
@@ -813,12 +847,12 @@ int muxer_notif_iter_init_newly_connected_ports(struct muxer_comp *muxer_comp,
                }
 
                bt_put(port);
+               bt_put(priv_port);
                muxer_notif_iter->newly_connected_priv_ports =
                        g_list_append(
                                muxer_notif_iter->newly_connected_priv_ports,
                                priv_port);
                if (!muxer_notif_iter->newly_connected_priv_ports) {
-                       bt_put(priv_port);
                        ret = -1;
                        goto end;
                }
@@ -974,24 +1008,13 @@ void muxer_port_connected(
        struct muxer_comp *muxer_comp =
                bt_private_component_get_user_data(priv_comp);
        size_t i;
+       int ret;
 
        assert(self_port);
        assert(muxer_comp);
 
-       if (bt_port_get_type(self_port) == BT_PORT_TYPE_INPUT) {
-               int ret;
-
-               /* One less available input port */
-               muxer_comp->available_input_ports--;
-               ret = ensure_available_input_port(priv_comp);
-               if (ret) {
-                       /*
-                        * Only way to report an error later since this
-                        * method does not return anything.
-                        */
-                       muxer_comp->error = true;
-                       goto end;
-               }
+       if (bt_port_get_type(self_port) == BT_PORT_TYPE_OUTPUT) {
+               goto end;
        }
 
        for (i = 0; i < muxer_comp->muxer_notif_iters->len; i++) {
@@ -1010,15 +1033,26 @@ void muxer_port_connected(
                muxer_notif_iter->newly_connected_priv_ports =
                        g_list_append(
                                muxer_notif_iter->newly_connected_priv_ports,
-                               bt_get(self_private_port));
+                               self_private_port);
                if (!muxer_notif_iter->newly_connected_priv_ports) {
                        /* Put reference taken by bt_get() above */
-                       bt_put(self_private_port);
                        muxer_comp->error = true;
                        goto end;
                }
        }
 
+       /* One less available input port */
+       muxer_comp->available_input_ports--;
+       ret = ensure_available_input_port(priv_comp);
+       if (ret) {
+               /*
+                * Only way to report an error later since this
+                * method does not return anything.
+                */
+               muxer_comp->error = true;
+               goto end;
+       }
+
 end:
        bt_put(self_port);
 }
This page took 0.02736 seconds and 4 git commands to generate.