#include <babeltrace/graph/component-filter.h>
#include <babeltrace/graph/port.h>
#include <babeltrace/compiler-internal.h>
+#include <babeltrace/types.h>
#include <unistd.h>
#include <glib.h>
struct bt_graph *graph = container_of(obj,
struct bt_graph, base);
- if (graph->components) {
- g_ptr_array_free(graph->components, TRUE);
- }
+ /*
+ * The graph's reference count is 0 if we're here. Increment
+ * it to avoid a double-destroy (possibly infinitely recursive)
+ * in this situation:
+ *
+ * 1. We put and destroy a connection.
+ * 2. This connection's destructor finalizes its active
+ * notification iterators.
+ * 3. A notification iterator's finalization function gets a
+ * new reference on its component (reference count goes from
+ * 0 to 1).
+ * 4. Since this component's reference count goes to 1, it takes
+ * a reference on its parent (this graph). This graph's
+ * reference count goes from 0 to 1.
+ * 5. The notification iterator's finalization function puts its
+ * component reference (reference count goes from 1 to 0).
+ * 6. Since this component's reference count goes from 1 to 0,
+ * it puts its parent (this graph). This graph's reference
+ * count goes from 1 to 0.
+ * 7. Since this graph's reference count goes from 1 to 0,
+ * its destructor is called (this function).
+ *
+ * With the incrementation below, the graph's reference count at
+ * step 4 goes from 1 to 2, and from 2 to 1 at step 6. This
+ * ensures that this function is not called two times.
+ */
+ obj->ref_count.count++;
+
if (graph->connections) {
g_ptr_array_free(graph->connections, TRUE);
}
+ if (graph->components) {
+ g_ptr_array_free(graph->components, TRUE);
+ }
if (graph->sinks_to_consume) {
g_queue_free(graph->sinks_to_consume);
}
struct bt_graph *downstream_graph = NULL;
struct bt_component *upstream_component = NULL;
struct bt_component *downstream_component = NULL;
- struct bt_connection *existing_conn = NULL;
enum bt_component_status component_status;
- bool upstream_was_already_in_graph;
- bool downstream_was_already_in_graph;
- int components_to_remove = 0;
- int i;
+ bt_bool upstream_was_already_in_graph;
+ bt_bool downstream_was_already_in_graph;
if (!graph || !upstream_port || !downstream_port) {
goto end;
}
+ if (graph->canceled) {
+ goto end;
+ }
+
/* Ensure appropriate types for upstream and downstream ports. */
if (bt_port_get_type(upstream_port) != BT_PORT_TYPE_OUTPUT) {
goto end;
}
/* Ensure that both ports are currently unconnected. */
- existing_conn = bt_port_get_connection(upstream_port);
- bt_put(existing_conn);
- if (existing_conn) {
+ if (bt_port_is_connected(upstream_port)) {
fprintf(stderr, "Upstream port is already connected\n");
goto end;
}
- existing_conn = bt_port_get_connection(downstream_port);
- bt_put(existing_conn);
- if (existing_conn) {
+ if (bt_port_is_connected(downstream_port)) {
fprintf(stderr, "Downstream port is already connected\n");
goto end;
}
}
downstream_was_already_in_graph = (graph == downstream_graph);
+ /*
+ * At this point the ports are not connected yet. Both
+ * components need to accept an eventual connection to their
+ * port by the other port before we continue.
+ */
+ component_status = bt_component_accept_port_connection(
+ upstream_component, upstream_port, downstream_port);
+ if (component_status != BT_COMPONENT_STATUS_OK) {
+ goto error;
+ }
+ component_status = bt_component_accept_port_connection(
+ downstream_component, downstream_port, upstream_port);
+ if (component_status != BT_COMPONENT_STATUS_OK) {
+ goto error;
+ }
+
connection = bt_connection_create(graph, upstream_port,
downstream_port);
if (!connection) {
}
/*
- * The graph is now the parent of these components which garantees their
- * existence for the duration of the graph's lifetime.
+ * The graph is now the parent of these components which
+ * garantees their existence for the duration of the graph's
+ * lifetime.
*/
/*
- * The components and connection are added to the graph before
- * invoking the `accept_port_connection` method in order to make
- * them visible to the components during the method's
- * invocation.
+ * Notify both components that their port is connected.
*/
- component_status = bt_component_accept_port_connection(
- upstream_component, upstream_port, downstream_port);
- if (component_status != BT_COMPONENT_STATUS_OK) {
- goto error_rollback;
- }
- component_status = bt_component_accept_port_connection(
- downstream_component, downstream_port, upstream_port);
- if (component_status != BT_COMPONENT_STATUS_OK) {
- goto error_rollback;
- }
+ bt_component_port_connected(upstream_component, upstream_port,
+ downstream_port);
+ bt_component_port_connected(downstream_component, downstream_port,
+ upstream_port);
/*
- * Both components accepted the connection. Notify the graph's
- * creator that both ports are connected.
+ * Notify the graph's creator that both ports are connected.
*/
bt_graph_notify_ports_connected(graph, upstream_port, downstream_port);
bt_put(upstream_component);
bt_put(downstream_component);
return connection;
-error_rollback:
- /*
- * Remove newly-added components from the graph, being careful
- * not to remove a component that was already present in the graph
- * and is connected to other components.
- */
- components_to_remove += upstream_was_already_in_graph ? 0 : 1;
- components_to_remove += downstream_was_already_in_graph ? 0 : 1;
-
- if (!downstream_was_already_in_graph) {
- if (bt_component_get_class_type(downstream_component) ==
- BT_COMPONENT_CLASS_TYPE_SINK) {
- g_queue_pop_tail(graph->sinks_to_consume);
- }
- }
- /* Remove newly created connection. */
- g_ptr_array_set_size(graph->connections,
- graph->connections->len - 1);
-
- /*
- * Remove newly added components.
- *
- * Note that this is a tricky situation. The graph, being the parent
- * of the components, does not hold a reference to them. Normally,
- * components are destroyed right away when the graph is released since
- * the graph, being their parent, bounds their lifetime
- * (see doc/ref-counting.md).
- *
- * In this particular case, we must take a number of steps:
- * 1) unset the components' parent to rollback the initial state of
- * the components being connected.
- * Note that the reference taken by the component on its graph is
- * released by the set_parent call.
- * 2) set the pointer in the components array to NULL so that the
- * destruction function called on the array's resize in invoked on
- * NULL (no effect),
- *
- * NOTE: Point #1 assumes that *something* holds a reference to both
- * components being connected. The fact that a reference is being
- * held to a component means that it must hold a reference to its
- * parent to prevent the parent from being destroyed (again, refer
- * to doc/red-counting.md). This reference to a component is
- * most likely being held *transitively* by the caller which holds
- * a reference to both ports (a port has its component as a
- * parent).
- *
- * This assumes that a graph is not connecting components by
- * itself while not holding a reference to the ports/components
- * being connected (i.e. "cheating" by using internal APIs).
- */
- for (i = 0; i < components_to_remove; i++) {
- struct bt_component *component = g_ptr_array_index(
- graph->components, graph->components->len - 1);
- bt_component_set_graph(component, NULL);
- g_ptr_array_index(graph->components,
- graph->components->len - 1) = NULL;
- g_ptr_array_set_size(graph->components,
- graph->components->len - 1);
- }
- /* NOTE: Resizing the ptr_arrays invokes the destruction of the elements. */
- goto end;
error:
BT_PUT(upstream_component);
BT_PUT(downstream_component);
static
enum bt_component_status get_component_port_counts(
- struct bt_component *component, uint64_t *input_count,
- uint64_t *output_count)
+ struct bt_component *component, int64_t *input_count,
+ int64_t *output_count)
{
enum bt_component_status ret;
switch (bt_component_get_class_type(component)) {
case BT_COMPONENT_CLASS_TYPE_SOURCE:
- ret = bt_component_source_get_output_port_count(component,
- output_count);
- if (ret != BT_COMPONENT_STATUS_OK) {
+ *output_count =
+ bt_component_source_get_output_port_count(component);
+ if (*output_count < 0) {
+ ret = BT_COMPONENT_STATUS_ERROR;
goto end;
}
break;
case BT_COMPONENT_CLASS_TYPE_FILTER:
- ret = bt_component_filter_get_output_port_count(component,
- output_count);
- if (ret != BT_COMPONENT_STATUS_OK) {
+ *output_count =
+ bt_component_filter_get_output_port_count(component);
+ if (*output_count < 0) {
+ ret = BT_COMPONENT_STATUS_ERROR;
goto end;
}
- ret = bt_component_filter_get_input_port_count(component,
- input_count);
- if (ret != BT_COMPONENT_STATUS_OK) {
+ *input_count =
+ bt_component_filter_get_input_port_count(component);
+ if (*input_count < 0) {
+ ret = BT_COMPONENT_STATUS_ERROR;
goto end;
}
break;
case BT_COMPONENT_CLASS_TYPE_SINK:
- ret = bt_component_sink_get_input_port_count(component,
- input_count);
- if (ret != BT_COMPONENT_STATUS_OK) {
+ *input_count =
+ bt_component_sink_get_input_port_count(component);
+ if (*input_count < 0) {
+ ret = BT_COMPONENT_STATUS_ERROR;
goto end;
}
break;
default:
- assert(false);
+ assert(BT_FALSE);
break;
}
ret = BT_COMPONENT_STATUS_OK;
struct bt_component *origin,
struct bt_component *new_component)
{
- uint64_t origin_input_port_count = 0;
- uint64_t origin_output_port_count = 0;
- uint64_t new_input_port_count = 0;
- uint64_t new_output_port_count = 0;
+ int64_t origin_input_port_count = 0;
+ int64_t origin_output_port_count = 0;
+ int64_t new_input_port_count = 0;
+ int64_t new_output_port_count = 0;
enum bt_graph_status status = BT_GRAPH_STATUS_OK;
struct bt_graph *origin_graph = NULL;
struct bt_graph *new_graph = NULL;
struct bt_port *downstream_port = NULL;
struct bt_connection *origin_connection = NULL;
struct bt_connection *new_connection = NULL;
- int port_index;
+ int64_t port_index;
if (!graph || !origin || !new_component) {
status = BT_GRAPH_STATUS_INVALID;
goto end;
}
+ if (graph->canceled) {
+ status = BT_GRAPH_STATUS_CANCELED;
+ goto end;
+ }
+
if (bt_component_get_class_type(origin) !=
bt_component_get_class_type(new_component)) {
status = BT_GRAPH_STATUS_INVALID;
/* Replicate input connections. */
for (port_index = 0; port_index< origin_input_port_count; port_index++) {
- origin_port = bt_component_get_input_port_at_index(origin,
+ origin_port = bt_component_get_input_port_by_index(origin,
port_index);
if (!origin_port) {
status = BT_GRAPH_STATUS_ERROR;
goto error_disconnect;
}
- new_port = bt_component_get_input_port_at_index(new_component,
+ new_port = bt_component_get_input_port_by_index(new_component,
port_index);
if (!new_port) {
status = BT_GRAPH_STATUS_ERROR;
/* Replicate output connections. */
for (port_index = 0; port_index < origin_output_port_count; port_index++) {
- origin_port = bt_component_get_output_port_at_index(origin,
+ origin_port = bt_component_get_output_port_by_index(origin,
port_index);
if (!origin_port) {
status = BT_GRAPH_STATUS_ERROR;
goto error_disconnect;
}
- new_port = bt_component_get_output_port_at_index(new_component,
+ new_port = bt_component_get_output_port_by_index(new_component,
port_index);
if (!new_port) {
status = BT_GRAPH_STATUS_ERROR;
goto end;
}
+ if (graph->canceled) {
+ status = BT_GRAPH_STATUS_CANCELED;
+ goto end;
+ }
+
if (g_queue_is_empty(graph->sinks_to_consume)) {
status = BT_GRAPH_STATUS_END;
goto end;
if (!graph) {
status = BT_GRAPH_STATUS_INVALID;
- goto error;
+ goto end;
}
do {
status = bt_graph_consume(graph);
if (status == BT_GRAPH_STATUS_AGAIN) {
/*
- * If AGAIN is received and there are multiple sinks,
- * go ahead and consume from the next sink.
+ * If AGAIN is received and there are multiple
+ * sinks, go ahead and consume from the next
+ * sink.
*
- * However, in the case where a single sink is left,
- * the caller can decide to busy-wait and call
- * bt_graph_run continuously until the source is ready
- * or it can decide to sleep for an arbitrary amount of
- * time.
+ * However, in the case where a single sink is
+ * left, the caller can decide to busy-wait and
+ * call bt_graph_run() continuously until the
+ * source is ready or it can decide to sleep for
+ * an arbitrary amount of time.
*/
if (graph->sinks_to_consume->length > 1) {
status = BT_GRAPH_STATUS_OK;
if (g_queue_is_empty(graph->sinks_to_consume)) {
status = BT_GRAPH_STATUS_END;
}
-error:
+end:
return status;
}
downstream_port, listener.data);
}
}
+
+extern enum bt_graph_status bt_graph_cancel(struct bt_graph *graph)
+{
+ enum bt_graph_status ret = BT_GRAPH_STATUS_OK;
+
+ if (!graph) {
+ ret = BT_GRAPH_STATUS_INVALID;
+ goto end;
+ }
+
+ graph->canceled = BT_TRUE;
+
+end:
+ return ret;
+}
+
+extern bt_bool bt_graph_is_canceled(struct bt_graph *graph)
+{
+ return graph ? graph->canceled : BT_FALSE;
+}
+
+BT_HIDDEN
+void bt_graph_remove_connection(struct bt_graph *graph,
+ struct bt_connection *connection)
+{
+ assert(graph);
+ assert(connection);
+ g_ptr_array_remove(graph->connections, connection);
+}