X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=lib%2Fgraph%2Fgraph.c;h=7db5b761a32c1fc28baf4e1a0c77a50ca1281d70;hb=3fea54f69edd1780566230255da196cb6e82df62;hp=e7f8db3f3e76f3b5c3e0f9b18a724a06ba6b00cd;hpb=36712f1d9ad9269638e493ca36a50979fe4da989;p=babeltrace.git diff --git a/lib/graph/graph.c b/lib/graph/graph.c index e7f8db3f..7db5b761 100644 --- a/lib/graph/graph.c +++ b/lib/graph/graph.c @@ -35,18 +35,56 @@ #include #include #include +#include +#include +#include #include #include #include #include +#include +#include #include #include struct bt_graph_listener { void *func; + bt_graph_listener_removed removed; void *data; }; +static +int init_listeners_array(GArray **listeners) +{ + int ret = 0; + + BT_ASSERT(listeners); + *listeners = g_array_new(FALSE, TRUE, sizeof(struct bt_graph_listener)); + if (!*listeners) { + BT_LOGE_STR("Failed to allocate one GArray."); + ret = -1; + goto end; + } + +end: + return ret; +} + +static +void call_remove_listeners(GArray *listeners) +{ + size_t i; + + for (i = 0; i < listeners->len; i++) { + struct bt_graph_listener listener = + g_array_index(listeners, struct bt_graph_listener, i); + + if (listener.removed) { + listener.removed(listener.data); + } + } +} + static void bt_graph_destroy(struct bt_object *obj) { @@ -80,7 +118,7 @@ void bt_graph_destroy(struct bt_object *obj) * ensures that this function is not called two times. */ BT_LOGD("Destroying graph: addr=%p", graph); - obj->ref_count.count++; + obj->ref_count++; /* * Cancel the graph to disallow some operations, like creating @@ -88,14 +126,26 @@ void bt_graph_destroy(struct bt_object *obj) */ (void) bt_graph_cancel(graph); + /* Call all remove listeners */ + call_remove_listeners(graph->listeners.port_added); + call_remove_listeners(graph->listeners.port_removed); + call_remove_listeners(graph->listeners.ports_connected); + call_remove_listeners(graph->listeners.ports_disconnected); + + if (graph->notifications) { + g_ptr_array_free(graph->notifications, TRUE); + } + if (graph->connections) { BT_LOGD_STR("Destroying connections."); g_ptr_array_free(graph->connections, TRUE); } + if (graph->components) { BT_LOGD_STR("Destroying components."); g_ptr_array_free(graph->components, TRUE); } + if (graph->sinks_to_consume) { g_queue_free(graph->sinks_to_consume); } @@ -116,24 +166,37 @@ void bt_graph_destroy(struct bt_object *obj) g_array_free(graph->listeners.ports_disconnected, TRUE); } + bt_object_pool_finalize(&graph->event_notif_pool); + bt_object_pool_finalize(&graph->packet_begin_notif_pool); + bt_object_pool_finalize(&graph->packet_end_notif_pool); g_free(graph); } static -int init_listeners_array(GArray **listeners) +void destroy_notification_event(struct bt_notification *notif, + struct bt_graph *graph) { - int ret = 0; + bt_notification_event_destroy(notif); +} - assert(listeners); - *listeners = g_array_new(FALSE, TRUE, sizeof(struct bt_graph_listener)); - if (!*listeners) { - BT_LOGE_STR("Failed to allocate one GArray."); - ret = -1; - goto end; - } +static +void destroy_notification_packet_begin(struct bt_notification *notif, + struct bt_graph *graph) +{ + bt_notification_packet_begin_destroy(notif); +} -end: - return ret; +static +void destroy_notification_packet_end(struct bt_notification *notif, + struct bt_graph *graph) +{ + bt_notification_packet_end_destroy(notif); +} + +static +void notify_notification_graph_is_destroyed(struct bt_notification *notif) +{ + bt_notification_unlink_graph(notif); } struct bt_graph *bt_graph_create(void) @@ -148,14 +211,15 @@ struct bt_graph *bt_graph_create(void) goto end; } - bt_object_init(graph, bt_graph_destroy); - - graph->connections = g_ptr_array_new_with_free_func(bt_object_release); + bt_object_init_shared(&graph->base, bt_graph_destroy); + graph->connections = g_ptr_array_new_with_free_func( + (GDestroyNotify) bt_object_try_spec_release); if (!graph->connections) { BT_LOGE_STR("Failed to allocate one GPtrArray."); goto error; } - graph->components = g_ptr_array_new_with_free_func(bt_object_release); + graph->components = g_ptr_array_new_with_free_func( + (GDestroyNotify) bt_object_try_spec_release); if (!graph->components) { BT_LOGE_STR("Failed to allocate one GPtrArray."); goto error; @@ -166,6 +230,7 @@ struct bt_graph *bt_graph_create(void) goto error; } + graph->can_consume = BT_TRUE; ret = init_listeners_array(&graph->listeners.port_added); if (ret) { BT_LOGE_STR("Cannot create the \"port added\" listener array."); @@ -190,6 +255,38 @@ struct bt_graph *bt_graph_create(void) goto error; } + ret = bt_object_pool_initialize(&graph->event_notif_pool, + (bt_object_pool_new_object_func) bt_notification_event_new, + (bt_object_pool_destroy_object_func) destroy_notification_event, + graph); + if (ret) { + BT_LOGE("Failed to initialize event notification pool: ret=%d", + ret); + goto error; + } + + ret = bt_object_pool_initialize(&graph->packet_begin_notif_pool, + (bt_object_pool_new_object_func) bt_notification_packet_begin_new, + (bt_object_pool_destroy_object_func) destroy_notification_packet_begin, + graph); + if (ret) { + BT_LOGE("Failed to initialize packet beginning notification pool: ret=%d", + ret); + goto error; + } + + ret = bt_object_pool_initialize(&graph->packet_end_notif_pool, + (bt_object_pool_new_object_func) bt_notification_packet_end_new, + (bt_object_pool_destroy_object_func) destroy_notification_packet_end, + graph); + if (ret) { + BT_LOGE("Failed to initialize packet end notification pool: ret=%d", + ret); + goto error; + } + + graph->notifications = g_ptr_array_new_with_free_func( + (GDestroyNotify) notify_notification_graph_is_destroyed); BT_LOGD("Created graph object: addr=%p", graph); end: @@ -210,12 +307,14 @@ enum bt_graph_status bt_graph_connect_ports(struct bt_graph *graph, struct bt_component *upstream_component = NULL; struct bt_component *downstream_component = NULL; enum bt_component_status component_status; + bt_bool init_can_consume; if (!graph) { BT_LOGW_STR("Invalid parameter: graph is NULL."); status = BT_GRAPH_STATUS_INVALID; goto end; } + init_can_consume = graph->can_consume; if (!upstream_port) { BT_LOGW_STR("Invalid parameter: upstream port is NULL."); @@ -242,6 +341,8 @@ enum bt_graph_status bt_graph_connect_ports(struct bt_graph *graph, goto end; } + graph->can_consume = BT_FALSE; + /* Ensure appropriate types for upstream and downstream ports. */ if (bt_port_get_type(upstream_port) != BT_PORT_TYPE_OUTPUT) { BT_LOGW_STR("Invalid parameter: upstream port is not an output port."); @@ -384,32 +485,24 @@ end: bt_put(upstream_component); bt_put(downstream_component); bt_put(connection); + if (graph) { + graph->can_consume = init_can_consume; + } return status; } static -enum bt_graph_status bt_graph_consume_no_check(struct bt_graph *graph) +enum bt_graph_status consume_graph_sink(struct bt_component *sink) { - struct bt_component *sink; enum bt_graph_status status = BT_GRAPH_STATUS_OK; enum bt_component_status comp_status; - GList *current_node; - - BT_LOGV("Making next sink consume: addr=%p", graph); - - if (g_queue_is_empty(graph->sinks_to_consume)) { - BT_LOGV_STR("Graph's sink queue is empty: end of graph."); - status = BT_GRAPH_STATUS_END; - goto end; - } - current_node = g_queue_pop_head_link(graph->sinks_to_consume); - sink = current_node->data; - BT_LOGV("Chose next sink to consume: comp-addr=%p, comp-name=\"%s\"", - sink, bt_component_get_name(sink)); + BT_ASSERT(sink); comp_status = bt_component_sink_consume(sink); - BT_LOGV("Consumed from sink: status=%s", + BT_LOGV("Consumed from sink: addr=%p, name=\"%s\", status=%s", + sink, bt_component_get_name(sink), bt_component_status_string(comp_status)); + switch (comp_status) { case BT_COMPONENT_STATUS_OK: break; @@ -427,47 +520,118 @@ enum bt_graph_status bt_graph_consume_no_check(struct bt_graph *graph) break; } + return status; +} + +/* + * `node` is removed from the queue of sinks to consume when passed to + * this function. This function adds it back to the queue if there's + * still something to consume afterwards. + */ +static +enum bt_graph_status consume_sink_node(struct bt_graph *graph, + GList *node) +{ + enum bt_graph_status status; + struct bt_component *sink; + + sink = node->data; + status = consume_graph_sink(sink); if (status != BT_GRAPH_STATUS_END) { - g_queue_push_tail_link(graph->sinks_to_consume, current_node); + g_queue_push_tail_link(graph->sinks_to_consume, node); goto end; } /* End reached, the node is not added back to the queue and free'd. */ - g_queue_delete_link(graph->sinks_to_consume, current_node); + g_queue_delete_link(graph->sinks_to_consume, node); /* Don't forward an END status if there are sinks left to consume. */ if (!g_queue_is_empty(graph->sinks_to_consume)) { status = BT_GRAPH_STATUS_OK; goto end; } + end: - BT_LOGV("Graph consumed: status=%s", bt_graph_status_string(status)); + BT_LOGV("Consumed sink node: status=%s", bt_graph_status_string(status)); return status; } -enum bt_graph_status bt_graph_consume(struct bt_graph *graph) +BT_HIDDEN +enum bt_graph_status bt_graph_consume_sink_no_check(struct bt_graph *graph, + struct bt_component *sink) { - enum bt_graph_status status = BT_GRAPH_STATUS_OK; + enum bt_graph_status status; + GList *sink_node; + int index; - if (!graph) { - BT_LOGW_STR("Invalid parameter: graph is NULL."); - status = BT_GRAPH_STATUS_INVALID; + BT_LOGV("Making specific sink consume: addr=%p, " + "comp-addr=%p, comp-name=\"%s\"", + graph, sink, bt_component_get_name(sink)); + + BT_ASSERT(bt_component_borrow_graph(sink) == graph); + + if (g_queue_is_empty(graph->sinks_to_consume)) { + BT_LOGV_STR("Graph's sink queue is empty: end of graph."); + status = BT_GRAPH_STATUS_END; goto end; } - if (graph->canceled) { - BT_LOGW("Invalid parameter: graph is canceled: " - "graph-addr=%p", graph); - status = BT_GRAPH_STATUS_CANCELED; + index = g_queue_index(graph->sinks_to_consume, sink); + if (index < 0) { + BT_LOGV_STR("Sink is not marked as consumable: sink is ended."); + status = BT_GRAPH_STATUS_END; goto end; } - status = bt_graph_consume_no_check(graph); + sink_node = g_queue_pop_nth_link(graph->sinks_to_consume, index); + BT_ASSERT(sink_node); + status = consume_sink_node(graph, sink_node); end: return status; } +BT_HIDDEN +enum bt_graph_status bt_graph_consume_no_check(struct bt_graph *graph) +{ + enum bt_graph_status status = BT_GRAPH_STATUS_OK; + struct bt_component *sink; + GList *current_node; + + BT_LOGV("Making next sink consume: addr=%p", graph); + BT_ASSERT_PRE(graph->has_sink, + "Graph has no sink component: %!+g", graph); + + if (g_queue_is_empty(graph->sinks_to_consume)) { + BT_LOGV_STR("Graph's sink queue is empty: end of graph."); + status = BT_GRAPH_STATUS_END; + goto end; + } + + current_node = g_queue_pop_head_link(graph->sinks_to_consume); + sink = current_node->data; + BT_LOGV("Chose next sink to consume: comp-addr=%p, comp-name=\"%s\"", + sink, bt_component_get_name(sink)); + status = consume_sink_node(graph, current_node); + +end: + return status; +} + +enum bt_graph_status bt_graph_consume(struct bt_graph *graph) +{ + enum bt_graph_status status; + + BT_ASSERT_PRE_NON_NULL(graph, "Graph"); + BT_ASSERT_PRE(!graph->canceled, "Graph is canceled: %!+g", graph); + BT_ASSERT_PRE(graph->can_consume, + "Cannot consume graph in its current state: %!+g", graph); + graph->can_consume = BT_FALSE; + status = bt_graph_consume_no_check(graph); + graph->can_consume = BT_TRUE; + return status; +} + enum bt_graph_status bt_graph_run(struct bt_graph *graph) { enum bt_graph_status status = BT_GRAPH_STATUS_OK; @@ -485,6 +649,13 @@ enum bt_graph_status bt_graph_run(struct bt_graph *graph) goto end; } + if (!graph->can_consume) { + BT_LOGW_STR("Cannot run graph in its current state."); + status = BT_GRAPH_STATUS_CANNOT_CONSUME; + goto end; + } + + graph->can_consume = BT_FALSE; BT_LOGV("Running graph: addr=%p", graph); do { @@ -501,7 +672,7 @@ enum bt_graph_status bt_graph_run(struct bt_graph *graph) goto end; } - status = bt_graph_consume(graph); + status = bt_graph_consume_no_check(graph); if (status == BT_GRAPH_STATUS_AGAIN) { /* * If AGAIN is received and there are multiple @@ -517,6 +688,8 @@ enum bt_graph_status bt_graph_run(struct bt_graph *graph) if (graph->sinks_to_consume->length > 1) { status = BT_GRAPH_STATUS_OK; } + } else if (status == BT_GRAPH_STATUS_NO_SINK) { + goto end; } } while (status == BT_GRAPH_STATUS_OK); @@ -526,14 +699,18 @@ enum bt_graph_status bt_graph_run(struct bt_graph *graph) end: BT_LOGV("Graph ran: status=%s", bt_graph_status_string(status)); + if (graph) { + graph->can_consume = BT_TRUE; + } return status; } static -int add_listener(GArray *listeners, void *func, void *data) +int add_listener(GArray *listeners, void *func, void *removed, void *data) { struct bt_graph_listener listener = { .func = func, + .removed = removed, .data = data, }; @@ -543,7 +720,8 @@ int add_listener(GArray *listeners, void *func, void *data) int bt_graph_add_port_added_listener( struct bt_graph *graph, - bt_graph_port_added_listener listener, void *data) + bt_graph_port_added_listener listener, + bt_graph_listener_removed listener_removed, void *data) { int ret; @@ -553,13 +731,21 @@ int bt_graph_add_port_added_listener( goto end; } + if (graph->in_remove_listener) { + BT_LOGW("Cannot call this function during the execution of a remove listener: " + "addr=%p", graph); + ret = -1; + goto end; + } + if (!listener) { BT_LOGW_STR("Invalid parameter: listener is NULL."); ret = -1; goto end; } - ret = add_listener(graph->listeners.port_added, listener, data); + ret = add_listener(graph->listeners.port_added, listener, + listener_removed, data); BT_LOGV("Added \"port added\" listener to graph: " "graph-addr=%p, listener-addr=%p, pos=%d", graph, listener, ret); @@ -570,7 +756,8 @@ end: int bt_graph_add_port_removed_listener( struct bt_graph *graph, - bt_graph_port_removed_listener listener, void *data) + bt_graph_port_removed_listener listener, + bt_graph_listener_removed listener_removed, void *data) { int ret; @@ -580,13 +767,21 @@ int bt_graph_add_port_removed_listener( goto end; } + if (graph->in_remove_listener) { + BT_LOGW("Cannot call this function during the execution of a remove listener: " + "addr=%p", graph); + ret = -1; + goto end; + } + if (!listener) { BT_LOGW_STR("Invalid parameter: listener is NULL."); ret = -1; goto end; } - ret = add_listener(graph->listeners.port_removed, listener, data); + ret = add_listener(graph->listeners.port_removed, listener, + listener_removed, data); BT_LOGV("Added \"port removed\" listener to graph: " "graph-addr=%p, listener-addr=%p, pos=%d", graph, listener, ret); @@ -597,7 +792,8 @@ end: int bt_graph_add_ports_connected_listener( struct bt_graph *graph, - bt_graph_ports_connected_listener listener, void *data) + bt_graph_ports_connected_listener listener, + bt_graph_listener_removed listener_removed, void *data) { int ret; @@ -607,13 +803,21 @@ int bt_graph_add_ports_connected_listener( goto end; } + if (graph->in_remove_listener) { + BT_LOGW("Cannot call this function during the execution of a remove listener: " + "addr=%p", graph); + ret = -1; + goto end; + } + if (!listener) { BT_LOGW_STR("Invalid parameter: listener is NULL."); ret = -1; goto end; } - ret = add_listener(graph->listeners.ports_connected, listener, data); + ret = add_listener(graph->listeners.ports_connected, listener, + listener_removed, data); BT_LOGV("Added \"port connected\" listener to graph: " "graph-addr=%p, listener-addr=%p, pos=%d", graph, listener, ret); @@ -624,7 +828,8 @@ end: int bt_graph_add_ports_disconnected_listener( struct bt_graph *graph, - bt_graph_ports_disconnected_listener listener, void *data) + bt_graph_ports_disconnected_listener listener, + bt_graph_listener_removed listener_removed, void *data) { int ret; @@ -634,13 +839,21 @@ int bt_graph_add_ports_disconnected_listener( goto end; } + if (graph->in_remove_listener) { + BT_LOGW("Cannot call this function during the execution of a remove listener: " + "addr=%p", graph); + ret = -1; + goto end; + } + if (!listener) { BT_LOGW_STR("Invalid parameter: listener is NULL."); ret = -1; goto end; } - ret = add_listener(graph->listeners.ports_disconnected, listener, data); + ret = add_listener(graph->listeners.ports_disconnected, listener, + listener_removed, data); BT_LOGV("Added \"port disconnected\" listener to graph: " "graph-addr=%p, listener-addr=%p, pos=%d", graph, listener, ret); @@ -664,7 +877,7 @@ void bt_graph_notify_port_added(struct bt_graph *graph, struct bt_port *port) struct bt_graph_listener, i); bt_graph_port_added_listener func = listener.func; - assert(func); + BT_ASSERT(func); func(port, listener.data); } } @@ -685,7 +898,7 @@ void bt_graph_notify_port_removed(struct bt_graph *graph, struct bt_graph_listener, i); bt_graph_port_removed_listener func = listener.func; - assert(func); + BT_ASSERT(func); func(comp, port, listener.data); } } @@ -709,7 +922,7 @@ void bt_graph_notify_ports_connected(struct bt_graph *graph, struct bt_graph_listener, i); bt_graph_ports_connected_listener func = listener.func; - assert(func); + BT_ASSERT(func); func(upstream_port, downstream_port, listener.data); } } @@ -735,13 +948,13 @@ void bt_graph_notify_ports_disconnected(struct bt_graph *graph, struct bt_graph_listener, i); bt_graph_ports_disconnected_listener func = listener.func; - assert(func); + BT_ASSERT(func); func(upstream_comp, downstream_comp, upstream_port, downstream_port, listener.data); } } -extern enum bt_graph_status bt_graph_cancel(struct bt_graph *graph) +enum bt_graph_status bt_graph_cancel(struct bt_graph *graph) { enum bt_graph_status ret = BT_GRAPH_STATUS_OK; @@ -758,17 +971,27 @@ end: return ret; } -extern bt_bool bt_graph_is_canceled(struct bt_graph *graph) +bt_bool bt_graph_is_canceled(struct bt_graph *graph) { - return graph ? graph->canceled : BT_FALSE; + bt_bool canceled = BT_FALSE; + + if (!graph) { + BT_LOGW_STR("Invalid parameter: graph is NULL."); + goto end; + } + + canceled = graph->canceled; + +end: + return canceled; } BT_HIDDEN void bt_graph_remove_connection(struct bt_graph *graph, struct bt_connection *connection) { - assert(graph); - assert(connection); + BT_ASSERT(graph); + BT_ASSERT(connection); BT_LOGV("Removing graph's connection: graph-addr=%p, conn-addr=%p", graph, connection); g_ptr_array_remove(graph->connections, connection); @@ -786,6 +1009,7 @@ enum bt_graph_status bt_graph_add_component_with_init_method_data( struct bt_component *component = NULL; enum bt_component_class_type type; size_t i; + bt_bool init_can_consume; bt_get(params); @@ -794,6 +1018,7 @@ enum bt_graph_status bt_graph_add_component_with_init_method_data( graph_status = BT_GRAPH_STATUS_INVALID; goto end; } + init_can_consume = graph->can_consume; if (!component_class) { BT_LOGW_STR("Invalid parameter: component class is NULL."); @@ -801,6 +1026,7 @@ enum bt_graph_status bt_graph_add_component_with_init_method_data( goto end; } + graph->can_consume = BT_FALSE; type = bt_component_class_get_type(component_class); BT_LOGD("Adding component to graph: " "graph-addr=%p, comp-cls-addr=%p, " @@ -908,6 +1134,7 @@ enum bt_graph_status bt_graph_add_component_with_init_method_data( * sink queue to be consumed by bt_graph_consume(). */ if (bt_component_is_sink(component)) { + graph->has_sink = BT_TRUE; g_queue_push_tail(graph->sinks_to_consume, component); } @@ -933,6 +1160,9 @@ enum bt_graph_status bt_graph_add_component_with_init_method_data( end: bt_put(component); bt_put(params); + if (graph) { + graph->can_consume = init_can_consume; + } return graph_status; } @@ -945,3 +1175,105 @@ enum bt_graph_status bt_graph_add_component( return bt_graph_add_component_with_init_method_data(graph, component_class, name, params, NULL, component); } + +BT_HIDDEN +int bt_graph_remove_unconnected_component(struct bt_graph *graph, + struct bt_component *component) +{ + bt_bool init_can_consume; + int64_t count; + uint64_t i; + int ret = 0; + + BT_ASSERT(graph); + BT_ASSERT(component); + BT_ASSERT(component->base.ref_count == 0); + BT_ASSERT(bt_component_borrow_graph(component) == graph); + + init_can_consume = graph->can_consume; + count = bt_component_get_input_port_count(component); + + for (i = 0; i < count; i++) { + struct bt_port *port = + bt_component_get_input_port_by_index(component, i); + + BT_ASSERT(port); + bt_put(port); + + if (bt_port_is_connected(port)) { + BT_LOGW("Cannot remove component from graph: " + "an input port is connected: " + "graph-addr=%p, comp-addr=%p, " + "comp-name=\"%s\", connected-port-addr=%p, " + "connected-port-name=\"%s\"", + graph, component, + bt_component_get_name(component), + port, bt_port_get_name(port)); + goto error; + } + } + + count = bt_component_get_output_port_count(component); + + for (i = 0; i < count; i++) { + struct bt_port *port = + bt_component_get_output_port_by_index(component, i); + + BT_ASSERT(port); + bt_put(port); + + if (bt_port_is_connected(port)) { + BT_LOGW("Cannot remove component from graph: " + "an output port is connected: " + "graph-addr=%p, comp-addr=%p, " + "comp-name=\"%s\", connected-port-addr=%p, " + "connected-port-name=\"%s\"", + graph, component, + bt_component_get_name(component), + port, bt_port_get_name(port)); + goto error; + } + } + + graph->can_consume = BT_FALSE; + + /* Possibly remove from sinks to consume */ + (void) g_queue_remove(graph->sinks_to_consume, component); + + if (graph->sinks_to_consume->length == 0) { + graph->has_sink = BT_FALSE; + } + + /* + * This calls bt_object_try_spec_release() on the component, and + * since its reference count is 0, its destructor is called. Its + * destructor calls the user's finalization method (if set). + */ + g_ptr_array_remove(graph->components, component); + goto end; + +error: + ret = -1; + +end: + graph->can_consume = init_can_consume; + return ret; +} + +BT_HIDDEN +void bt_graph_add_notification(struct bt_graph *graph, + struct bt_notification *notif) +{ + BT_ASSERT(graph); + BT_ASSERT(notif); + + /* + * It's okay not to take a reference because, when a + * notification's reference count drops to 0, either: + * + * * It is recycled back to one of this graph's pool. + * * It is destroyed because it doesn't have any link to any + * graph, which means the original graph is already destroyed. + */ + g_ptr_array_add(graph->notifications, notif); +}