Make bt_private_connection_create_notification_iterator() return a status code
[babeltrace.git] / plugins / utils / muxer / muxer.c
index 67ba0a99421b805e61ff149c90115df30c60b50d..47ffb866002018bdf5b5471ed6f7126fc355151c 100644 (file)
 #include <babeltrace/graph/private-connection.h>
 #include <babeltrace/graph/private-notification-iterator.h>
 #include <babeltrace/graph/private-port.h>
+#include <babeltrace/graph/connection.h>
 #include <plugins-common.h>
 #include <glib.h>
 #include <stdbool.h>
 #include <assert.h>
+#include <stdlib.h>
 
-#define IGNORE_ABSOLUTE_PARAM_NAME     "ignore-absolute"
+#define ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME       "assume-absolute-clock-classes"
 
 struct muxer_comp {
        /* Array of struct bt_private_notification_iterator * (weak refs) */
@@ -61,9 +63,6 @@ struct muxer_upstream_notif_iter {
        /* Owned by this, NULL if ended */
        struct bt_notification_iterator *notif_iter;
 
-       /* Owned by this*/
-       struct bt_private_port *priv_port;
-
        /*
         * This flag is true if the upstream notification iterator's
         * current notification must be considered for the multiplexing
@@ -90,7 +89,7 @@ struct muxer_notif_iter {
        GPtrArray *muxer_upstream_notif_iters;
 
        /*
-        * List of "recently" connected input ports (owned by this) to
+        * List of "recently" connected input ports (weak) to
         * handle by this muxer notification iterator.
         * muxer_port_connected() adds entries to this list, and the
         * entries are removed when a notification iterator is created
@@ -116,7 +115,6 @@ void destroy_muxer_upstream_notif_iter(
        }
 
        bt_put(muxer_upstream_notif_iter->notif_iter);
-       bt_put(muxer_upstream_notif_iter->priv_port);
        g_free(muxer_upstream_notif_iter);
 }
 
@@ -134,7 +132,6 @@ struct muxer_upstream_notif_iter *muxer_notif_iter_add_upstream_notif_iter(
        }
 
        muxer_upstream_notif_iter->notif_iter = bt_get(notif_iter);
-       muxer_upstream_notif_iter->priv_port = bt_get(priv_port);
        muxer_upstream_notif_iter->is_valid = false;
        g_ptr_array_add(muxer_notif_iter->muxer_upstream_notif_iters,
                muxer_upstream_notif_iter);
@@ -144,13 +141,13 @@ end:
 }
 
 static
-int ensure_available_input_port(struct bt_private_component *priv_comp)
+enum bt_component_status ensure_available_input_port(
+               struct bt_private_component *priv_comp)
 {
        struct muxer_comp *muxer_comp =
                bt_private_component_get_user_data(priv_comp);
-       int ret = 0;
+       enum bt_component_status status = BT_COMPONENT_STATUS_OK;
        GString *port_name = NULL;
-       void *priv_port = NULL;
 
        assert(muxer_comp);
 
@@ -160,15 +157,14 @@ int ensure_available_input_port(struct bt_private_component *priv_comp)
 
        port_name = g_string_new("in");
        if (!port_name) {
-               ret = -1;
+               status = BT_COMPONENT_STATUS_NOMEM;
                goto end;
        }
 
        g_string_append_printf(port_name, "%u", muxer_comp->next_port_num);
-       priv_port = bt_private_component_filter_add_input_private_port(
-               priv_comp, port_name->str, NULL);
-       if (!priv_port) {
-               ret = -1;
+       status = bt_private_component_filter_add_input_private_port(
+               priv_comp, port_name->str, NULL, NULL);
+       if (status != BT_COMPONENT_STATUS_OK) {
                goto end;
        }
 
@@ -180,24 +176,15 @@ end:
                g_string_free(port_name, TRUE);
        }
 
-       BT_PUT(priv_port);
-       return ret;
+       return status;
 }
 
 static
-int create_output_port(struct bt_private_component *priv_comp)
+enum bt_component_status create_output_port(
+               struct bt_private_component *priv_comp)
 {
-       void *priv_port;
-       int ret = 0;
-
-       priv_port = bt_private_component_filter_add_output_private_port(
-               priv_comp, "out", NULL);
-       if (!priv_port) {
-               ret = -1;
-       }
-
-       bt_put(priv_port);
-       return ret;
+       return bt_private_component_filter_add_output_private_port(
+               priv_comp, "out", NULL, NULL);
 }
 
 static
@@ -225,8 +212,8 @@ struct bt_value *get_default_params(void)
                goto error;
        }
 
-       ret = bt_value_map_insert_bool(params, IGNORE_ABSOLUTE_PARAM_NAME,
-               false);
+       ret = bt_value_map_insert_bool(params,
+               ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME, false);
        if (ret) {
                goto error;
        }
@@ -260,7 +247,7 @@ int configure_muxer_comp(struct muxer_comp *muxer_comp, struct bt_value *params)
        }
 
        ignore_absolute = bt_value_map_get(real_params,
-               IGNORE_ABSOLUTE_PARAM_NAME);
+               ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME);
        if (!bt_value_is_bool(ignore_absolute)) {
                goto error;
        }
@@ -309,8 +296,8 @@ enum bt_component_status muxer_init(
        muxer_comp->priv_comp = priv_comp;
        ret = bt_private_component_set_user_data(priv_comp, muxer_comp);
        assert(ret == 0);
-       ret = ensure_available_input_port(priv_comp);
-       if (ret) {
+       status = ensure_available_input_port(priv_comp);
+       if (status != BT_COMPONENT_STATUS_OK) {
                goto error;
        }
 
@@ -325,7 +312,10 @@ error:
        destroy_muxer_comp(muxer_comp);
        ret = bt_private_component_set_user_data(priv_comp, NULL);
        assert(ret == 0);
-       status = BT_COMPONENT_STATUS_ERROR;
+
+       if (status == BT_COMPONENT_STATUS_OK) {
+               status = BT_COMPONENT_STATUS_ERROR;
+       }
 
 end:
        return status;
@@ -347,6 +337,7 @@ struct bt_notification_iterator *create_notif_iter_on_input_port(
        struct bt_port *port = bt_port_from_private_port(priv_port);
        struct bt_notification_iterator *notif_iter = NULL;
        struct bt_private_connection *priv_conn = NULL;
+       enum bt_connection_status conn_status;
 
        assert(ret);
        *ret = 0;
@@ -362,9 +353,9 @@ struct bt_notification_iterator *create_notif_iter_on_input_port(
        // TODO: Advance the iterator to >= the time of the latest
        //       returned notification by the muxer notification
        //       iterator which creates it.
-       notif_iter = bt_private_connection_create_notification_iterator(
-               priv_conn, NULL);
-       if (!notif_iter) {
+       conn_status = bt_private_connection_create_notification_iterator(
+               priv_conn, NULL, &notif_iter);
+       if (conn_status != BT_CONNECTION_STATUS_OK) {
                *ret = -1;
                goto end;
        }
@@ -401,7 +392,8 @@ enum bt_notification_iterator_status muxer_upstream_notif_iter_next(
                 */
                muxer_upstream_notif_iter->is_valid = false;
                break;
-       case BT_NOTIFICATION_ITERATOR_STATUS_END:
+       case BT_NOTIFICATION_ITERATOR_STATUS_END:       /* Fall-through. */
+       case BT_NOTIFICATION_ITERATOR_STATUS_CANCELED:
                /*
                 * Notification iterator reached the end: release it. It
                 * won't be considered again to find the youngest
@@ -464,7 +456,6 @@ int muxer_notif_iter_handle_newly_connected_ports(
                        &ret);
                if (ret) {
                        assert(!upstream_notif_iter);
-                       bt_put(priv_port);
                        goto error;
                }
 
@@ -472,7 +463,6 @@ int muxer_notif_iter_handle_newly_connected_ports(
                        muxer_notif_iter_add_upstream_notif_iter(
                                muxer_notif_iter, upstream_notif_iter,
                                priv_port);
-               BT_PUT(priv_port);
                BT_PUT(upstream_notif_iter);
                if (!muxer_upstream_notif_iter) {
                        goto error;
@@ -481,7 +471,6 @@ int muxer_notif_iter_handle_newly_connected_ports(
 remove_node:
                bt_put(upstream_notif_iter);
                bt_put(port);
-               bt_put(priv_port);
                muxer_notif_iter->newly_connected_priv_ports =
                        g_list_delete_link(
                                muxer_notif_iter->newly_connected_priv_ports,
@@ -569,7 +558,7 @@ int get_notif_ts_ns(struct muxer_comp *muxer_comp,
                        notif, clock_class);
                break;
        default:
-               assert(false);
+               abort();
        }
 
        if (!clock_value) {
@@ -708,6 +697,21 @@ enum bt_notification_iterator_status validate_muxer_upstream_notif_iters(
                if (status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
                        goto end;
                }
+
+               /*
+                * Remove this muxer upstream notification iterator
+                * if it's ended or canceled.
+                */
+               if (!muxer_upstream_notif_iter->notif_iter) {
+                       /*
+                        * Use g_ptr_array_remove_fast() because the
+                        * order of those elements is not important.
+                        */
+                       g_ptr_array_remove_index_fast(
+                               muxer_notif_iter->muxer_upstream_notif_iters,
+                               i);
+                       i--;
+               }
        }
 
 end:
@@ -768,7 +772,8 @@ struct bt_notification_iterator_next_return muxer_notif_iter_do_next(
                        muxer_notif_iter, &muxer_upstream_notif_iter,
                        &next_return_ts);
        if (next_return.status < 0 ||
-                       next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_END) {
+                       next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_END ||
+                       next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_CANCELED) {
                goto end;
        }
 
@@ -798,8 +803,6 @@ end:
 static
 void destroy_muxer_notif_iter(struct muxer_notif_iter *muxer_notif_iter)
 {
-       GList *node;
-
        if (!muxer_notif_iter) {
                return;
        }
@@ -809,11 +812,6 @@ void destroy_muxer_notif_iter(struct muxer_notif_iter *muxer_notif_iter)
                        muxer_notif_iter->muxer_upstream_notif_iters, TRUE);
        }
 
-       for (node = muxer_notif_iter->newly_connected_priv_ports;
-                       node; node = g_list_next(node)) {
-               bt_put(node->data);
-       }
-
        g_list_free(muxer_notif_iter->newly_connected_priv_ports);
        g_free(muxer_notif_iter);
 }
@@ -856,12 +854,12 @@ int muxer_notif_iter_init_newly_connected_ports(struct muxer_comp *muxer_comp,
                }
 
                bt_put(port);
+               bt_put(priv_port);
                muxer_notif_iter->newly_connected_priv_ports =
                        g_list_append(
                                muxer_notif_iter->newly_connected_priv_ports,
                                priv_port);
                if (!muxer_notif_iter->newly_connected_priv_ports) {
-                       bt_put(priv_port);
                        ret = -1;
                        goto end;
                }
@@ -1017,24 +1015,13 @@ void muxer_port_connected(
        struct muxer_comp *muxer_comp =
                bt_private_component_get_user_data(priv_comp);
        size_t i;
+       int ret;
 
        assert(self_port);
        assert(muxer_comp);
 
-       if (bt_port_get_type(self_port) == BT_PORT_TYPE_INPUT) {
-               int ret;
-
-               /* One less available input port */
-               muxer_comp->available_input_ports--;
-               ret = ensure_available_input_port(priv_comp);
-               if (ret) {
-                       /*
-                        * Only way to report an error later since this
-                        * method does not return anything.
-                        */
-                       muxer_comp->error = true;
-                       goto end;
-               }
+       if (bt_port_get_type(self_port) == BT_PORT_TYPE_OUTPUT) {
+               goto end;
        }
 
        for (i = 0; i < muxer_comp->muxer_notif_iters->len; i++) {
@@ -1053,15 +1040,26 @@ void muxer_port_connected(
                muxer_notif_iter->newly_connected_priv_ports =
                        g_list_append(
                                muxer_notif_iter->newly_connected_priv_ports,
-                               bt_get(self_private_port));
+                               self_private_port);
                if (!muxer_notif_iter->newly_connected_priv_ports) {
                        /* Put reference taken by bt_get() above */
-                       bt_put(self_private_port);
                        muxer_comp->error = true;
                        goto end;
                }
        }
 
+       /* One less available input port */
+       muxer_comp->available_input_ports--;
+       ret = ensure_available_input_port(priv_comp);
+       if (ret) {
+               /*
+                * Only way to report an error later since this
+                * method does not return anything.
+                */
+               muxer_comp->error = true;
+               goto end;
+       }
+
 end:
        bt_put(self_port);
 }
This page took 0.039248 seconds and 4 git commands to generate.