Allow a component to remove a port and any user to disconnect one
[babeltrace.git] / lib / component / graph.c
index 51f3f957de65f0b7dd32776cc714c7bd1256b5cb..8b2bed4488ad6e29d5463d8d57c5b2286b7ba2f4 100644 (file)
@@ -93,6 +93,7 @@ struct bt_connection *bt_graph_connect(struct bt_graph *graph,
        struct bt_graph *downstream_graph = NULL;
        struct bt_component *upstream_component = NULL;
        struct bt_component *downstream_component = NULL;
+       struct bt_connection *existing_conn = NULL;
        enum bt_component_status component_status;
        bool upstream_was_already_in_graph;
        bool downstream_was_already_in_graph;
@@ -103,6 +104,7 @@ struct bt_connection *bt_graph_connect(struct bt_graph *graph,
                goto end;
        }
 
+       /* Ensure appropriate types for upstream and downstream ports. */
        if (bt_port_get_type(upstream_port) != BT_PORT_TYPE_OUTPUT) {
                goto end;
        }
@@ -110,18 +112,44 @@ struct bt_connection *bt_graph_connect(struct bt_graph *graph,
                goto end;
        }
 
-       /* Ensure the components are not already part of another graph. */
+       /* Ensure that both ports are currently unconnected. */
+       existing_conn = bt_port_get_connection(upstream_port);
+       bt_put(existing_conn);
+       if (existing_conn) {
+               fprintf(stderr, "Upstream port is already connected\n");
+               goto end;
+       }
+
+       existing_conn = bt_port_get_connection(downstream_port);
+       bt_put(existing_conn);
+       if (existing_conn) {
+               fprintf(stderr, "Downstream port is already connected\n");
+               goto end;
+       }
+
+       /*
+        * Ensure that both ports are still attached to their creating
+        * component.
+        */
        upstream_component = bt_port_get_component(upstream_port);
-       assert(upstream_component);
+       if (!upstream_component) {
+               fprintf(stderr, "Upstream port does not belong to a component\n");
+               goto end;
+       }
+
+       downstream_component = bt_port_get_component(downstream_port);
+       if (!downstream_component) {
+               fprintf(stderr, "Downstream port does not belong to a component\n");
+               goto end;
+       }
+
+       /* Ensure the components are not already part of another graph. */
        upstream_graph = bt_component_get_graph(upstream_component);
        if (upstream_graph && (graph != upstream_graph)) {
                fprintf(stderr, "Upstream component is already part of another graph\n");
                goto error;
        }
        upstream_was_already_in_graph = (graph == upstream_graph);
-
-       downstream_component = bt_port_get_component(downstream_port);
-       assert(downstream_component);
        downstream_graph = bt_component_get_graph(downstream_component);
        if (downstream_graph && (graph != downstream_graph)) {
                fprintf(stderr, "Downstream component is already part of another graph\n");
@@ -136,8 +164,8 @@ struct bt_connection *bt_graph_connect(struct bt_graph *graph,
        }
 
        /*
-        * Ownership of up/downstream_component and of the connection object is
-        * transferred to the graph.
+        * Ownership of upstream_component/downstream_component and of
+        * the connection object is transferred to the graph.
         */
        g_ptr_array_add(graph->connections, connection);
 
@@ -161,17 +189,18 @@ struct bt_connection *bt_graph_connect(struct bt_graph *graph,
         */
 
        /*
-        * The components and connection are added to the graph before invoking
-        * the new_connection method in order to make them visible to the
-        * components during the method's invocation.
+        * The components and connection are added to the graph before
+        * invoking the `accept_port_connection` method in order to make
+        * them visible to the components during the method's
+        * invocation.
         */
-       component_status = bt_component_new_connection(upstream_component,
-                       upstream_port, connection);
+       component_status = bt_component_accept_port_connection(
+               upstream_component, upstream_port);
        if (component_status != BT_COMPONENT_STATUS_OK) {
                goto error_rollback;
        }
-       component_status = bt_component_new_connection(downstream_component,
-                       downstream_port, connection);
+       component_status = bt_component_accept_port_connection(
+               downstream_component, downstream_port);
        if (component_status != BT_COMPONENT_STATUS_OK) {
                goto error_rollback;
        }
@@ -292,46 +321,6 @@ end:
        return ret;
 }
 
-static
-struct bt_port *get_input_port(struct bt_component *component, int index)
-{
-       struct bt_port *port = NULL;
-
-        switch (bt_component_get_class_type(component)) {
-       case BT_COMPONENT_CLASS_TYPE_FILTER:
-               port = bt_component_filter_get_input_port_at_index(component,
-                               index);
-               break;
-       case BT_COMPONENT_CLASS_TYPE_SINK:
-               port = bt_component_sink_get_input_port_at_index(component,
-                               index);
-               break;
-       default:
-               assert(false);
-       }
-       return port;
-}
-
-static
-struct bt_port *get_output_port(struct bt_component *component, int index)
-{
-       struct bt_port *port = NULL;
-
-        switch (bt_component_get_class_type(component)) {
-       case BT_COMPONENT_CLASS_TYPE_SOURCE:
-               port = bt_component_source_get_output_port_at_index(component,
-                               index);
-               break;
-       case BT_COMPONENT_CLASS_TYPE_FILTER:
-               port = bt_component_filter_get_output_port_at_index(component,
-                               index);
-               break;
-       default:
-               assert(false);
-       }
-       return port;
-}
-
 enum bt_graph_status bt_graph_add_component_as_sibling(struct bt_graph *graph,
                struct bt_component *origin,
                struct bt_component *new_component)
@@ -393,35 +382,24 @@ enum bt_graph_status bt_graph_add_component_as_sibling(struct bt_graph *graph,
 
        /* Replicate input connections. */
        for (port_index = 0; port_index< origin_input_port_count; port_index++) {
-               uint64_t connection_count, connection_index;
-
-               origin_port = get_input_port(origin, port_index);
+               origin_port = bt_component_get_input_port_at_index(origin,
+                       port_index);
                if (!origin_port) {
                        status = BT_GRAPH_STATUS_ERROR;
                        goto error_disconnect;
                }
-               new_port = get_input_port(new_component, port_index);
-               if (!new_port) {
-                       status = BT_GRAPH_STATUS_ERROR;
-                       goto error_disconnect;
-               }
 
-               if (bt_port_get_connection_count(origin_port, &connection_count) !=
-                               BT_PORT_STATUS_OK) {
+               new_port = bt_component_get_input_port_at_index(new_component,
+                       port_index);
+               if (!new_port) {
                        status = BT_GRAPH_STATUS_ERROR;
                        goto error_disconnect;
                }
 
-               for (connection_index = 0; connection_index < connection_count;
-                               connection_index++) {
-                       origin_connection = bt_port_get_connection(origin_port,
-                                       connection_index);
-                       if (!origin_connection) {
-                               goto error_disconnect;
-                       }
-
-                       upstream_port = bt_connection_get_output_port(
-                                       origin_connection);
+               origin_connection = bt_port_get_connection(origin_port);
+               if (origin_connection) {
+                       upstream_port = bt_connection_get_upstream_port(
+                               origin_connection);
                        if (!upstream_port) {
                                goto error_disconnect;
                        }
@@ -431,45 +409,33 @@ enum bt_graph_status bt_graph_add_component_as_sibling(struct bt_graph *graph,
                        if (!new_connection) {
                                goto error_disconnect;
                        }
-
-                       BT_PUT(upstream_port);
-                       BT_PUT(origin_connection);
-                       BT_PUT(new_connection);
                }
+
+               BT_PUT(upstream_port);
+               BT_PUT(origin_connection);
+               BT_PUT(new_connection);
                BT_PUT(origin_port);
                BT_PUT(new_port);
        }
 
        /* Replicate output connections. */
-       for (port_index = 0; port_index< origin_output_port_count; port_index++) {
-               uint64_t connection_count, connection_index;
-
-               origin_port = get_output_port(origin, port_index);
+       for (port_index = 0; port_index < origin_output_port_count; port_index++) {
+               origin_port = bt_component_get_output_port_at_index(origin,
+                       port_index);
                if (!origin_port) {
                        status = BT_GRAPH_STATUS_ERROR;
                        goto error_disconnect;
                }
-               new_port = get_output_port(new_component, port_index);
+               new_port = bt_component_get_output_port_at_index(new_component,
+                       port_index);
                if (!new_port) {
                        status = BT_GRAPH_STATUS_ERROR;
                        goto error_disconnect;
                }
 
-               if (bt_port_get_connection_count(origin_port, &connection_count) !=
-                               BT_PORT_STATUS_OK) {
-                       status = BT_GRAPH_STATUS_ERROR;
-                       goto error_disconnect;
-               }
-
-               for (connection_index = 0; connection_index < connection_count;
-                               connection_index++) {
-                       origin_connection = bt_port_get_connection(origin_port,
-                                       connection_index);
-                       if (!origin_connection) {
-                               goto error_disconnect;
-                       }
-
-                       downstream_port = bt_connection_get_input_port(
+               origin_connection = bt_port_get_connection(origin_port);
+               if (origin_connection) {
+                       downstream_port = bt_connection_get_downstream_port(
                                        origin_connection);
                        if (!downstream_port) {
                                goto error_disconnect;
@@ -480,11 +446,11 @@ enum bt_graph_status bt_graph_add_component_as_sibling(struct bt_graph *graph,
                        if (!new_connection) {
                                goto error_disconnect;
                        }
-
-                       BT_PUT(downstream_port);
-                       BT_PUT(origin_connection);
-                       BT_PUT(new_connection);
                }
+
+               BT_PUT(downstream_port);
+               BT_PUT(origin_connection);
+               BT_PUT(new_connection);
                BT_PUT(origin_port);
                BT_PUT(new_port);
        }
@@ -504,26 +470,44 @@ error_disconnect:
        goto end;
 }
 
-enum bt_component_status bt_graph_consume(struct bt_graph *graph)
+enum bt_graph_status bt_graph_consume(struct bt_graph *graph)
 {
        struct bt_component *sink;
-       enum bt_component_status status;
+       enum bt_graph_status status = BT_GRAPH_STATUS_OK;
+       enum bt_component_status comp_status;
        GList *current_node;
 
        if (!graph) {
-               status = BT_COMPONENT_STATUS_INVALID;
+               status = BT_GRAPH_STATUS_INVALID;
                goto end;
        }
 
        if (g_queue_is_empty(graph->sinks_to_consume)) {
-               status = BT_COMPONENT_STATUS_END;
+               status = BT_GRAPH_STATUS_END;
                goto end;
        }
 
        current_node = g_queue_pop_head_link(graph->sinks_to_consume);
        sink = current_node->data;
-       status = bt_component_sink_consume(sink);
-       if (status != BT_COMPONENT_STATUS_END) {
+       comp_status = bt_component_sink_consume(sink);
+       switch (comp_status) {
+       case BT_COMPONENT_STATUS_OK:
+               break;
+       case BT_COMPONENT_STATUS_END:
+               status = BT_GRAPH_STATUS_END;
+               break;
+       case BT_COMPONENT_STATUS_AGAIN:
+               status = BT_GRAPH_STATUS_AGAIN;
+               break;
+       case BT_COMPONENT_STATUS_INVALID:
+               status = BT_GRAPH_STATUS_INVALID;
+               break;
+       default:
+               status = BT_GRAPH_STATUS_ERROR;
+               break;
+       }
+
+       if (status != BT_GRAPH_STATUS_END) {
                g_queue_push_tail_link(graph->sinks_to_consume, current_node);
                goto end;
        }
@@ -540,20 +524,18 @@ end:
        return status;
 }
 
-enum bt_graph_status bt_graph_run(struct bt_graph *graph,
-               enum bt_component_status *_component_status)
+enum bt_graph_status bt_graph_run(struct bt_graph *graph)
 {
-       enum bt_component_status component_status;
-       enum bt_graph_status graph_status = BT_GRAPH_STATUS_OK;
+       enum bt_graph_status status = BT_GRAPH_STATUS_OK;
 
        if (!graph) {
-               graph_status = BT_GRAPH_STATUS_INVALID;
+               status = BT_GRAPH_STATUS_INVALID;
                goto error;
        }
 
        do {
-               component_status = bt_graph_consume(graph);
-               if (component_status == BT_COMPONENT_STATUS_AGAIN) {
+               status = bt_graph_consume(graph);
+               if (status == BT_GRAPH_STATUS_AGAIN) {
                        /*
                         * If AGAIN is received and there are multiple sinks,
                         * go ahead and consume from the next sink.
@@ -565,22 +547,14 @@ enum bt_graph_status bt_graph_run(struct bt_graph *graph,
                         * time.
                         */
                        if (graph->sinks_to_consume->length > 1) {
-                               component_status = BT_COMPONENT_STATUS_OK;
+                               status = BT_GRAPH_STATUS_OK;
                        }
                }
-       } while (component_status == BT_COMPONENT_STATUS_OK);
-
-       if (_component_status) {
-               *_component_status = component_status;
-       }
+       } while (status == BT_GRAPH_STATUS_OK);
 
        if (g_queue_is_empty(graph->sinks_to_consume)) {
-               graph_status = BT_GRAPH_STATUS_END;
-       } else if (component_status == BT_COMPONENT_STATUS_AGAIN) {
-               graph_status = BT_GRAPH_STATUS_AGAIN;
-       } else {
-               graph_status = BT_GRAPH_STATUS_ERROR;
+               status = BT_GRAPH_STATUS_END;
        }
 error:
-       return graph_status;
+       return status;
 }
This page took 0.028396 seconds and 4 git commands to generate.