*
* Babeltrace Plugin Component Graph
*
- * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ * Copyright 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
*
* Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
*
* SOFTWARE.
*/
+#include <babeltrace/component/component-internal.h>
#include <babeltrace/component/component-graph-internal.h>
+#include <babeltrace/component/component-connection-internal.h>
+#include <babeltrace/component/component-sink-internal.h>
+#include <babeltrace/component/component-source.h>
+#include <babeltrace/component/component-filter.h>
+#include <babeltrace/component/component-port.h>
#include <babeltrace/compiler.h>
+#include <unistd.h>
-static void bt_component_graph_destroy(struct bt_object *obj)
+static
+void bt_graph_destroy(struct bt_object *obj)
{
- struct bt_component_graph *graph = container_of(obj,
- struct bt_component_graph, base);
+ struct bt_graph *graph = container_of(obj,
+ struct bt_graph, base);
+ if (graph->components) {
+ g_ptr_array_free(graph->components, TRUE);
+ }
if (graph->connections) {
g_ptr_array_free(graph->connections, TRUE);
}
- if (graph->sinks) {
- g_ptr_array_free(graph->sinks, TRUE);
+ if (graph->sinks_to_consume) {
+ g_queue_free(graph->sinks_to_consume);
}
g_free(graph);
}
-struct bt_component_graph *bt_component_graph_create(void)
+struct bt_graph *bt_graph_create(void)
{
- struct bt_component_graph *graph;
+ struct bt_graph *graph;
- graph = g_new0(struct bt_component_graph, 1);
+ graph = g_new0(struct bt_graph, 1);
if (!graph) {
goto end;
}
- bt_object_init(graph, bt_component_graph_destroy);
+ bt_object_init(graph, bt_graph_destroy);
- graph->connections = g_ptr_array_new_with_free_func(bt_put);
+ graph->connections = g_ptr_array_new_with_free_func(bt_object_release);
if (!graph->connections) {
goto error;
}
- graph->sinks = g_ptr_array_new_with_free_func(bt_put);
- if (!graph->sinks) {
+ graph->components = g_ptr_array_new_with_free_func(bt_object_release);
+ if (!graph->components) {
+ goto error;
+ }
+ graph->sinks_to_consume = g_queue_new();
+ if (!graph->sinks_to_consume) {
goto error;
}
end:
goto end;
}
-enum bt_component_graph_status bt_component_graph_connect(
- struct bt_component_graph *graph, struct bt_component *upstream,
- struct bt_component *downstream)
+struct bt_connection *bt_graph_connect(struct bt_graph *graph,
+ struct bt_port *upstream_port,
+ struct bt_port *downstream_port)
+{
+ struct bt_connection *connection = NULL;
+ struct bt_graph *upstream_graph = NULL;
+ struct bt_graph *downstream_graph = NULL;
+ struct bt_component *upstream_component = NULL;
+ struct bt_component *downstream_component = NULL;
+ enum bt_component_status component_status;
+ bool components_added = false;
+
+ if (!graph || !upstream_port || !downstream_port) {
+ goto end;
+ }
+
+ if (bt_port_get_type(upstream_port) != BT_PORT_TYPE_OUTPUT) {
+ goto end;
+ }
+ if (bt_port_get_type(downstream_port) != BT_PORT_TYPE_INPUT) {
+ goto end;
+ }
+
+ /* Ensure the components are not already part of another graph. */
+ upstream_component = bt_port_get_component(upstream_port);
+ assert(upstream_component);
+ upstream_graph = bt_component_get_graph(upstream_component);
+ if (upstream_graph && (graph != upstream_graph)) {
+ fprintf(stderr, "Upstream component is already part of another graph\n");
+ goto error;
+ }
+
+ downstream_component = bt_port_get_component(downstream_port);
+ assert(downstream_component);
+ downstream_graph = bt_component_get_graph(downstream_component);
+ if (downstream_graph && (graph != downstream_graph)) {
+ fprintf(stderr, "Downstream component is already part of another graph\n");
+ goto error;
+ }
+
+ connection = bt_connection_create(graph, upstream_port,
+ downstream_port);
+ if (!connection) {
+ goto error;
+ }
+
+ /*
+ * Ownership of up/downstream_component and of the connection object is
+ * transferred to the graph.
+ */
+ g_ptr_array_add(graph->connections, connection);
+ g_ptr_array_add(graph->components, upstream_component);
+ g_ptr_array_add(graph->components, downstream_component);
+ if (bt_component_get_class_type(downstream_component) ==
+ BT_COMPONENT_CLASS_TYPE_SINK) {
+ g_queue_push_tail(graph->sinks_to_consume,
+ downstream_component);
+ }
+
+ /*
+ * The graph is now the parent of these components which garantees their
+ * existence for the duration of the graph's lifetime.
+ */
+ bt_component_set_graph(upstream_component, graph);
+ bt_put(upstream_component);
+ bt_component_set_graph(downstream_component, graph);
+ bt_put(downstream_component);
+
+ /* Rollback the connection from this point on. */
+ components_added = true;
+
+ /*
+ * The components and connection are added to the graph before invoking
+ * the new_connection method in order to make them visible to the
+ * components during the method's invocation.
+ */
+ component_status = bt_component_new_connection(upstream_component,
+ upstream_port, connection);
+ if (component_status != BT_COMPONENT_STATUS_OK) {
+ goto error;
+ }
+ component_status = bt_component_new_connection(downstream_component,
+ downstream_port, connection);
+ if (component_status != BT_COMPONENT_STATUS_OK) {
+ goto error;
+ }
+end:
+ bt_put(upstream_graph);
+ bt_put(downstream_graph);
+ return connection;
+error:
+ if (components_added) {
+ if (bt_component_get_class_type(downstream_component) ==
+ BT_COMPONENT_CLASS_TYPE_SINK) {
+ g_queue_pop_tail(graph->sinks_to_consume);
+ }
+ g_ptr_array_set_size(graph->connections,
+ graph->connections->len - 1);
+ g_ptr_array_set_size(graph->components,
+ graph->components->len - 2);
+ }
+ goto end;
+}
+
+static
+int get_component_port_counts(struct bt_component *component, int *input_count,
+ int *output_count)
+{
+ int ret = -1;
+
+ switch (bt_component_get_class_type(component)) {
+ case BT_COMPONENT_CLASS_TYPE_SOURCE:
+ ret = bt_component_source_get_output_port_count(component);
+ if (ret < 0) {
+ goto end;
+ }
+ *output_count = ret;
+ break;
+ case BT_COMPONENT_CLASS_TYPE_FILTER:
+ ret = bt_component_filter_get_output_port_count(component);
+ if (ret < 0) {
+ goto end;
+ }
+ *output_count = ret;
+ break;
+ ret = bt_component_filter_get_input_port_count(component);
+ if (ret < 0) {
+ goto end;
+ }
+ *input_count = ret;
+ break;
+ case BT_COMPONENT_CLASS_TYPE_SINK:
+ ret = bt_component_sink_get_input_port_count(component);
+ if (ret < 0) {
+ goto end;
+ }
+ *input_count = ret;
+ break;
+ default:
+ assert(false);
+ break;
+ }
+ ret = 0;
+end:
+ return ret;
+}
+
+static
+struct bt_port *get_input_port(struct bt_component *component, int index)
{
- return BT_COMPONENT_GRAPH_STATUS_OK;
+ struct bt_port *port = NULL;
+
+ switch (bt_component_get_class_type(component)) {
+ case BT_COMPONENT_CLASS_TYPE_FILTER:
+ port = bt_component_filter_get_input_port_at_index(component,
+ index);
+ break;
+ case BT_COMPONENT_CLASS_TYPE_SINK:
+ port = bt_component_sink_get_input_port_at_index(component,
+ index);
+ break;
+ default:
+ assert(false);
+ }
+ return port;
}
-enum bt_component_graph_status bt_component_graph_add_component(
- struct bt_component_graph *graph,
- struct bt_component *component)
+static
+struct bt_port *get_output_port(struct bt_component *component, int index)
{
- return BT_COMPONENT_GRAPH_STATUS_OK;
+ struct bt_port *port = NULL;
+
+ switch (bt_component_get_class_type(component)) {
+ case BT_COMPONENT_CLASS_TYPE_SOURCE:
+ port = bt_component_source_get_output_port_at_index(component,
+ index);
+ break;
+ case BT_COMPONENT_CLASS_TYPE_FILTER:
+ port = bt_component_filter_get_output_port_at_index(component,
+ index);
+ break;
+ default:
+ assert(false);
+ }
+ return port;
}
-enum bt_component_graph_status bt_component_graph_add_component_as_sibling(
- struct bt_component_graph *graph, struct bt_component *origin,
+enum bt_graph_status bt_graph_add_component_as_sibling(struct bt_graph *graph,
+ struct bt_component *origin,
struct bt_component *new_component)
{
- return BT_COMPONENT_GRAPH_STATUS_OK;
+ int origin_input_port_count = 0;
+ int origin_output_port_count = 0;
+ int new_input_port_count = 0;
+ int new_output_port_count = 0;
+ enum bt_graph_status status = BT_GRAPH_STATUS_OK;
+ struct bt_graph *origin_graph = NULL;
+ struct bt_graph *new_graph = NULL;
+ struct bt_port *origin_port = NULL;
+ struct bt_port *new_port = NULL;
+ struct bt_port *upstream_port = NULL;
+ struct bt_port *downstream_port = NULL;
+ struct bt_connection *origin_connection = NULL;
+ struct bt_connection *new_connection = NULL;
+ int port_index;
+
+ if (!graph || !origin || !new_component) {
+ status = BT_GRAPH_STATUS_INVALID;
+ goto end;
+ }
+
+ if (bt_component_get_class_type(origin) !=
+ bt_component_get_class_type(new_component)) {
+ status = BT_GRAPH_STATUS_INVALID;
+ goto end;
+ }
+
+ origin_graph = bt_component_get_graph(origin);
+ if (!origin_graph || (origin_graph != graph)) {
+ status = BT_GRAPH_STATUS_INVALID;
+ goto end;
+ }
+
+ new_graph = bt_component_get_graph(new_component);
+ if (new_graph) {
+ status = BT_GRAPH_STATUS_ALREADY_IN_A_GRAPH;
+ goto end;
+ }
+
+ if (get_component_port_counts(origin, &origin_input_port_count,
+ &origin_output_port_count)) {
+ status = BT_GRAPH_STATUS_INVALID;
+ goto end;
+ }
+ if (get_component_port_counts(new_component, &new_input_port_count,
+ &new_output_port_count)) {
+ status = BT_GRAPH_STATUS_INVALID;
+ goto end;
+ }
+
+ if (origin_input_port_count != new_input_port_count ||
+ origin_output_port_count != new_output_port_count) {
+ status = BT_GRAPH_STATUS_INVALID;
+ goto end;
+ }
+
+ /* Replicate input connections. */
+ for (port_index = 0; port_index< origin_input_port_count; port_index++) {
+ uint64_t connection_count, connection_index;
+
+ origin_port = get_input_port(origin, port_index);
+ if (!origin_port) {
+ status = BT_GRAPH_STATUS_ERROR;
+ goto error_disconnect;
+ }
+ new_port = get_input_port(new_component, port_index);
+ if (!new_port) {
+ status = BT_GRAPH_STATUS_ERROR;
+ goto error_disconnect;
+ }
+
+ if (bt_port_get_connection_count(origin_port, &connection_count) !=
+ BT_PORT_STATUS_OK) {
+ status = BT_GRAPH_STATUS_ERROR;
+ goto error_disconnect;
+ }
+
+ for (connection_index = 0; connection_index < connection_count;
+ connection_index++) {
+ origin_connection = bt_port_get_connection(origin_port,
+ connection_index);
+ if (!origin_connection) {
+ goto error_disconnect;
+ }
+
+ upstream_port = bt_connection_get_output_port(
+ origin_connection);
+ if (!upstream_port) {
+ goto error_disconnect;
+ }
+
+ new_connection = bt_graph_connect(graph, upstream_port,
+ new_port);
+ if (!new_connection) {
+ goto error_disconnect;
+ }
+
+ BT_PUT(upstream_port);
+ BT_PUT(origin_connection);
+ BT_PUT(new_connection);
+ }
+ BT_PUT(origin_port);
+ BT_PUT(new_port);
+ }
+
+ /* Replicate output connections. */
+ for (port_index = 0; port_index< origin_output_port_count; port_index++) {
+ uint64_t connection_count, connection_index;
+
+ origin_port = get_output_port(origin, port_index);
+ if (!origin_port) {
+ status = BT_GRAPH_STATUS_ERROR;
+ goto error_disconnect;
+ }
+ new_port = get_output_port(new_component, port_index);
+ if (!new_port) {
+ status = BT_GRAPH_STATUS_ERROR;
+ goto error_disconnect;
+ }
+
+ if (bt_port_get_connection_count(origin_port, &connection_count) !=
+ BT_PORT_STATUS_OK) {
+ status = BT_GRAPH_STATUS_ERROR;
+ goto error_disconnect;
+ }
+
+ for (connection_index = 0; connection_index < connection_count;
+ connection_index++) {
+ origin_connection = bt_port_get_connection(origin_port,
+ connection_index);
+ if (!origin_connection) {
+ goto error_disconnect;
+ }
+
+ downstream_port = bt_connection_get_input_port(
+ origin_connection);
+ if (!downstream_port) {
+ goto error_disconnect;
+ }
+
+ new_connection = bt_graph_connect(graph, new_port,
+ downstream_port);
+ if (!new_connection) {
+ goto error_disconnect;
+ }
+
+ BT_PUT(downstream_port);
+ BT_PUT(origin_connection);
+ BT_PUT(new_connection);
+ }
+ BT_PUT(origin_port);
+ BT_PUT(new_port);
+ }
+end:
+ bt_put(origin_graph);
+ bt_put(new_graph);
+ bt_put(origin_port);
+ bt_put(new_port);
+ bt_put(upstream_port);
+ bt_put(downstream_port);
+ bt_put(origin_connection);
+ bt_put(new_connection);
+ return status;
+error_disconnect:
+ /* Destroy all connections of the new component. */
+ /* FIXME. */
+ goto end;
}
-enum bt_component_graph_status bt_component_graph_run(
- struct bt_component_graph *graph)
+enum bt_component_status bt_graph_consume(struct bt_graph *graph)
{
- return BT_COMPONENT_GRAPH_STATUS_OK;
+ struct bt_component *sink;
+ enum bt_component_status status;
+ GList *current_node;
+
+ if (!graph) {
+ status = BT_COMPONENT_STATUS_INVALID;
+ goto end;
+ }
+
+ if (g_queue_is_empty(graph->sinks_to_consume)) {
+ status = BT_COMPONENT_STATUS_END;
+ goto end;
+ }
+
+ current_node = g_queue_pop_head_link(graph->sinks_to_consume);
+ sink = current_node->data;
+ status = bt_component_sink_consume(sink);
+ if (status != BT_COMPONENT_STATUS_END) {
+ g_queue_push_tail_link(graph->sinks_to_consume, current_node);
+ goto end;
+ }
+
+ /* End reached, the node is not added back to the queue and free'd. */
+ g_queue_delete_link(graph->sinks_to_consume, current_node);
+
+ /* Don't forward an END status if there are sinks left to consume. */
+ if (!g_queue_is_empty(graph->sinks_to_consume)) {
+ status = BT_GRAPH_STATUS_OK;
+ goto end;
+ }
+end:
+ return status;
}
+enum bt_graph_status bt_graph_run(struct bt_graph *graph,
+ enum bt_component_status *_component_status)
+{
+ enum bt_component_status component_status;
+ enum bt_graph_status graph_status = BT_GRAPH_STATUS_OK;
+
+ if (!graph) {
+ graph_status = BT_GRAPH_STATUS_INVALID;
+ goto error;
+ }
+
+ do {
+ component_status = bt_graph_consume(graph);
+ if (component_status == BT_COMPONENT_STATUS_AGAIN) {
+ /*
+ * If AGAIN is received and there are multiple sinks,
+ * go ahead and consume from the next sink.
+ *
+ * However, in the case where a single sink is left,
+ * the caller can decide to busy-wait and call
+ * bt_graph_run continuously until the source is ready
+ * or it can decide to sleep for an arbitrary amount of
+ * time.
+ */
+ if (graph->sinks_to_consume->length > 1) {
+ component_status = BT_COMPONENT_STATUS_OK;
+ }
+ }
+ } while (component_status == BT_COMPONENT_STATUS_OK);
+
+ if (_component_status) {
+ *_component_status = component_status;
+ }
+
+ if (g_queue_is_empty(graph->sinks_to_consume)) {
+ graph_status = BT_GRAPH_STATUS_END;
+ } else if (component_status == BT_COMPONENT_STATUS_AGAIN) {
+ graph_status = BT_GRAPH_STATUS_AGAIN;
+ } else {
+ graph_status = BT_GRAPH_STATUS_ERROR;
+ }
+error:
+ return graph_status;
+}