X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=lib%2Fgraph%2Fgraph.c;h=c62712ccc4c83badfacb4069ce2742b4526f22ac;hb=f3ea12b321e64bdcd1bb77ae172357b0fb6745bc;hp=10dd1944e840808e7b9669b9b8761deca26e590e;hpb=1a6a376a7c1fff0d5da2559f4f9a515950fb15ba;p=babeltrace.git diff --git a/lib/graph/graph.c b/lib/graph/graph.c index 10dd1944..c62712cc 100644 --- a/lib/graph/graph.c +++ b/lib/graph/graph.c @@ -37,14 +37,49 @@ #include #include #include +#include +#include #include #include struct bt_graph_listener { void *func; + bt_graph_listener_removed removed; void *data; }; +static +int init_listeners_array(GArray **listeners) +{ + int ret = 0; + + assert(listeners); + *listeners = g_array_new(FALSE, TRUE, sizeof(struct bt_graph_listener)); + if (!*listeners) { + BT_LOGE_STR("Failed to allocate one GArray."); + ret = -1; + goto end; + } + +end: + return ret; +} + +static +void call_remove_listeners(GArray *listeners) +{ + size_t i; + + for (i = 0; i < listeners->len; i++) { + struct bt_graph_listener listener = + g_array_index(listeners, struct bt_graph_listener, i); + + if (listener.removed) { + listener.removed(listener.data); + } + } +} + static void bt_graph_destroy(struct bt_object *obj) { @@ -80,6 +115,18 @@ void bt_graph_destroy(struct bt_object *obj) BT_LOGD("Destroying graph: addr=%p", graph); obj->ref_count.count++; + /* + * Cancel the graph to disallow some operations, like creating + * notification iterators and adding ports to components. + */ + (void) bt_graph_cancel(graph); + + /* Call all remove listeners */ + call_remove_listeners(graph->listeners.port_added); + call_remove_listeners(graph->listeners.port_removed); + call_remove_listeners(graph->listeners.ports_connected); + call_remove_listeners(graph->listeners.ports_disconnected); + if (graph->connections) { BT_LOGD_STR("Destroying connections."); g_ptr_array_free(graph->connections, TRUE); @@ -111,23 +158,6 @@ void bt_graph_destroy(struct bt_object *obj) g_free(graph); } -static -int init_listeners_array(GArray **listeners) -{ - int ret = 0; - - assert(listeners); - *listeners = g_array_new(FALSE, TRUE, sizeof(struct bt_graph_listener)); - if (!*listeners) { - BT_LOGE_STR("Failed to allocate one GArray."); - ret = -1; - goto end; - } - -end: - return ret; -} - struct bt_graph *bt_graph_create(void) { struct bt_graph *graph; @@ -158,6 +188,7 @@ struct bt_graph *bt_graph_create(void) goto error; } + graph->can_consume = BT_TRUE; ret = init_listeners_array(&graph->listeners.port_added); if (ret) { BT_LOGE_STR("Cannot create the \"port added\" listener array."); @@ -202,14 +233,14 @@ enum bt_graph_status bt_graph_connect_ports(struct bt_graph *graph, struct bt_component *upstream_component = NULL; struct bt_component *downstream_component = NULL; enum bt_component_status component_status; - bt_bool upstream_was_already_in_graph; - bt_bool downstream_was_already_in_graph; + bt_bool init_can_consume; if (!graph) { BT_LOGW_STR("Invalid parameter: graph is NULL."); status = BT_GRAPH_STATUS_INVALID; goto end; } + init_can_consume = graph->can_consume; if (!upstream_port) { BT_LOGW_STR("Invalid parameter: upstream port is NULL."); @@ -236,6 +267,8 @@ enum bt_graph_status bt_graph_connect_ports(struct bt_graph *graph, goto end; } + graph->can_consume = BT_FALSE; + /* Ensure appropriate types for upstream and downstream ports. */ if (bt_port_get_type(upstream_port) != BT_PORT_TYPE_OUTPUT) { BT_LOGW_STR("Invalid parameter: upstream port is not an output port."); @@ -285,24 +318,6 @@ enum bt_graph_status bt_graph_connect_ports(struct bt_graph *graph, upstream_component, bt_component_get_name(upstream_component), downstream_component, bt_component_get_name(downstream_component)); - /* Ensure the components are not already part of another graph. */ - upstream_graph = bt_component_get_graph(upstream_component); - if (upstream_graph && (graph != upstream_graph)) { - BT_LOGW("Invalid parameter: upstream port's component is already part of another graph: " - "other-graph-addr=%p", upstream_graph); - status = BT_GRAPH_STATUS_ALREADY_IN_A_GRAPH; - goto end; - } - upstream_was_already_in_graph = (graph == upstream_graph); - downstream_graph = bt_component_get_graph(downstream_component); - if (downstream_graph && (graph != downstream_graph)) { - BT_LOGW("Invalid parameter: downstream port's component is already part of another graph: " - "other-graph-addr=%p", downstream_graph); - status = BT_GRAPH_STATUS_ALREADY_IN_A_GRAPH; - goto end; - } - downstream_was_already_in_graph = (graph == downstream_graph); - /* * At this point the ports are not connected yet. Both * components need to accept an eventual connection to their @@ -357,26 +372,6 @@ enum bt_graph_status bt_graph_connect_ports(struct bt_graph *graph, */ g_ptr_array_add(graph->connections, connection); - if (!upstream_was_already_in_graph) { - g_ptr_array_add(graph->components, upstream_component); - bt_component_set_graph(upstream_component, graph); - } - if (!downstream_was_already_in_graph) { - g_ptr_array_add(graph->components, downstream_component); - bt_component_set_graph(downstream_component, graph); - if (bt_component_get_class_type(downstream_component) == - BT_COMPONENT_CLASS_TYPE_SINK) { - g_queue_push_tail(graph->sinks_to_consume, - downstream_component); - } - } - - /* - * The graph is now the parent of these components which - * garantees their existence for the duration of the graph's - * lifetime. - */ - /* * Notify both components that their port is connected. */ @@ -416,32 +411,24 @@ end: bt_put(upstream_component); bt_put(downstream_component); bt_put(connection); + if (graph) { + graph->can_consume = init_can_consume; + } return status; } static -enum bt_graph_status bt_graph_consume_no_check(struct bt_graph *graph) +enum bt_graph_status consume_graph_sink(struct bt_component *sink) { - struct bt_component *sink; enum bt_graph_status status = BT_GRAPH_STATUS_OK; enum bt_component_status comp_status; - GList *current_node; - BT_LOGV("Making next sink consume: addr=%p", graph); - - if (g_queue_is_empty(graph->sinks_to_consume)) { - BT_LOGV_STR("Graph's sink queue is empty: end of graph."); - status = BT_GRAPH_STATUS_END; - goto end; - } - - current_node = g_queue_pop_head_link(graph->sinks_to_consume); - sink = current_node->data; - BT_LOGV("Chose next sink to consume: comp-addr=%p, comp-name=\"%s\"", - sink, bt_component_get_name(sink)); + assert(sink); comp_status = bt_component_sink_consume(sink); - BT_LOGV("Consumed from sink: status=%s", + BT_LOGV("Consumed from sink: addr=%p, name=\"%s\", status=%s", + sink, bt_component_get_name(sink), bt_component_status_string(comp_status)); + switch (comp_status) { case BT_COMPONENT_STATUS_OK: break; @@ -459,21 +446,105 @@ enum bt_graph_status bt_graph_consume_no_check(struct bt_graph *graph) break; } + return status; +} + +/* + * `node` is removed from the queue of sinks to consume when passed to + * this function. This function adds it back to the queue if there's + * still something to consume afterwards. + */ +static +enum bt_graph_status consume_sink_node(struct bt_graph *graph, + GList *node) +{ + enum bt_graph_status status; + struct bt_component *sink; + + sink = node->data; + status = consume_graph_sink(sink); if (status != BT_GRAPH_STATUS_END) { - g_queue_push_tail_link(graph->sinks_to_consume, current_node); + g_queue_push_tail_link(graph->sinks_to_consume, node); goto end; } /* End reached, the node is not added back to the queue and free'd. */ - g_queue_delete_link(graph->sinks_to_consume, current_node); + g_queue_delete_link(graph->sinks_to_consume, node); /* Don't forward an END status if there are sinks left to consume. */ if (!g_queue_is_empty(graph->sinks_to_consume)) { status = BT_GRAPH_STATUS_OK; goto end; } + +end: + BT_LOGV("Consumed sink node: status=%s", bt_graph_status_string(status)); + return status; +} + +BT_HIDDEN +enum bt_graph_status bt_graph_consume_sink_no_check(struct bt_graph *graph, + struct bt_component *sink) +{ + enum bt_graph_status status; + GList *sink_node; + int index; + + BT_LOGV("Making specific sink consume: addr=%p, " + "comp-addr=%p, comp-name=\"%s\"", + graph, sink, bt_component_get_name(sink)); + + assert(bt_component_borrow_graph(sink) == graph); + + if (g_queue_is_empty(graph->sinks_to_consume)) { + BT_LOGV_STR("Graph's sink queue is empty: end of graph."); + status = BT_GRAPH_STATUS_END; + goto end; + } + + index = g_queue_index(graph->sinks_to_consume, sink); + if (index < 0) { + BT_LOGV_STR("Sink is not marked as consumable: sink is ended."); + status = BT_GRAPH_STATUS_END; + goto end; + } + + sink_node = g_queue_pop_nth_link(graph->sinks_to_consume, index); + assert(sink_node); + status = consume_sink_node(graph, sink_node); + +end: + return status; +} + +BT_HIDDEN +enum bt_graph_status bt_graph_consume_no_check(struct bt_graph *graph) +{ + enum bt_graph_status status = BT_GRAPH_STATUS_OK; + struct bt_component *sink; + GList *current_node; + + BT_LOGV("Making next sink consume: addr=%p", graph); + + if (!graph->has_sink) { + BT_LOGW_STR("Graph has no sink component."); + status = BT_GRAPH_STATUS_NO_SINK; + goto end; + } + + if (g_queue_is_empty(graph->sinks_to_consume)) { + BT_LOGV_STR("Graph's sink queue is empty: end of graph."); + status = BT_GRAPH_STATUS_END; + goto end; + } + + current_node = g_queue_pop_head_link(graph->sinks_to_consume); + sink = current_node->data; + BT_LOGV("Chose next sink to consume: comp-addr=%p, comp-name=\"%s\"", + sink, bt_component_get_name(sink)); + status = consume_sink_node(graph, current_node); + end: - BT_LOGV("Graph consumed: status=%s", bt_graph_status_string(status)); return status; } @@ -494,7 +565,15 @@ enum bt_graph_status bt_graph_consume(struct bt_graph *graph) goto end; } + if (!graph->can_consume) { + BT_LOGW_STR("Cannot consume graph in its current state."); + status = BT_GRAPH_STATUS_CANNOT_CONSUME; + goto end; + } + + graph->can_consume = BT_FALSE; status = bt_graph_consume_no_check(graph); + graph->can_consume = BT_TRUE; end: return status; @@ -517,6 +596,13 @@ enum bt_graph_status bt_graph_run(struct bt_graph *graph) goto end; } + if (!graph->can_consume) { + BT_LOGW_STR("Cannot run graph in its current state."); + status = BT_GRAPH_STATUS_CANNOT_CONSUME; + goto end; + } + + graph->can_consume = BT_FALSE; BT_LOGV("Running graph: addr=%p", graph); do { @@ -533,7 +619,7 @@ enum bt_graph_status bt_graph_run(struct bt_graph *graph) goto end; } - status = bt_graph_consume(graph); + status = bt_graph_consume_no_check(graph); if (status == BT_GRAPH_STATUS_AGAIN) { /* * If AGAIN is received and there are multiple @@ -549,6 +635,8 @@ enum bt_graph_status bt_graph_run(struct bt_graph *graph) if (graph->sinks_to_consume->length > 1) { status = BT_GRAPH_STATUS_OK; } + } else if (status == BT_GRAPH_STATUS_NO_SINK) { + goto end; } } while (status == BT_GRAPH_STATUS_OK); @@ -558,14 +646,18 @@ enum bt_graph_status bt_graph_run(struct bt_graph *graph) end: BT_LOGV("Graph ran: status=%s", bt_graph_status_string(status)); + if (graph) { + graph->can_consume = BT_TRUE; + } return status; } static -int add_listener(GArray *listeners, void *func, void *data) +int add_listener(GArray *listeners, void *func, void *removed, void *data) { struct bt_graph_listener listener = { .func = func, + .removed = removed, .data = data, }; @@ -575,7 +667,8 @@ int add_listener(GArray *listeners, void *func, void *data) int bt_graph_add_port_added_listener( struct bt_graph *graph, - bt_graph_port_added_listener listener, void *data) + bt_graph_port_added_listener listener, + bt_graph_listener_removed listener_removed, void *data) { int ret; @@ -585,13 +678,21 @@ int bt_graph_add_port_added_listener( goto end; } + if (graph->in_remove_listener) { + BT_LOGW("Cannot call this function during the execution of a remove listener: " + "addr=%p", graph); + ret = -1; + goto end; + } + if (!listener) { BT_LOGW_STR("Invalid parameter: listener is NULL."); ret = -1; goto end; } - ret = add_listener(graph->listeners.port_added, listener, data); + ret = add_listener(graph->listeners.port_added, listener, + listener_removed, data); BT_LOGV("Added \"port added\" listener to graph: " "graph-addr=%p, listener-addr=%p, pos=%d", graph, listener, ret); @@ -602,7 +703,8 @@ end: int bt_graph_add_port_removed_listener( struct bt_graph *graph, - bt_graph_port_removed_listener listener, void *data) + bt_graph_port_removed_listener listener, + bt_graph_listener_removed listener_removed, void *data) { int ret; @@ -612,13 +714,21 @@ int bt_graph_add_port_removed_listener( goto end; } + if (graph->in_remove_listener) { + BT_LOGW("Cannot call this function during the execution of a remove listener: " + "addr=%p", graph); + ret = -1; + goto end; + } + if (!listener) { BT_LOGW_STR("Invalid parameter: listener is NULL."); ret = -1; goto end; } - ret = add_listener(graph->listeners.port_removed, listener, data); + ret = add_listener(graph->listeners.port_removed, listener, + listener_removed, data); BT_LOGV("Added \"port removed\" listener to graph: " "graph-addr=%p, listener-addr=%p, pos=%d", graph, listener, ret); @@ -629,7 +739,8 @@ end: int bt_graph_add_ports_connected_listener( struct bt_graph *graph, - bt_graph_ports_connected_listener listener, void *data) + bt_graph_ports_connected_listener listener, + bt_graph_listener_removed listener_removed, void *data) { int ret; @@ -639,13 +750,21 @@ int bt_graph_add_ports_connected_listener( goto end; } + if (graph->in_remove_listener) { + BT_LOGW("Cannot call this function during the execution of a remove listener: " + "addr=%p", graph); + ret = -1; + goto end; + } + if (!listener) { BT_LOGW_STR("Invalid parameter: listener is NULL."); ret = -1; goto end; } - ret = add_listener(graph->listeners.ports_connected, listener, data); + ret = add_listener(graph->listeners.ports_connected, listener, + listener_removed, data); BT_LOGV("Added \"port connected\" listener to graph: " "graph-addr=%p, listener-addr=%p, pos=%d", graph, listener, ret); @@ -656,7 +775,8 @@ end: int bt_graph_add_ports_disconnected_listener( struct bt_graph *graph, - bt_graph_ports_disconnected_listener listener, void *data) + bt_graph_ports_disconnected_listener listener, + bt_graph_listener_removed listener_removed, void *data) { int ret; @@ -666,13 +786,21 @@ int bt_graph_add_ports_disconnected_listener( goto end; } + if (graph->in_remove_listener) { + BT_LOGW("Cannot call this function during the execution of a remove listener: " + "addr=%p", graph); + ret = -1; + goto end; + } + if (!listener) { BT_LOGW_STR("Invalid parameter: listener is NULL."); ret = -1; goto end; } - ret = add_listener(graph->listeners.ports_disconnected, listener, data); + ret = add_listener(graph->listeners.ports_disconnected, listener, + listener_removed, data); BT_LOGV("Added \"port disconnected\" listener to graph: " "graph-addr=%p, listener-addr=%p, pos=%d", graph, listener, ret); @@ -773,7 +901,7 @@ void bt_graph_notify_ports_disconnected(struct bt_graph *graph, } } -extern enum bt_graph_status bt_graph_cancel(struct bt_graph *graph) +enum bt_graph_status bt_graph_cancel(struct bt_graph *graph) { enum bt_graph_status ret = BT_GRAPH_STATUS_OK; @@ -790,9 +918,19 @@ end: return ret; } -extern bt_bool bt_graph_is_canceled(struct bt_graph *graph) +bt_bool bt_graph_is_canceled(struct bt_graph *graph) { - return graph ? graph->canceled : BT_FALSE; + bt_bool canceled = BT_FALSE; + + if (!graph) { + BT_LOGW_STR("Invalid parameter: graph is NULL."); + goto end; + } + + canceled = graph->canceled; + +end: + return canceled; } BT_HIDDEN @@ -805,3 +943,266 @@ void bt_graph_remove_connection(struct bt_graph *graph, graph, connection); g_ptr_array_remove(graph->connections, connection); } + +enum bt_graph_status bt_graph_add_component_with_init_method_data( + struct bt_graph *graph, + struct bt_component_class *component_class, + const char *name, struct bt_value *params, + void *init_method_data, + struct bt_component **user_component) +{ + enum bt_graph_status graph_status = BT_GRAPH_STATUS_OK; + enum bt_component_status comp_status; + struct bt_component *component = NULL; + enum bt_component_class_type type; + size_t i; + bt_bool init_can_consume; + + bt_get(params); + + if (!graph) { + BT_LOGW_STR("Invalid parameter: graph is NULL."); + graph_status = BT_GRAPH_STATUS_INVALID; + goto end; + } + init_can_consume = graph->can_consume; + + if (!component_class) { + BT_LOGW_STR("Invalid parameter: component class is NULL."); + graph_status = BT_GRAPH_STATUS_INVALID; + goto end; + } + + graph->can_consume = BT_FALSE; + type = bt_component_class_get_type(component_class); + BT_LOGD("Adding component to graph: " + "graph-addr=%p, comp-cls-addr=%p, " + "comp-cls-type=%s, name=\"%s\", params-addr=%p, " + "init-method-data-addr=%p", + graph, component_class, bt_component_class_type_string(type), + name, params, init_method_data); + + if (!name) { + BT_LOGW_STR("Invalid parameter: name is NULL."); + graph_status = BT_GRAPH_STATUS_INVALID; + goto end; + } + + if (graph->canceled) { + BT_LOGW_STR("Invalid parameter: graph is canceled."); + graph_status = BT_GRAPH_STATUS_CANCELED; + goto end; + } + + if (type != BT_COMPONENT_CLASS_TYPE_SOURCE && + type != BT_COMPONENT_CLASS_TYPE_FILTER && + type != BT_COMPONENT_CLASS_TYPE_SINK) { + BT_LOGW("Invalid parameter: unknown component class type: " + "type=%d", type); + graph_status = BT_GRAPH_STATUS_INVALID; + goto end; + } + + for (i = 0; i < graph->components->len; i++) { + void *other_comp = graph->components->pdata[i]; + + if (strcmp(name, bt_component_get_name(other_comp)) == 0) { + BT_LOGW("Invalid parameter: another component with the same name already exists in the graph: " + "other-comp-addr=%p, name=\"%s\"", + other_comp, name); + graph_status = BT_GRAPH_STATUS_INVALID; + goto end; + } + } + + /* + * Parameters must be a map value, but we create a convenient + * empty one if it's NULL. + */ + if (params) { + if (!bt_value_is_map(params)) { + BT_LOGW("Invalid parameter: initialization parameters must be a map value: " + "type=%s", + bt_value_type_string(bt_value_get_type(params))); + graph_status = BT_GRAPH_STATUS_INVALID; + goto end; + } + } else { + params = bt_value_map_create(); + if (!params) { + BT_LOGE_STR("Cannot create map value object."); + graph_status = BT_GRAPH_STATUS_NOMEM; + goto end; + } + } + + comp_status = bt_component_create(component_class, name, &component); + if (comp_status != BT_COMPONENT_STATUS_OK) { + BT_LOGE("Cannot create empty component object: status=%s", + bt_component_status_string(comp_status)); + graph_status = bt_graph_status_from_component_status( + comp_status); + goto end; + } + + /* + * The user's initialization method needs to see that this + * component is part of the graph. If the user method fails, we + * immediately remove the component from the graph's components. + */ + g_ptr_array_add(graph->components, component); + bt_component_set_graph(component, graph); + + if (component_class->methods.init) { + BT_LOGD_STR("Calling user's initialization method."); + comp_status = component_class->methods.init( + bt_private_component_from_component(component), params, + init_method_data); + BT_LOGD("User method returned: status=%s", + bt_component_status_string(comp_status)); + if (comp_status != BT_COMPONENT_STATUS_OK) { + BT_LOGW_STR("Initialization method failed."); + graph_status = bt_graph_status_from_component_status( + comp_status); + bt_component_set_graph(component, NULL); + g_ptr_array_remove_fast(graph->components, component); + goto end; + } + } + + /* + * Mark the component as initialized so that its finalization + * method is called when it is destroyed. + */ + component->initialized = true; + + /* + * If it's a sink component, it needs to be part of the graph's + * sink queue to be consumed by bt_graph_consume(). + */ + if (bt_component_is_sink(component)) { + graph->has_sink = BT_TRUE; + g_queue_push_tail(graph->sinks_to_consume, component); + } + + /* + * Freeze the component class now that it's instantiated at + * least once. + */ + BT_LOGD_STR("Freezing component class."); + bt_component_class_freeze(component->class); + BT_LOGD("Added component to graph: " + "graph-addr=%p, comp-cls-addr=%p, " + "comp-cls-type=%s, name=\"%s\", params-addr=%p, " + "init-method-data-addr=%p, comp-addr=%p", + graph, component_class, bt_component_class_type_string(type), + name, params, init_method_data, component); + + if (user_component) { + /* Move reference to user */ + *user_component = component; + component = NULL; + } + +end: + bt_put(component); + bt_put(params); + if (graph) { + graph->can_consume = init_can_consume; + } + return graph_status; +} + +enum bt_graph_status bt_graph_add_component( + struct bt_graph *graph, + struct bt_component_class *component_class, + const char *name, struct bt_value *params, + struct bt_component **component) +{ + return bt_graph_add_component_with_init_method_data(graph, + component_class, name, params, NULL, component); +} + +BT_HIDDEN +int bt_graph_remove_unconnected_component(struct bt_graph *graph, + struct bt_component *component) +{ + bt_bool init_can_consume; + int64_t count; + uint64_t i; + int ret = 0; + + assert(graph); + assert(component); + assert(component->base.ref_count.count == 0); + assert(bt_component_borrow_graph(component) == graph); + + init_can_consume = graph->can_consume; + count = bt_component_get_input_port_count(component); + + for (i = 0; i < count; i++) { + struct bt_port *port = + bt_component_get_input_port_by_index(component, i); + + assert(port); + bt_put(port); + + if (bt_port_is_connected(port)) { + BT_LOGW("Cannot remove component from graph: " + "an input port is connected: " + "graph-addr=%p, comp-addr=%p, " + "comp-name=\"%s\", connected-port-addr=%p, " + "connected-port-name=\"%s\"", + graph, component, + bt_component_get_name(component), + port, bt_port_get_name(port)); + goto error; + } + } + + count = bt_component_get_output_port_count(component); + + for (i = 0; i < count; i++) { + struct bt_port *port = + bt_component_get_output_port_by_index(component, i); + + assert(port); + bt_put(port); + + if (bt_port_is_connected(port)) { + BT_LOGW("Cannot remove component from graph: " + "an output port is connected: " + "graph-addr=%p, comp-addr=%p, " + "comp-name=\"%s\", connected-port-addr=%p, " + "connected-port-name=\"%s\"", + graph, component, + bt_component_get_name(component), + port, bt_port_get_name(port)); + goto error; + } + } + + graph->can_consume = BT_FALSE; + + /* Possibly remove from sinks to consume */ + (void) g_queue_remove(graph->sinks_to_consume, component); + + if (graph->sinks_to_consume->length == 0) { + graph->has_sink = BT_FALSE; + } + + /* + * This calls bt_object_release() on the component, and since + * its reference count is 0, its destructor is called. Its + * destructor calls the user's finalization method (if set). + */ + g_ptr_array_remove(graph->components, component); + goto end; + +error: + ret = -1; + +end: + graph->can_consume = init_can_consume; + return ret; +}