struct bt_graph *downstream_graph = NULL;
struct bt_component *upstream_component = NULL;
struct bt_component *downstream_component = NULL;
+ struct bt_connection *existing_conn = NULL;
enum bt_component_status component_status;
bool upstream_was_already_in_graph;
bool downstream_was_already_in_graph;
+ int components_to_remove = 0;
+ int i;
if (!graph || !upstream_port || !downstream_port) {
goto end;
}
+ /* Ensure appropriate types for upstream and downstream ports. */
if (bt_port_get_type(upstream_port) != BT_PORT_TYPE_OUTPUT) {
goto end;
}
goto end;
}
- /* Ensure the components are not already part of another graph. */
+ /* Ensure that both ports are currently unconnected. */
+ existing_conn = bt_port_get_connection(upstream_port);
+ bt_put(existing_conn);
+ if (existing_conn) {
+ fprintf(stderr, "Upstream port is already connected\n");
+ goto end;
+ }
+
+ existing_conn = bt_port_get_connection(downstream_port);
+ bt_put(existing_conn);
+ if (existing_conn) {
+ fprintf(stderr, "Downstream port is already connected\n");
+ goto end;
+ }
+
+ /*
+ * Ensure that both ports are still attached to their creating
+ * component.
+ */
upstream_component = bt_port_get_component(upstream_port);
- assert(upstream_component);
+ if (!upstream_component) {
+ fprintf(stderr, "Upstream port does not belong to a component\n");
+ goto end;
+ }
+
+ downstream_component = bt_port_get_component(downstream_port);
+ if (!downstream_component) {
+ fprintf(stderr, "Downstream port does not belong to a component\n");
+ goto end;
+ }
+
+ /* Ensure the components are not already part of another graph. */
upstream_graph = bt_component_get_graph(upstream_component);
if (upstream_graph && (graph != upstream_graph)) {
fprintf(stderr, "Upstream component is already part of another graph\n");
goto error;
}
upstream_was_already_in_graph = (graph == upstream_graph);
-
- downstream_component = bt_port_get_component(downstream_port);
- assert(downstream_component);
downstream_graph = bt_component_get_graph(downstream_component);
if (downstream_graph && (graph != downstream_graph)) {
fprintf(stderr, "Downstream component is already part of another graph\n");
}
/*
- * Ownership of up/downstream_component and of the connection object is
- * transferred to the graph.
+ * Ownership of upstream_component/downstream_component and of
+ * the connection object is transferred to the graph.
*/
g_ptr_array_add(graph->connections, connection);
*/
/*
- * The components and connection are added to the graph before invoking
- * the new_connection method in order to make them visible to the
- * components during the method's invocation.
+ * The components and connection are added to the graph before
+ * invoking the `accept_port_connection` method in order to make
+ * them visible to the components during the method's
+ * invocation.
*/
- component_status = bt_component_new_connection(upstream_component,
- upstream_port, connection);
+ component_status = bt_component_accept_port_connection(
+ upstream_component, upstream_port);
if (component_status != BT_COMPONENT_STATUS_OK) {
- goto error;
+ goto error_rollback;
}
- component_status = bt_component_new_connection(downstream_component,
- downstream_port, connection);
+ component_status = bt_component_accept_port_connection(
+ downstream_component, downstream_port);
if (component_status != BT_COMPONENT_STATUS_OK) {
- goto error;
+ goto error_rollback;
}
end:
bt_put(upstream_graph);
bt_put(downstream_graph);
+ bt_put(upstream_component);
+ bt_put(downstream_component);
return connection;
-error:
- if (components_added) {
+error_rollback:
+ /*
+ * Remove newly-added components from the graph, being careful
+ * not to remove a component that was already present in the graph
+ * and is connected to other components.
+ */
+ components_to_remove += upstream_was_already_in_graph ? 0 : 1;
+ components_to_remove += downstream_was_already_in_graph ? 0 : 1;
+
+ if (!downstream_was_already_in_graph) {
if (bt_component_get_class_type(downstream_component) ==
BT_COMPONENT_CLASS_TYPE_SINK) {
g_queue_pop_tail(graph->sinks_to_consume);
}
- g_ptr_array_set_size(graph->connections,
- graph->connections->len - 1);
+ }
+ /* Remove newly created connection. */
+ g_ptr_array_set_size(graph->connections,
+ graph->connections->len - 1);
+
+ /*
+ * Remove newly added components.
+ *
+ * Note that this is a tricky situation. The graph, being the parent
+ * of the components, does not hold a reference to them. Normally,
+ * components are destroyed right away when the graph is released since
+ * the graph, being their parent, bounds their lifetime
+ * (see doc/ref-counting.md).
+ *
+ * In this particular case, we must take a number of steps:
+ * 1) unset the components' parent to rollback the initial state of
+ * the components being connected.
+ * Note that the reference taken by the component on its graph is
+ * released by the set_parent call.
+ * 2) set the pointer in the components array to NULL so that the
+ * destruction function called on the array's resize in invoked on
+ * NULL (no effect),
+ *
+ * NOTE: Point #1 assumes that *something* holds a reference to both
+ * components being connected. The fact that a reference is being
+ * held to a component means that it must hold a reference to its
+ * parent to prevent the parent from being destroyed (again, refer
+ * to doc/red-counting.md). This reference to a component is
+ * most likely being held *transitively* by the caller which holds
+ * a reference to both ports (a port has its component as a
+ * parent).
+ *
+ * This assumes that a graph is not connecting components by
+ * itself while not holding a reference to the ports/components
+ * being connected (i.e. "cheating" by using internal APIs).
+ */
+ for (i = 0; i < components_to_remove; i++) {
+ struct bt_component *component = g_ptr_array_index(
+ graph->components, graph->components->len - 1);
+
+ bt_component_set_graph(component, NULL);
+ g_ptr_array_index(graph->components,
+ graph->components->len - 1) = NULL;
g_ptr_array_set_size(graph->components,
- graph->components->len - 2);
+ graph->components->len - 1);
}
+ /* NOTE: Resizing the ptr_arrays invokes the destruction of the elements. */
+ goto end;
+error:
+ BT_PUT(upstream_component);
+ BT_PUT(downstream_component);
goto end;
}
return ret;
}
-static
-struct bt_port *get_input_port(struct bt_component *component, int index)
-{
- struct bt_port *port = NULL;
-
- switch (bt_component_get_class_type(component)) {
- case BT_COMPONENT_CLASS_TYPE_FILTER:
- port = bt_component_filter_get_input_port_at_index(component,
- index);
- break;
- case BT_COMPONENT_CLASS_TYPE_SINK:
- port = bt_component_sink_get_input_port_at_index(component,
- index);
- break;
- default:
- assert(false);
- }
- return port;
-}
-
-static
-struct bt_port *get_output_port(struct bt_component *component, int index)
-{
- struct bt_port *port = NULL;
-
- switch (bt_component_get_class_type(component)) {
- case BT_COMPONENT_CLASS_TYPE_SOURCE:
- port = bt_component_source_get_output_port_at_index(component,
- index);
- break;
- case BT_COMPONENT_CLASS_TYPE_FILTER:
- port = bt_component_filter_get_output_port_at_index(component,
- index);
- break;
- default:
- assert(false);
- }
- return port;
-}
-
enum bt_graph_status bt_graph_add_component_as_sibling(struct bt_graph *graph,
struct bt_component *origin,
struct bt_component *new_component)
/* Replicate input connections. */
for (port_index = 0; port_index< origin_input_port_count; port_index++) {
- uint64_t connection_count, connection_index;
-
- origin_port = get_input_port(origin, port_index);
+ origin_port = bt_component_get_input_port_at_index(origin,
+ port_index);
if (!origin_port) {
status = BT_GRAPH_STATUS_ERROR;
goto error_disconnect;
}
- new_port = get_input_port(new_component, port_index);
- if (!new_port) {
- status = BT_GRAPH_STATUS_ERROR;
- goto error_disconnect;
- }
- if (bt_port_get_connection_count(origin_port, &connection_count) !=
- BT_PORT_STATUS_OK) {
+ new_port = bt_component_get_input_port_at_index(new_component,
+ port_index);
+ if (!new_port) {
status = BT_GRAPH_STATUS_ERROR;
goto error_disconnect;
}
- for (connection_index = 0; connection_index < connection_count;
- connection_index++) {
- origin_connection = bt_port_get_connection(origin_port,
- connection_index);
- if (!origin_connection) {
- goto error_disconnect;
- }
-
- upstream_port = bt_connection_get_output_port(
- origin_connection);
+ origin_connection = bt_port_get_connection(origin_port);
+ if (origin_connection) {
+ upstream_port = bt_connection_get_upstream_port(
+ origin_connection);
if (!upstream_port) {
goto error_disconnect;
}
if (!new_connection) {
goto error_disconnect;
}
-
- BT_PUT(upstream_port);
- BT_PUT(origin_connection);
- BT_PUT(new_connection);
}
+
+ BT_PUT(upstream_port);
+ BT_PUT(origin_connection);
+ BT_PUT(new_connection);
BT_PUT(origin_port);
BT_PUT(new_port);
}
/* Replicate output connections. */
- for (port_index = 0; port_index< origin_output_port_count; port_index++) {
- uint64_t connection_count, connection_index;
-
- origin_port = get_output_port(origin, port_index);
+ for (port_index = 0; port_index < origin_output_port_count; port_index++) {
+ origin_port = bt_component_get_output_port_at_index(origin,
+ port_index);
if (!origin_port) {
status = BT_GRAPH_STATUS_ERROR;
goto error_disconnect;
}
- new_port = get_output_port(new_component, port_index);
+ new_port = bt_component_get_output_port_at_index(new_component,
+ port_index);
if (!new_port) {
status = BT_GRAPH_STATUS_ERROR;
goto error_disconnect;
}
- if (bt_port_get_connection_count(origin_port, &connection_count) !=
- BT_PORT_STATUS_OK) {
- status = BT_GRAPH_STATUS_ERROR;
- goto error_disconnect;
- }
-
- for (connection_index = 0; connection_index < connection_count;
- connection_index++) {
- origin_connection = bt_port_get_connection(origin_port,
- connection_index);
- if (!origin_connection) {
- goto error_disconnect;
- }
-
- downstream_port = bt_connection_get_input_port(
+ origin_connection = bt_port_get_connection(origin_port);
+ if (origin_connection) {
+ downstream_port = bt_connection_get_downstream_port(
origin_connection);
if (!downstream_port) {
goto error_disconnect;
if (!new_connection) {
goto error_disconnect;
}
-
- BT_PUT(downstream_port);
- BT_PUT(origin_connection);
- BT_PUT(new_connection);
}
+
+ BT_PUT(downstream_port);
+ BT_PUT(origin_connection);
+ BT_PUT(new_connection);
BT_PUT(origin_port);
BT_PUT(new_port);
}
goto end;
}
-enum bt_component_status bt_graph_consume(struct bt_graph *graph)
+enum bt_graph_status bt_graph_consume(struct bt_graph *graph)
{
struct bt_component *sink;
- enum bt_component_status status;
+ enum bt_graph_status status = BT_GRAPH_STATUS_OK;
+ enum bt_component_status comp_status;
GList *current_node;
if (!graph) {
- status = BT_COMPONENT_STATUS_INVALID;
+ status = BT_GRAPH_STATUS_INVALID;
goto end;
}
if (g_queue_is_empty(graph->sinks_to_consume)) {
- status = BT_COMPONENT_STATUS_END;
+ status = BT_GRAPH_STATUS_END;
goto end;
}
current_node = g_queue_pop_head_link(graph->sinks_to_consume);
sink = current_node->data;
- status = bt_component_sink_consume(sink);
- if (status != BT_COMPONENT_STATUS_END) {
+ comp_status = bt_component_sink_consume(sink);
+ switch (comp_status) {
+ case BT_COMPONENT_STATUS_OK:
+ break;
+ case BT_COMPONENT_STATUS_END:
+ status = BT_GRAPH_STATUS_END;
+ break;
+ case BT_COMPONENT_STATUS_AGAIN:
+ status = BT_GRAPH_STATUS_AGAIN;
+ break;
+ case BT_COMPONENT_STATUS_INVALID:
+ status = BT_GRAPH_STATUS_INVALID;
+ break;
+ default:
+ status = BT_GRAPH_STATUS_ERROR;
+ break;
+ }
+
+ if (status != BT_GRAPH_STATUS_END) {
g_queue_push_tail_link(graph->sinks_to_consume, current_node);
goto end;
}
return status;
}
-enum bt_graph_status bt_graph_run(struct bt_graph *graph,
- enum bt_component_status *_component_status)
+enum bt_graph_status bt_graph_run(struct bt_graph *graph)
{
- enum bt_component_status component_status;
- enum bt_graph_status graph_status = BT_GRAPH_STATUS_OK;
+ enum bt_graph_status status = BT_GRAPH_STATUS_OK;
if (!graph) {
- graph_status = BT_GRAPH_STATUS_INVALID;
+ status = BT_GRAPH_STATUS_INVALID;
goto error;
}
do {
- component_status = bt_graph_consume(graph);
- if (component_status == BT_COMPONENT_STATUS_AGAIN) {
+ status = bt_graph_consume(graph);
+ if (status == BT_GRAPH_STATUS_AGAIN) {
/*
* If AGAIN is received and there are multiple sinks,
* go ahead and consume from the next sink.
* time.
*/
if (graph->sinks_to_consume->length > 1) {
- component_status = BT_COMPONENT_STATUS_OK;
+ status = BT_GRAPH_STATUS_OK;
}
}
- } while (component_status == BT_COMPONENT_STATUS_OK);
-
- if (_component_status) {
- *_component_status = component_status;
- }
+ } while (status == BT_GRAPH_STATUS_OK);
if (g_queue_is_empty(graph->sinks_to_consume)) {
- graph_status = BT_GRAPH_STATUS_END;
- } else if (component_status == BT_COMPONENT_STATUS_AGAIN) {
- graph_status = BT_GRAPH_STATUS_AGAIN;
- } else {
- graph_status = BT_GRAPH_STATUS_ERROR;
+ status = BT_GRAPH_STATUS_END;
}
error:
- return graph_status;
+ return status;
}