X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=lib%2Fgraph%2Fgraph.c;h=7db5b761a32c1fc28baf4e1a0c77a50ca1281d70;hb=3fea54f69edd1780566230255da196cb6e82df62;hp=b6460f74e228bd684c6ad9f59b8fa139f440475a;hpb=1d9157896dc759fcf7da3803523cf32a2400f1c0;p=babeltrace.git diff --git a/lib/graph/graph.c b/lib/graph/graph.c index b6460f74..7db5b761 100644 --- a/lib/graph/graph.c +++ b/lib/graph/graph.c @@ -35,10 +35,15 @@ #include #include #include +#include +#include +#include #include #include #include #include +#include +#include #include #include @@ -53,7 +58,7 @@ int init_listeners_array(GArray **listeners) { int ret = 0; - assert(listeners); + BT_ASSERT(listeners); *listeners = g_array_new(FALSE, TRUE, sizeof(struct bt_graph_listener)); if (!*listeners) { BT_LOGE_STR("Failed to allocate one GArray."); @@ -113,7 +118,7 @@ void bt_graph_destroy(struct bt_object *obj) * ensures that this function is not called two times. */ BT_LOGD("Destroying graph: addr=%p", graph); - obj->ref_count.count++; + obj->ref_count++; /* * Cancel the graph to disallow some operations, like creating @@ -127,14 +132,20 @@ void bt_graph_destroy(struct bt_object *obj) call_remove_listeners(graph->listeners.ports_connected); call_remove_listeners(graph->listeners.ports_disconnected); + if (graph->notifications) { + g_ptr_array_free(graph->notifications, TRUE); + } + if (graph->connections) { BT_LOGD_STR("Destroying connections."); g_ptr_array_free(graph->connections, TRUE); } + if (graph->components) { BT_LOGD_STR("Destroying components."); g_ptr_array_free(graph->components, TRUE); } + if (graph->sinks_to_consume) { g_queue_free(graph->sinks_to_consume); } @@ -155,9 +166,39 @@ void bt_graph_destroy(struct bt_object *obj) g_array_free(graph->listeners.ports_disconnected, TRUE); } + bt_object_pool_finalize(&graph->event_notif_pool); + bt_object_pool_finalize(&graph->packet_begin_notif_pool); + bt_object_pool_finalize(&graph->packet_end_notif_pool); g_free(graph); } +static +void destroy_notification_event(struct bt_notification *notif, + struct bt_graph *graph) +{ + bt_notification_event_destroy(notif); +} + +static +void destroy_notification_packet_begin(struct bt_notification *notif, + struct bt_graph *graph) +{ + bt_notification_packet_begin_destroy(notif); +} + +static +void destroy_notification_packet_end(struct bt_notification *notif, + struct bt_graph *graph) +{ + bt_notification_packet_end_destroy(notif); +} + +static +void notify_notification_graph_is_destroyed(struct bt_notification *notif) +{ + bt_notification_unlink_graph(notif); +} + struct bt_graph *bt_graph_create(void) { struct bt_graph *graph; @@ -170,14 +211,15 @@ struct bt_graph *bt_graph_create(void) goto end; } - bt_object_init(graph, bt_graph_destroy); - - graph->connections = g_ptr_array_new_with_free_func(bt_object_release); + bt_object_init_shared(&graph->base, bt_graph_destroy); + graph->connections = g_ptr_array_new_with_free_func( + (GDestroyNotify) bt_object_try_spec_release); if (!graph->connections) { BT_LOGE_STR("Failed to allocate one GPtrArray."); goto error; } - graph->components = g_ptr_array_new_with_free_func(bt_object_release); + graph->components = g_ptr_array_new_with_free_func( + (GDestroyNotify) bt_object_try_spec_release); if (!graph->components) { BT_LOGE_STR("Failed to allocate one GPtrArray."); goto error; @@ -213,6 +255,38 @@ struct bt_graph *bt_graph_create(void) goto error; } + ret = bt_object_pool_initialize(&graph->event_notif_pool, + (bt_object_pool_new_object_func) bt_notification_event_new, + (bt_object_pool_destroy_object_func) destroy_notification_event, + graph); + if (ret) { + BT_LOGE("Failed to initialize event notification pool: ret=%d", + ret); + goto error; + } + + ret = bt_object_pool_initialize(&graph->packet_begin_notif_pool, + (bt_object_pool_new_object_func) bt_notification_packet_begin_new, + (bt_object_pool_destroy_object_func) destroy_notification_packet_begin, + graph); + if (ret) { + BT_LOGE("Failed to initialize packet beginning notification pool: ret=%d", + ret); + goto error; + } + + ret = bt_object_pool_initialize(&graph->packet_end_notif_pool, + (bt_object_pool_new_object_func) bt_notification_packet_end_new, + (bt_object_pool_destroy_object_func) destroy_notification_packet_end, + graph); + if (ret) { + BT_LOGE("Failed to initialize packet end notification pool: ret=%d", + ret); + goto error; + } + + graph->notifications = g_ptr_array_new_with_free_func( + (GDestroyNotify) notify_notification_graph_is_destroyed); BT_LOGD("Created graph object: addr=%p", graph); end: @@ -233,13 +307,14 @@ enum bt_graph_status bt_graph_connect_ports(struct bt_graph *graph, struct bt_component *upstream_component = NULL; struct bt_component *downstream_component = NULL; enum bt_component_status component_status; - const bt_bool init_can_consume = graph->can_consume; + bt_bool init_can_consume; if (!graph) { BT_LOGW_STR("Invalid parameter: graph is NULL."); status = BT_GRAPH_STATUS_INVALID; goto end; } + init_can_consume = graph->can_consume; if (!upstream_port) { BT_LOGW_STR("Invalid parameter: upstream port is NULL."); @@ -410,39 +485,24 @@ end: bt_put(upstream_component); bt_put(downstream_component); bt_put(connection); - graph->can_consume = init_can_consume; + if (graph) { + graph->can_consume = init_can_consume; + } return status; } static -enum bt_graph_status bt_graph_consume_no_check(struct bt_graph *graph) +enum bt_graph_status consume_graph_sink(struct bt_component *sink) { - struct bt_component *sink; enum bt_graph_status status = BT_GRAPH_STATUS_OK; enum bt_component_status comp_status; - GList *current_node; - BT_LOGV("Making next sink consume: addr=%p", graph); - - if (!graph->has_sink) { - BT_LOGW_STR("Graph has no sink component."); - status = BT_GRAPH_STATUS_NO_SINK; - goto end; - } - - if (g_queue_is_empty(graph->sinks_to_consume)) { - BT_LOGV_STR("Graph's sink queue is empty: end of graph."); - status = BT_GRAPH_STATUS_END; - goto end; - } - - current_node = g_queue_pop_head_link(graph->sinks_to_consume); - sink = current_node->data; - BT_LOGV("Chose next sink to consume: comp-addr=%p, comp-name=\"%s\"", - sink, bt_component_get_name(sink)); + BT_ASSERT(sink); comp_status = bt_component_sink_consume(sink); - BT_LOGV("Consumed from sink: status=%s", + BT_LOGV("Consumed from sink: addr=%p, name=\"%s\", status=%s", + sink, bt_component_get_name(sink), bt_component_status_string(comp_status)); + switch (comp_status) { case BT_COMPONENT_STATUS_OK: break; @@ -460,52 +520,115 @@ enum bt_graph_status bt_graph_consume_no_check(struct bt_graph *graph) break; } + return status; +} + +/* + * `node` is removed from the queue of sinks to consume when passed to + * this function. This function adds it back to the queue if there's + * still something to consume afterwards. + */ +static +enum bt_graph_status consume_sink_node(struct bt_graph *graph, + GList *node) +{ + enum bt_graph_status status; + struct bt_component *sink; + + sink = node->data; + status = consume_graph_sink(sink); if (status != BT_GRAPH_STATUS_END) { - g_queue_push_tail_link(graph->sinks_to_consume, current_node); + g_queue_push_tail_link(graph->sinks_to_consume, node); goto end; } /* End reached, the node is not added back to the queue and free'd. */ - g_queue_delete_link(graph->sinks_to_consume, current_node); + g_queue_delete_link(graph->sinks_to_consume, node); /* Don't forward an END status if there are sinks left to consume. */ if (!g_queue_is_empty(graph->sinks_to_consume)) { status = BT_GRAPH_STATUS_OK; goto end; } + end: - BT_LOGV("Graph consumed: status=%s", bt_graph_status_string(status)); + BT_LOGV("Consumed sink node: status=%s", bt_graph_status_string(status)); return status; } -enum bt_graph_status bt_graph_consume(struct bt_graph *graph) +BT_HIDDEN +enum bt_graph_status bt_graph_consume_sink_no_check(struct bt_graph *graph, + struct bt_component *sink) { - enum bt_graph_status status = BT_GRAPH_STATUS_OK; + enum bt_graph_status status; + GList *sink_node; + int index; - if (!graph) { - BT_LOGW_STR("Invalid parameter: graph is NULL."); - status = BT_GRAPH_STATUS_INVALID; + BT_LOGV("Making specific sink consume: addr=%p, " + "comp-addr=%p, comp-name=\"%s\"", + graph, sink, bt_component_get_name(sink)); + + BT_ASSERT(bt_component_borrow_graph(sink) == graph); + + if (g_queue_is_empty(graph->sinks_to_consume)) { + BT_LOGV_STR("Graph's sink queue is empty: end of graph."); + status = BT_GRAPH_STATUS_END; goto end; } - if (graph->canceled) { - BT_LOGW("Invalid parameter: graph is canceled: " - "graph-addr=%p", graph); - status = BT_GRAPH_STATUS_CANCELED; + index = g_queue_index(graph->sinks_to_consume, sink); + if (index < 0) { + BT_LOGV_STR("Sink is not marked as consumable: sink is ended."); + status = BT_GRAPH_STATUS_END; goto end; } - if (!graph->can_consume) { - BT_LOGW_STR("Cannot consume graph in its current state."); - status = BT_GRAPH_STATUS_CANNOT_CONSUME; + sink_node = g_queue_pop_nth_link(graph->sinks_to_consume, index); + BT_ASSERT(sink_node); + status = consume_sink_node(graph, sink_node); + +end: + return status; +} + +BT_HIDDEN +enum bt_graph_status bt_graph_consume_no_check(struct bt_graph *graph) +{ + enum bt_graph_status status = BT_GRAPH_STATUS_OK; + struct bt_component *sink; + GList *current_node; + + BT_LOGV("Making next sink consume: addr=%p", graph); + BT_ASSERT_PRE(graph->has_sink, + "Graph has no sink component: %!+g", graph); + + if (g_queue_is_empty(graph->sinks_to_consume)) { + BT_LOGV_STR("Graph's sink queue is empty: end of graph."); + status = BT_GRAPH_STATUS_END; goto end; } + current_node = g_queue_pop_head_link(graph->sinks_to_consume); + sink = current_node->data; + BT_LOGV("Chose next sink to consume: comp-addr=%p, comp-name=\"%s\"", + sink, bt_component_get_name(sink)); + status = consume_sink_node(graph, current_node); + +end: + return status; +} + +enum bt_graph_status bt_graph_consume(struct bt_graph *graph) +{ + enum bt_graph_status status; + + BT_ASSERT_PRE_NON_NULL(graph, "Graph"); + BT_ASSERT_PRE(!graph->canceled, "Graph is canceled: %!+g", graph); + BT_ASSERT_PRE(graph->can_consume, + "Cannot consume graph in its current state: %!+g", graph); graph->can_consume = BT_FALSE; status = bt_graph_consume_no_check(graph); graph->can_consume = BT_TRUE; - -end: return status; } @@ -576,7 +699,9 @@ enum bt_graph_status bt_graph_run(struct bt_graph *graph) end: BT_LOGV("Graph ran: status=%s", bt_graph_status_string(status)); - graph->can_consume = BT_TRUE; + if (graph) { + graph->can_consume = BT_TRUE; + } return status; } @@ -752,7 +877,7 @@ void bt_graph_notify_port_added(struct bt_graph *graph, struct bt_port *port) struct bt_graph_listener, i); bt_graph_port_added_listener func = listener.func; - assert(func); + BT_ASSERT(func); func(port, listener.data); } } @@ -773,7 +898,7 @@ void bt_graph_notify_port_removed(struct bt_graph *graph, struct bt_graph_listener, i); bt_graph_port_removed_listener func = listener.func; - assert(func); + BT_ASSERT(func); func(comp, port, listener.data); } } @@ -797,7 +922,7 @@ void bt_graph_notify_ports_connected(struct bt_graph *graph, struct bt_graph_listener, i); bt_graph_ports_connected_listener func = listener.func; - assert(func); + BT_ASSERT(func); func(upstream_port, downstream_port, listener.data); } } @@ -823,7 +948,7 @@ void bt_graph_notify_ports_disconnected(struct bt_graph *graph, struct bt_graph_listener, i); bt_graph_ports_disconnected_listener func = listener.func; - assert(func); + BT_ASSERT(func); func(upstream_comp, downstream_comp, upstream_port, downstream_port, listener.data); } @@ -865,8 +990,8 @@ BT_HIDDEN void bt_graph_remove_connection(struct bt_graph *graph, struct bt_connection *connection) { - assert(graph); - assert(connection); + BT_ASSERT(graph); + BT_ASSERT(connection); BT_LOGV("Removing graph's connection: graph-addr=%p, conn-addr=%p", graph, connection); g_ptr_array_remove(graph->connections, connection); @@ -884,7 +1009,7 @@ enum bt_graph_status bt_graph_add_component_with_init_method_data( struct bt_component *component = NULL; enum bt_component_class_type type; size_t i; - const bt_bool init_can_consume = graph->can_consume; + bt_bool init_can_consume; bt_get(params); @@ -893,6 +1018,7 @@ enum bt_graph_status bt_graph_add_component_with_init_method_data( graph_status = BT_GRAPH_STATUS_INVALID; goto end; } + init_can_consume = graph->can_consume; if (!component_class) { BT_LOGW_STR("Invalid parameter: component class is NULL."); @@ -1034,7 +1160,9 @@ enum bt_graph_status bt_graph_add_component_with_init_method_data( end: bt_put(component); bt_put(params); - graph->can_consume = init_can_consume; + if (graph) { + graph->can_consume = init_can_consume; + } return graph_status; } @@ -1047,3 +1175,105 @@ enum bt_graph_status bt_graph_add_component( return bt_graph_add_component_with_init_method_data(graph, component_class, name, params, NULL, component); } + +BT_HIDDEN +int bt_graph_remove_unconnected_component(struct bt_graph *graph, + struct bt_component *component) +{ + bt_bool init_can_consume; + int64_t count; + uint64_t i; + int ret = 0; + + BT_ASSERT(graph); + BT_ASSERT(component); + BT_ASSERT(component->base.ref_count == 0); + BT_ASSERT(bt_component_borrow_graph(component) == graph); + + init_can_consume = graph->can_consume; + count = bt_component_get_input_port_count(component); + + for (i = 0; i < count; i++) { + struct bt_port *port = + bt_component_get_input_port_by_index(component, i); + + BT_ASSERT(port); + bt_put(port); + + if (bt_port_is_connected(port)) { + BT_LOGW("Cannot remove component from graph: " + "an input port is connected: " + "graph-addr=%p, comp-addr=%p, " + "comp-name=\"%s\", connected-port-addr=%p, " + "connected-port-name=\"%s\"", + graph, component, + bt_component_get_name(component), + port, bt_port_get_name(port)); + goto error; + } + } + + count = bt_component_get_output_port_count(component); + + for (i = 0; i < count; i++) { + struct bt_port *port = + bt_component_get_output_port_by_index(component, i); + + BT_ASSERT(port); + bt_put(port); + + if (bt_port_is_connected(port)) { + BT_LOGW("Cannot remove component from graph: " + "an output port is connected: " + "graph-addr=%p, comp-addr=%p, " + "comp-name=\"%s\", connected-port-addr=%p, " + "connected-port-name=\"%s\"", + graph, component, + bt_component_get_name(component), + port, bt_port_get_name(port)); + goto error; + } + } + + graph->can_consume = BT_FALSE; + + /* Possibly remove from sinks to consume */ + (void) g_queue_remove(graph->sinks_to_consume, component); + + if (graph->sinks_to_consume->length == 0) { + graph->has_sink = BT_FALSE; + } + + /* + * This calls bt_object_try_spec_release() on the component, and + * since its reference count is 0, its destructor is called. Its + * destructor calls the user's finalization method (if set). + */ + g_ptr_array_remove(graph->components, component); + goto end; + +error: + ret = -1; + +end: + graph->can_consume = init_can_consume; + return ret; +} + +BT_HIDDEN +void bt_graph_add_notification(struct bt_graph *graph, + struct bt_notification *notif) +{ + BT_ASSERT(graph); + BT_ASSERT(notif); + + /* + * It's okay not to take a reference because, when a + * notification's reference count drops to 0, either: + * + * * It is recycled back to one of this graph's pool. + * * It is destroyed because it doesn't have any link to any + * graph, which means the original graph is already destroyed. + */ + g_ptr_array_add(graph->notifications, notif); +}