Implement the component graph interface
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Sat, 18 Feb 2017 22:21:33 +0000 (17:21 -0500)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Sun, 28 May 2017 16:57:38 +0000 (12:57 -0400)
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
include/babeltrace/component/component-graph-internal.h
include/babeltrace/component/component-graph.h
include/babeltrace/component/component-internal.h
include/babeltrace/component/component.h
lib/component/component-graph.c
lib/component/component.c

index ab89a0367f99aef2bcdd2f539d86a4d4d005a32b..615f4aafb5fbe8b33ecfca1ebd63c42d27af71e0 100644 (file)
@@ -4,7 +4,7 @@
 /*
  * BabelTrace - Component Graph Internal
  *
- * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ * Copyright 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
  *
  * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
  *
 #include <babeltrace/object-internal.h>
 #include <glib.h>
 
-struct bt_component_graph {
-       struct bt_object base;
-       /* Array of pointers to bt_component_connection. */
-       GPtrArray *connections;
-       /*
-        * Array of pointers to bt_component.
-        *
-        * Components which were added to the graph, but have not been connected
-        * yet.
-        */
-       GPtrArray *loose_components;
-       /*
-        * Array of pointers to sink bt_component.
+struct bt_graph {
+       /**
+        * A component graph contains components and point-to-point connection
+        * between these components.
         *
-        * A reference is held to the Sink components in order to implement the
-        * "run" interface, which executes the sinks in a round-robin pattern.
+        * In terms of ownership:
+        * 1) The graph is the components' parent,
+        * 2) The graph is the connnections' parent,
+        * 3) Components share the ownership of their connections,
+        * 4) A connection holds weak references to its two component endpoints.
         */
-       GPtrArray *sinks;
+       struct bt_object base;
+
+       /* Array of pointers to bt_connection. */
+       GPtrArray *connections;
+       /* Array of pointers to bt_component. */
+       GPtrArray *components;
+       /* Queue of pointers (weak references) to sink bt_components. */
+       GQueue *sinks_to_consume;
 };
 
 #endif /* BABELTRACE_COMPONENT_COMPONENT_GRAPH_INTERNAL_H */
index d87b43b9a42174edbb72ac72a09b0dc109f2a353..814218fe9817132d140dba19e52874fc5bc32f5f 100644 (file)
@@ -1,10 +1,10 @@
-#ifndef BABELTRACE_COMPONENT_COMPONENT_GRAPH_H
-#define BABELTRACE_COMPONENT_COMPONENT_GRAPH_H
+#ifndef BABELTRACE_COMPONENT_GRAPH_H
+#define BABELTRACE_COMPONENT_GRAPH_H
 
 /*
- * BabelTrace - Babeltrace Component Graph Interface
+ * BabelTrace - Babeltrace Graph Interface
  *
- * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ * Copyright 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
  *
  * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
  *
  * SOFTWARE.
  */
 
-#include <babeltrace/component/component-class.h>
 #include <babeltrace/component/component.h>
-#include <babeltrace/values.h>
-#include <stdio.h>
 
 #ifdef __cplusplus
 extern "C" {
 #endif
 
-enum bt_component_graph_status {
-       BT_COMPONENT_GRAPH_STATUS_OK = 0,
-};
-
-/*
-Graph Ownership:
-
-                  Graph
-                    ^
-                    |
-               Connection
-                 ^     ^
-                /       \
-         ComponentA    ComponentB
+struct bt_port;
+struct bt_connection;
 
-1) A graph only owns a set of connections.
-2) Components should _never_ own each other.
-3) A component can keep the complete graph "alive".
+enum bt_graph_status {
+       /** Downstream component does not support multiple inputs. */
+       BT_GRAPH_STATUS_END = 1,
+       BT_GRAPH_STATUS_OK = 0,
+       /** Downstream component does not support multiple inputs. */
+       BT_GRAPH_STATUS_MULTIPLE_INPUTS_UNSUPPORTED = -1,
+       /** Component is already part of another graph. */
+       BT_GRAPH_STATUS_ALREADY_IN_A_GRAPH = -2,
+       /** Invalid arguments. */
+       BT_GRAPH_STATUS_INVALID = -3,
+       /** No sink in graph. */
+       BT_GRAPH_STATUS_NO_SINK = -4,
+       /** General error. */
+       BT_GRAPH_STATUS_ERROR = -5,
+       /** No sink can consume at the moment. */
+       BT_GRAPH_STATUS_AGAIN = -6,
+};
 
-*/
-extern struct bt_component_graph *bt_component_graph_create(void);
+extern struct bt_graph *bt_graph_create(void);
 
 /**
- * Creates a connection object which owns both components, invokes the
- * components' connection callback, and add the connection to the component
- * graph's set of connection.
- *
- * Will add any component that is not already part of the graph.
+ * Creates a connection between two components using the two ports specified
+ * and adds the connection and components (if not already added) to the graph.
  */
-extern enum bt_component_graph_status bt_component_graph_connect(
-               struct bt_component_graph *graph, struct bt_component *upstream,
-               struct bt_component *downstream);
-
-/**
- * Add component to the graph
- */
-extern enum bt_component_graph_status bt_component_graph_add_component(
-               struct bt_component_graph *graph,
-               struct bt_component *component);
+extern struct bt_connection *bt_graph_connect(struct bt_graph *graph,
+               struct bt_port *upstream,
+               struct bt_port *downstream);
 
 /**
  * Add a component as a "sibling" of the origin component. Sibling share
  * connections equivalent to each other at the time of connection (same
- * parents and children).
+ * upstream and downstream ports).
  */
-extern enum bt_component_graph_status bt_component_graph_add_component_as_sibling(
-               struct bt_component_graph *graph, struct bt_component *origin,
+extern enum bt_graph_status bt_graph_add_component_as_sibling(
+               struct bt_graph *graph, struct bt_component *origin,
                struct bt_component *new_component);
 
 /**
+ * Run graph to completion or until a single sink is left and "AGAIN" is received.
+ *
  * Runs "bt_component_sink_consume()" on all sinks in round-robin until they all
- * indicate that the end is reached on that an error occured.
+ * indicate that the end is reached or that an error occured.
+ */
+extern enum bt_graph_status bt_graph_run(struct bt_graph *graph,
+               enum bt_component_status *component_status);
+
+/**
+ * Runs "bt_component_sink_consume()" on the graph's sinks. Each invokation will
+ * invoke "bt_component_sink_consume()" on the next sink, in round-robin, until
+ * they all indicated that the end is reached.
  */
-extern enum bt_component_graph_status bt_component_graph_run(
-               struct bt_component_graph *graph);
+extern enum bt_component_status bt_graph_consume(struct bt_graph *graph);
 
 #ifdef __cplusplus
 }
 #endif
 
-#endif /* BABELTRACE_COMPONENT_COMPONENT_GRAPH_H */
+#endif /* BABELTRACE_COMPONENT_GRAPH_H */
index bc876c128f547c7db6c44db783f19fd6daffaea2..3915cd836545b3a65205226a71574eb51e1c0ff2 100644 (file)
@@ -68,7 +68,7 @@ struct bt_notification_iterator *bt_component_create_iterator(
                struct bt_component *component, void *init_method_data);
 
 BT_HIDDEN
-enum bt_component_status bt_component_set_graph(struct bt_component *component,
+void bt_component_set_graph(struct bt_component *component,
                struct bt_graph *graph);
 
 BT_HIDDEN
index b6a4830c3007dd4b551316500028ff70e49cc4e5..0d3e355ac75e7c3b676f2f7eabc644f6ee9e0960 100644 (file)
@@ -27,6 +27,7 @@
  * SOFTWARE.
  */
 
+#include <babeltrace/component/component-class.h>
 #include <babeltrace/component/notification/iterator.h>
 #include <babeltrace/values.h>
 #include <stdio.h>
 extern "C" {
 #endif
 
+struct bt_component_class;
+struct bt_component_graph;
+struct bt_component;
+struct bt_value;
+
 /**
  * Status code. Errors are always negative.
  */
@@ -56,12 +62,10 @@ enum bt_component_status {
        BT_COMPONENT_STATUS_INVALID =           -3,
        /** Memory allocation failure. */
        BT_COMPONENT_STATUS_NOMEM =             -4,
+       /** Element not found. */
+       BT_COMPONENT_STATUS_NOT_FOUND =         -5,
 };
 
-struct bt_component_class;
-struct bt_component;
-struct bt_value;
-
 /**
  * Create an instance of a component from a component class.
  *
@@ -116,6 +120,8 @@ extern struct bt_component_class *bt_component_get_class(
 extern enum bt_component_class_type bt_component_get_class_type(
                struct bt_component *component);
 
+extern struct bt_graph *bt_component_get_graph(struct bt_component *component);
+
 #ifdef __cplusplus
 }
 #endif
index c058998d350bfa1145036be25c8dd290eb7fb90a..57a8cc41ca008f0461874a42d76fb2873419224b 100644 (file)
@@ -3,7 +3,7 @@
  *
  * Babeltrace Plugin Component Graph
  *
- * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ * Copyright 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
  *
  * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
  *
  * SOFTWARE.
  */
 
+#include <babeltrace/component/component-internal.h>
 #include <babeltrace/component/component-graph-internal.h>
+#include <babeltrace/component/component-connection-internal.h>
+#include <babeltrace/component/component-sink-internal.h>
+#include <babeltrace/component/component-source.h>
+#include <babeltrace/component/component-filter.h>
+#include <babeltrace/component/component-port.h>
 #include <babeltrace/compiler.h>
+#include <unistd.h>
 
-static void bt_component_graph_destroy(struct bt_object *obj)
+static
+void bt_graph_destroy(struct bt_object *obj)
 {
-       struct bt_component_graph *graph = container_of(obj,
-                       struct bt_component_graph, base);
+       struct bt_graph *graph = container_of(obj,
+                       struct bt_graph, base);
 
+       if (graph->components) {
+               g_ptr_array_free(graph->components, TRUE);
+       }
        if (graph->connections) {
                g_ptr_array_free(graph->connections, TRUE);
        }
-       if (graph->sinks) {
-               g_ptr_array_free(graph->sinks, TRUE);
+       if (graph->sinks_to_consume) {
+               g_queue_free(graph->sinks_to_consume);
        }
        g_free(graph);
 }
 
-struct bt_component_graph *bt_component_graph_create(void)
+struct bt_graph *bt_graph_create(void)
 {
-       struct bt_component_graph *graph;
+       struct bt_graph *graph;
 
-       graph = g_new0(struct bt_component_graph, 1);
+       graph = g_new0(struct bt_graph, 1);
        if (!graph) {
                goto end;
        }
 
-       bt_object_init(graph, bt_component_graph_destroy);
+       bt_object_init(graph, bt_graph_destroy);
 
-       graph->connections = g_ptr_array_new_with_free_func(bt_put);
+       graph->connections = g_ptr_array_new_with_free_func(bt_object_release);
        if (!graph->connections) {
                goto error;
        }
-       graph->sinks = g_ptr_array_new_with_free_func(bt_put);
-       if (!graph->sinks) {
+       graph->components = g_ptr_array_new_with_free_func(bt_object_release);
+       if (!graph->components) {
+               goto error;
+       }
+       graph->sinks_to_consume = g_queue_new();
+       if (!graph->sinks_to_consume) {
                goto error;
        }
 end:
@@ -69,30 +84,442 @@ error:
        goto end;
 }
 
-enum bt_component_graph_status bt_component_graph_connect(
-               struct bt_component_graph *graph, struct bt_component *upstream,
-               struct bt_component *downstream)
+struct bt_connection *bt_graph_connect(struct bt_graph *graph,
+               struct bt_port *upstream_port,
+               struct bt_port *downstream_port)
+{
+       struct bt_connection *connection = NULL;
+       struct bt_graph *upstream_graph = NULL;
+       struct bt_graph *downstream_graph = NULL;
+       struct bt_component *upstream_component = NULL;
+       struct bt_component *downstream_component = NULL;
+       enum bt_component_status component_status;
+       bool components_added = false;
+
+       if (!graph || !upstream_port || !downstream_port) {
+               goto end;
+       }
+
+       if (bt_port_get_type(upstream_port) != BT_PORT_TYPE_OUTPUT) {
+               goto end;
+       }
+       if (bt_port_get_type(downstream_port) != BT_PORT_TYPE_INPUT) {
+               goto end;
+       }
+
+       /* Ensure the components are not already part of another graph. */
+       upstream_component = bt_port_get_component(upstream_port);
+       assert(upstream_component);
+       upstream_graph = bt_component_get_graph(upstream_component);
+       if (upstream_graph && (graph != upstream_graph)) {
+               fprintf(stderr, "Upstream component is already part of another graph\n");
+               goto error;
+       }
+
+       downstream_component = bt_port_get_component(downstream_port);
+       assert(downstream_component);
+       downstream_graph = bt_component_get_graph(downstream_component);
+       if (downstream_graph && (graph != downstream_graph)) {
+               fprintf(stderr, "Downstream component is already part of another graph\n");
+               goto error;
+       }
+
+       connection = bt_connection_create(graph, upstream_port,
+                       downstream_port);
+       if (!connection) {
+               goto error;
+       }
+
+       /*
+        * Ownership of up/downstream_component and of the connection object is
+        * transferred to the graph.
+        */
+       g_ptr_array_add(graph->connections, connection);
+       g_ptr_array_add(graph->components, upstream_component);
+       g_ptr_array_add(graph->components, downstream_component);
+       if (bt_component_get_class_type(downstream_component) ==
+                       BT_COMPONENT_CLASS_TYPE_SINK) {
+               g_queue_push_tail(graph->sinks_to_consume,
+                               downstream_component);
+       }
+
+       /*
+        * The graph is now the parent of these components which garantees their
+        * existence for the duration of the graph's lifetime.
+        */
+       bt_component_set_graph(upstream_component, graph);
+       bt_put(upstream_component);
+       bt_component_set_graph(downstream_component, graph);
+       bt_put(downstream_component);
+
+       /* Rollback the connection from this point on. */
+       components_added = true;
+
+       /*
+        * The components and connection are added to the graph before invoking
+        * the new_connection method in order to make them visible to the
+        * components during the method's invocation.
+        */
+       component_status = bt_component_new_connection(upstream_component,
+                       upstream_port, connection);
+       if (component_status != BT_COMPONENT_STATUS_OK) {
+               goto error;
+       }
+       component_status = bt_component_new_connection(downstream_component,
+                       downstream_port, connection);
+       if (component_status != BT_COMPONENT_STATUS_OK) {
+               goto error;
+       }
+end:
+       bt_put(upstream_graph);
+       bt_put(downstream_graph);
+       return connection;
+error:
+       if (components_added) {
+               if (bt_component_get_class_type(downstream_component) ==
+                               BT_COMPONENT_CLASS_TYPE_SINK) {
+                       g_queue_pop_tail(graph->sinks_to_consume);
+               }
+               g_ptr_array_set_size(graph->connections,
+                               graph->connections->len - 1);
+               g_ptr_array_set_size(graph->components,
+                               graph->components->len - 2);
+       }
+       goto end;
+}
+
+static
+int get_component_port_counts(struct bt_component *component, int *input_count,
+               int *output_count)
+{
+       int ret = -1;
+
+       switch (bt_component_get_class_type(component)) {
+       case BT_COMPONENT_CLASS_TYPE_SOURCE:
+               ret = bt_component_source_get_output_port_count(component);
+               if (ret < 0) {
+                       goto end;
+               }
+               *output_count = ret;
+               break;
+       case BT_COMPONENT_CLASS_TYPE_FILTER:
+               ret = bt_component_filter_get_output_port_count(component);
+               if (ret < 0) {
+                       goto end;
+               }
+               *output_count = ret;
+               break;
+               ret = bt_component_filter_get_input_port_count(component);
+               if (ret < 0) {
+                       goto end;
+               }
+               *input_count = ret;
+               break;
+       case BT_COMPONENT_CLASS_TYPE_SINK:
+               ret = bt_component_sink_get_input_port_count(component);
+               if (ret < 0) {
+                       goto end;
+               }
+               *input_count = ret;
+               break;
+       default:
+               assert(false);
+               break;
+       }
+       ret = 0;
+end:
+       return ret;
+}
+
+static
+struct bt_port *get_input_port(struct bt_component *component, int index)
 {
-       return BT_COMPONENT_GRAPH_STATUS_OK;
+       struct bt_port *port = NULL;
+
+        switch (bt_component_get_class_type(component)) {
+       case BT_COMPONENT_CLASS_TYPE_FILTER:
+               port = bt_component_filter_get_input_port_at_index(component,
+                               index);
+               break;
+       case BT_COMPONENT_CLASS_TYPE_SINK:
+               port = bt_component_sink_get_input_port_at_index(component,
+                               index);
+               break;
+       default:
+               assert(false);
+       }
+       return port;
 }
 
-enum bt_component_graph_status bt_component_graph_add_component(
-               struct bt_component_graph *graph,
-               struct bt_component *component)
+static
+struct bt_port *get_output_port(struct bt_component *component, int index)
 {
-       return BT_COMPONENT_GRAPH_STATUS_OK;
+       struct bt_port *port = NULL;
+
+        switch (bt_component_get_class_type(component)) {
+       case BT_COMPONENT_CLASS_TYPE_SOURCE:
+               port = bt_component_source_get_output_port_at_index(component,
+                               index);
+               break;
+       case BT_COMPONENT_CLASS_TYPE_FILTER:
+               port = bt_component_filter_get_output_port_at_index(component,
+                               index);
+               break;
+       default:
+               assert(false);
+       }
+       return port;
 }
 
-enum bt_component_graph_status bt_component_graph_add_component_as_sibling(
-               struct bt_component_graph *graph, struct bt_component *origin,
+enum bt_graph_status bt_graph_add_component_as_sibling(struct bt_graph *graph,
+               struct bt_component *origin,
                struct bt_component *new_component)
 {
-       return BT_COMPONENT_GRAPH_STATUS_OK;
+       int origin_input_port_count = 0;
+       int origin_output_port_count = 0;
+       int new_input_port_count = 0;
+       int new_output_port_count = 0;
+       enum bt_graph_status status = BT_GRAPH_STATUS_OK;
+       struct bt_graph *origin_graph = NULL;
+       struct bt_graph *new_graph = NULL;
+       struct bt_port *origin_port = NULL;
+       struct bt_port *new_port = NULL;
+       struct bt_port *upstream_port = NULL;
+       struct bt_port *downstream_port = NULL;
+       struct bt_connection *origin_connection = NULL;
+       struct bt_connection *new_connection = NULL;
+        int port_index;
+
+       if (!graph || !origin || !new_component) {
+               status = BT_GRAPH_STATUS_INVALID;
+               goto end;
+       }
+
+       if (bt_component_get_class_type(origin) !=
+                       bt_component_get_class_type(new_component)) {
+               status = BT_GRAPH_STATUS_INVALID;
+               goto end;
+       }
+
+        origin_graph = bt_component_get_graph(origin);
+       if (!origin_graph || (origin_graph != graph)) {
+               status = BT_GRAPH_STATUS_INVALID;
+               goto end;
+       }
+
+        new_graph = bt_component_get_graph(new_component);
+       if (new_graph) {
+               status = BT_GRAPH_STATUS_ALREADY_IN_A_GRAPH;
+               goto end;
+       }
+
+       if (get_component_port_counts(origin, &origin_input_port_count,
+                       &origin_output_port_count)) {
+               status = BT_GRAPH_STATUS_INVALID;
+               goto end;
+       }
+       if (get_component_port_counts(new_component, &new_input_port_count,
+                       &new_output_port_count)) {
+               status = BT_GRAPH_STATUS_INVALID;
+               goto end;
+       }
+
+       if (origin_input_port_count != new_input_port_count ||
+                       origin_output_port_count != new_output_port_count) {
+               status = BT_GRAPH_STATUS_INVALID;
+               goto end;
+       }
+
+       /* Replicate input connections. */
+       for (port_index = 0; port_index< origin_input_port_count; port_index++) {
+               uint64_t connection_count, connection_index;
+
+               origin_port = get_input_port(origin, port_index);
+               if (!origin_port) {
+                       status = BT_GRAPH_STATUS_ERROR;
+                       goto error_disconnect;
+               }
+               new_port = get_input_port(new_component, port_index);
+               if (!new_port) {
+                       status = BT_GRAPH_STATUS_ERROR;
+                       goto error_disconnect;
+               }
+
+               if (bt_port_get_connection_count(origin_port, &connection_count) !=
+                               BT_PORT_STATUS_OK) {
+                       status = BT_GRAPH_STATUS_ERROR;
+                       goto error_disconnect;
+               }
+
+               for (connection_index = 0; connection_index < connection_count;
+                               connection_index++) {
+                       origin_connection = bt_port_get_connection(origin_port,
+                                       connection_index);
+                       if (!origin_connection) {
+                               goto error_disconnect;
+                       }
+
+                       upstream_port = bt_connection_get_output_port(
+                                       origin_connection);
+                       if (!upstream_port) {
+                               goto error_disconnect;
+                       }
+
+                       new_connection = bt_graph_connect(graph, upstream_port,
+                                       new_port);
+                       if (!new_connection) {
+                               goto error_disconnect;
+                       }
+
+                       BT_PUT(upstream_port);
+                       BT_PUT(origin_connection);
+                       BT_PUT(new_connection);
+               }
+               BT_PUT(origin_port);
+               BT_PUT(new_port);
+       }
+
+       /* Replicate output connections. */
+       for (port_index = 0; port_index< origin_output_port_count; port_index++) {
+               uint64_t connection_count, connection_index;
+
+               origin_port = get_output_port(origin, port_index);
+               if (!origin_port) {
+                       status = BT_GRAPH_STATUS_ERROR;
+                       goto error_disconnect;
+               }
+               new_port = get_output_port(new_component, port_index);
+               if (!new_port) {
+                       status = BT_GRAPH_STATUS_ERROR;
+                       goto error_disconnect;
+               }
+
+               if (bt_port_get_connection_count(origin_port, &connection_count) !=
+                               BT_PORT_STATUS_OK) {
+                       status = BT_GRAPH_STATUS_ERROR;
+                       goto error_disconnect;
+               }
+
+               for (connection_index = 0; connection_index < connection_count;
+                               connection_index++) {
+                       origin_connection = bt_port_get_connection(origin_port,
+                                       connection_index);
+                       if (!origin_connection) {
+                               goto error_disconnect;
+                       }
+
+                       downstream_port = bt_connection_get_input_port(
+                                       origin_connection);
+                       if (!downstream_port) {
+                               goto error_disconnect;
+                       }
+
+                       new_connection = bt_graph_connect(graph, new_port,
+                                       downstream_port);
+                       if (!new_connection) {
+                               goto error_disconnect;
+                       }
+
+                       BT_PUT(downstream_port);
+                       BT_PUT(origin_connection);
+                       BT_PUT(new_connection);
+               }
+               BT_PUT(origin_port);
+               BT_PUT(new_port);
+       }
+end:
+       bt_put(origin_graph);
+       bt_put(new_graph);
+       bt_put(origin_port);
+       bt_put(new_port);
+       bt_put(upstream_port);
+       bt_put(downstream_port);
+       bt_put(origin_connection);
+       bt_put(new_connection);
+       return status;
+error_disconnect:
+       /* Destroy all connections of the new component. */
+       /* FIXME. */
+       goto end;
 }
 
-enum bt_component_graph_status bt_component_graph_run(
-               struct bt_component_graph *graph)
+enum bt_component_status bt_graph_consume(struct bt_graph *graph)
 {
-       return BT_COMPONENT_GRAPH_STATUS_OK;
+       struct bt_component *sink;
+       enum bt_component_status status;
+       GList *current_node;
+
+       if (!graph) {
+               status = BT_COMPONENT_STATUS_INVALID;
+               goto end;
+       }
+
+       if (g_queue_is_empty(graph->sinks_to_consume)) {
+               status = BT_COMPONENT_STATUS_END;
+               goto end;
+       }
+
+       current_node = g_queue_pop_head_link(graph->sinks_to_consume);
+       sink = current_node->data;
+       status = bt_component_sink_consume(sink);
+       if (status != BT_COMPONENT_STATUS_END) {
+               g_queue_push_tail_link(graph->sinks_to_consume, current_node);
+               goto end;
+       }
+
+       /* End reached, the node is not added back to the queue and free'd. */
+       g_queue_delete_link(graph->sinks_to_consume, current_node);
+
+       /* Don't forward an END status if there are sinks left to consume. */
+       if (!g_queue_is_empty(graph->sinks_to_consume)) {
+               status = BT_GRAPH_STATUS_OK;
+               goto end;
+       }
+end:
+       return status;
 }
 
+enum bt_graph_status bt_graph_run(struct bt_graph *graph,
+               enum bt_component_status *_component_status)
+{
+       enum bt_component_status component_status;
+       enum bt_graph_status graph_status = BT_GRAPH_STATUS_OK;
+
+       if (!graph) {
+               graph_status = BT_GRAPH_STATUS_INVALID;
+               goto error;
+       }
+
+       do {
+               component_status = bt_graph_consume(graph);
+               if (component_status == BT_COMPONENT_STATUS_AGAIN) {
+                       /*
+                        * If AGAIN is received and there are multiple sinks,
+                        * go ahead and consume from the next sink.
+                        *
+                        * However, in the case where a single sink is left,
+                        * the caller can decide to busy-wait and call
+                        * bt_graph_run continuously until the source is ready
+                        * or it can decide to sleep for an arbitrary amount of
+                        * time.
+                        */
+                       if (graph->sinks_to_consume->length > 1) {
+                               component_status = BT_COMPONENT_STATUS_OK;
+                       }
+               }
+       } while (component_status == BT_COMPONENT_STATUS_OK);
+
+       if (_component_status) {
+               *_component_status = component_status;
+       }
+
+       if (g_queue_is_empty(graph->sinks_to_consume)) {
+               graph_status = BT_GRAPH_STATUS_END;
+       } else if (component_status == BT_COMPONENT_STATUS_AGAIN) {
+               graph_status = BT_GRAPH_STATUS_AGAIN;
+       } else {
+               graph_status = BT_GRAPH_STATUS_ERROR;
+       }
+error:
+       return graph_status;
+}
index 31aad81be46bb3f60bde84d3a04cdd0864d817be..8056db14eb1feced347ed804f672ba5ffe65b92c 100644 (file)
@@ -95,6 +95,7 @@ enum bt_component_status bt_component_init(struct bt_component *component,
                goto end;
        }
 
+       component->initializing = true;
        component->destroy = destroy;
 end:
        return ret;
@@ -210,8 +211,6 @@ struct bt_component *bt_component_create_with_init_method_data(
                goto end;
        }
 
-       component->initializing = true;
-
        if (component_class->methods.init) {
                ret = component_class->methods.init(component, params,
                        init_method_data);
@@ -298,11 +297,16 @@ end:
 }
 
 BT_HIDDEN
-enum bt_component_status bt_component_set_graph(struct bt_component *component,
+void bt_component_set_graph(struct bt_component *component,
                struct bt_graph *graph)
 {
-       bt_object_set_parent(component, &graph->base);
-       return BT_COMPONENT_STATUS_OK;
+       struct bt_object *parent = bt_object_get_parent(&component->base);
+
+       assert(!parent || parent == &graph->base);
+       if (!parent) {
+               bt_object_set_parent(component, &graph->base);
+       }
+       bt_put(parent);
 }
 
 struct bt_graph *bt_component_get_graph(
@@ -319,19 +323,18 @@ int bt_component_init_input_ports(struct bt_component *component,
        struct bt_port *default_port;
 
        *input_ports = g_ptr_array_new_with_free_func(bt_object_release);
-       if (*input_ports) {
+       if (!*input_ports) {
                ret = -1;
                goto end;
        }
 
-       default_port = bt_port_create(component, BT_PORT_TYPE_INPUT,
-                       DEFAULT_INPUT_PORT_NAME);
+       default_port = bt_component_add_port(component, *input_ports,
+                       BT_PORT_TYPE_INPUT, DEFAULT_INPUT_PORT_NAME);
        if (!default_port) {
                ret = -1;
                goto end;
        }
-
-       g_ptr_array_add(*input_ports, default_port);
+       bt_put(default_port);
 end:
        return ret;
 }
@@ -344,19 +347,18 @@ int bt_component_init_output_ports(struct bt_component *component,
        struct bt_port *default_port;
 
        *output_ports = g_ptr_array_new_with_free_func(bt_object_release);
-       if (*output_ports) {
+       if (!*output_ports) {
                ret = -1;
                goto end;
        }
 
-       default_port = bt_port_create(component, BT_PORT_TYPE_OUTPUT,
-                       DEFAULT_OUTPUT_PORT_NAME);
+       default_port = bt_component_add_port(component, *output_ports,
+                       BT_PORT_TYPE_OUTPUT, DEFAULT_OUTPUT_PORT_NAME);
        if (!default_port) {
                ret = -1;
                goto end;
        }
-
-       g_ptr_array_add(*output_ports, default_port);
+       bt_put(default_port);
 end:
        return ret;
 }
@@ -404,15 +406,9 @@ struct bt_port *bt_component_add_port(
                enum bt_port_type port_type, const char *name)
 {
        size_t i;
-       struct bt_port *new_port;
+       struct bt_port *new_port = NULL;
 
        if (!component->initializing || !name || *name == '\0') {
-               new_port = NULL;
-               goto end;
-       }
-
-       new_port = bt_port_create(component, port_type, name);
-       if (!new_port) {
                goto end;
        }
 
@@ -429,17 +425,24 @@ struct bt_port *bt_component_add_port(
 
                if (!strcmp(name, port_name)) {
                        /* Port name clash, abort. */
-                       goto error;
+                       goto end;
                }
        }
 
-       /* No name clash, add the port. */
-       g_ptr_array_add(ports, bt_get(new_port));
+       new_port = bt_port_create(component, port_type, name);
+       if (!new_port) {
+               goto end;
+       }
+
+       /*
+        * No name clash, add the port.
+        * The component is now the port's parent; it should _not_
+        * hold a reference to the port since the port's lifetime
+        * is now protected by the component's own lifetime.
+        */
+       g_ptr_array_add(ports, new_port);
 end:
        return new_port;
-error:
-       BT_PUT(new_port);
-       goto end;
 }
 
 BT_HIDDEN
@@ -473,3 +476,18 @@ enum bt_component_status bt_component_remove_port(
 end:
        return status;
 }
+
+BT_HIDDEN
+enum bt_component_status bt_component_new_connection(
+               struct bt_component *component, struct bt_port *own_port,
+               struct bt_connection *connection)
+{
+       enum bt_component_status status = BT_COMPONENT_STATUS_OK;
+
+       if (component->class->methods.new_connection_method) {
+               status = component->class->methods.new_connection_method(
+                               own_port, connection);
+       }
+
+       return status;
+}
This page took 0.037795 seconds and 4 git commands to generate.