X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Flib%2Fgraph%2Fgraph.c;h=3c9be9ef28c358a025f429748533d9d5b4fddae0;hb=1d5d041416eed2a830515eafb32fc774470e9247;hp=cf112fbd8393907c0f7a8f5637188fadeec0f606;hpb=a8f90e5d4a5876324aaefc98c3314b8a35c86644;p=babeltrace.git diff --git a/src/lib/graph/graph.c b/src/lib/graph/graph.c index cf112fbd..3c9be9ef 100644 --- a/src/lib/graph/graph.c +++ b/src/lib/graph/graph.c @@ -42,10 +42,12 @@ #include #include +#include "component-class-sink-simple.h" #include "component.h" #include "component-sink.h" #include "connection.h" #include "graph.h" +#include "interrupter.h" #include "message/event.h" #include "message/packet.h" @@ -56,8 +58,8 @@ typedef enum bt_graph_listener_func_status (*ports_connected_func_t)(const void *, const void *, const void *, const void *, void *); -typedef enum bt_component_class_init_method_status -(*comp_init_method_t)(const void *, const void *, void *); +typedef enum bt_component_class_initialize_method_status +(*comp_init_method_t)(const void *, void *, const void *, void *); struct bt_graph_listener { bt_graph_listener_removed_func removed; @@ -133,12 +135,7 @@ void destroy_graph(struct bt_object *obj) */ BT_LIB_LOGI("Destroying graph: %!+g", graph); obj->ref_count++; - - /* - * Cancel the graph to disallow some operations, like creating - * message iterators and adding ports to components. - */ - (void) bt_graph_cancel((void *) graph); + graph->config_state = BT_GRAPH_CONFIGURATION_STATE_DESTROYING; /* Call all remove listeners */ CALL_REMOVE_LISTENERS(struct bt_graph_listener_port_added, @@ -175,6 +172,14 @@ void destroy_graph(struct bt_object *obj) graph->components = NULL; } + if (graph->interrupters) { + BT_LOGD_STR("Putting interrupters."); + g_ptr_array_free(graph->interrupters, TRUE); + graph->interrupters = NULL; + } + + BT_OBJECT_PUT_REF_AND_RESET(graph->default_interrupter); + if (graph->sinks_to_consume) { g_queue_free(graph->sinks_to_consume); graph->sinks_to_consume = NULL; @@ -257,11 +262,15 @@ void notify_message_graph_is_destroyed(struct bt_message *msg) bt_message_unlink_graph(msg); } -struct bt_graph *bt_graph_create(void) +struct bt_graph *bt_graph_create(uint64_t mip_version) { struct bt_graph *graph; int ret; + BT_ASSERT_PRE(mip_version <= bt_get_maximal_mip_version(), + "Unknown MIP version: mip-version=%" PRIu64 ", " + "max-mip-version=%" PRIu64, + mip_version, bt_get_maximal_mip_version()); BT_LOGI_STR("Creating graph object."); graph = g_new0(struct bt_graph, 1); if (!graph) { @@ -270,6 +279,7 @@ struct bt_graph *bt_graph_create(void) } bt_object_init_shared(&graph->base, destroy_graph); + graph->mip_version = mip_version; graph->connections = g_ptr_array_new_with_free_func( (GDestroyNotify) bt_object_try_spec_release); if (!graph->connections) { @@ -293,7 +303,6 @@ struct bt_graph *bt_graph_create(void) graph->listeners.source_output_port_added); if (!graph->listeners.source_output_port_added) { - ret = -1; goto error; } @@ -301,7 +310,6 @@ struct bt_graph *bt_graph_create(void) graph->listeners.filter_output_port_added); if (!graph->listeners.filter_output_port_added) { - ret = -1; goto error; } @@ -309,7 +317,6 @@ struct bt_graph *bt_graph_create(void) graph->listeners.filter_input_port_added); if (!graph->listeners.filter_input_port_added) { - ret = -1; goto error; } @@ -317,7 +324,6 @@ struct bt_graph *bt_graph_create(void) graph->listeners.sink_input_port_added); if (!graph->listeners.sink_input_port_added) { - ret = -1; goto error; } @@ -325,7 +331,6 @@ struct bt_graph *bt_graph_create(void) graph->listeners.source_filter_ports_connected); if (!graph->listeners.source_filter_ports_connected) { - ret = -1; goto error; } @@ -333,7 +338,6 @@ struct bt_graph *bt_graph_create(void) graph->listeners.source_sink_ports_connected); if (!graph->listeners.source_sink_ports_connected) { - ret = -1; goto error; } @@ -341,7 +345,6 @@ struct bt_graph *bt_graph_create(void) graph->listeners.filter_filter_ports_connected); if (!graph->listeners.filter_filter_ports_connected) { - ret = -1; goto error; } @@ -349,10 +352,24 @@ struct bt_graph *bt_graph_create(void) graph->listeners.filter_sink_ports_connected); if (!graph->listeners.filter_sink_ports_connected) { - ret = -1; goto error; } + graph->interrupters = g_ptr_array_new_with_free_func( + (GDestroyNotify) bt_object_put_ref_no_null_check); + if (!graph->interrupters) { + BT_LIB_LOGE_APPEND_CAUSE("Failed to allocate one GPtrArray."); + goto error; + } + + graph->default_interrupter = bt_interrupter_create(); + if (!graph->default_interrupter) { + BT_LIB_LOGE_APPEND_CAUSE( + "Failed to create one interrupter object."); + goto error; + } + + bt_graph_add_interrupter(graph, graph->default_interrupter); ret = bt_object_pool_initialize(&graph->event_msg_pool, (bt_object_pool_new_object_func) bt_message_event_new, (bt_object_pool_destroy_object_func) destroy_message_event, @@ -417,7 +434,6 @@ enum bt_graph_connect_ports_status bt_graph_connect_ports( BT_ASSERT_PRE_NON_NULL(graph, "Graph"); BT_ASSERT_PRE_NON_NULL(upstream_port, "Upstream port"); BT_ASSERT_PRE_NON_NULL(downstream_port, "Downstream port port"); - BT_ASSERT_PRE(!graph->canceled, "Graph is canceled: %!+g", graph); BT_ASSERT_PRE( graph->config_state == BT_GRAPH_CONFIGURATION_STATE_CONFIGURING, "Graph is not in the \"configuring\" state: %!+g", graph); @@ -564,7 +580,7 @@ int consume_graph_sink(struct bt_component_sink *comp) consume_status = sink_class->methods.consume((void *) comp); BT_LOGD("User method returned: status=%s", bt_common_func_status_string(consume_status)); - BT_ASSERT_POST(consume_status == BT_FUNC_STATUS_OK || + BT_ASSERT_POST_DEV(consume_status == BT_FUNC_STATUS_OK || consume_status == BT_FUNC_STATUS_END || consume_status == BT_FUNC_STATUS_AGAIN || consume_status == BT_FUNC_STATUS_ERROR || @@ -663,7 +679,7 @@ int consume_no_check(struct bt_graph *graph) struct bt_component *sink; GList *current_node; - BT_ASSERT_PRE(graph->has_sink, + BT_ASSERT_PRE_DEV(graph->has_sink, "Graph has no sink component: %!+g", graph); BT_LIB_LOGD("Making next sink component consume: %![graph-]+g", graph); @@ -682,15 +698,15 @@ end: return status; } -enum bt_graph_consume_status bt_graph_consume(struct bt_graph *graph) +enum bt_graph_run_once_status bt_graph_run_once(struct bt_graph *graph) { - enum bt_graph_consume_status status; + enum bt_graph_run_once_status status; - BT_ASSERT_PRE_NON_NULL(graph, "Graph"); - BT_ASSERT_PRE(!graph->canceled, "Graph is canceled: %!+g", graph); - BT_ASSERT_PRE(graph->can_consume, + BT_ASSERT_PRE_DEV_NON_NULL(graph, "Graph"); + BT_ASSERT_PRE_DEV(graph->can_consume, "Cannot consume graph in its current state: %!+g", graph); - BT_ASSERT_PRE(graph->config_state != BT_GRAPH_CONFIGURATION_STATE_FAULTY, + BT_ASSERT_PRE_DEV(graph->config_state != + BT_GRAPH_CONFIGURATION_STATE_FAULTY, "Graph is in a faulty state: %!+g", graph); bt_graph_set_can_consume(graph, false); status = bt_graph_configure(graph); @@ -711,7 +727,6 @@ enum bt_graph_run_status bt_graph_run(struct bt_graph *graph) enum bt_graph_run_status status; BT_ASSERT_PRE_NON_NULL(graph, "Graph"); - BT_ASSERT_PRE(!graph->canceled, "Graph is canceled: %!+g", graph); BT_ASSERT_PRE(graph->can_consume, "Cannot consume graph in its current state: %!+g", graph); BT_ASSERT_PRE(graph->config_state != BT_GRAPH_CONFIGURATION_STATE_FAULTY, @@ -727,15 +742,15 @@ enum bt_graph_run_status bt_graph_run(struct bt_graph *graph) do { /* - * Check if the graph is canceled at each iteration. If - * the graph was canceled by another thread or by a - * signal handler, this is not a warning nor an error, - * it was intentional: log with a DEBUG level only. + * Check if the graph is interrupted at each iteration. + * If the graph was interrupted by another thread or by + * a signal handler, this is NOT a warning nor an error; + * it was intentional: log with an INFO level only. */ - if (G_UNLIKELY(graph->canceled)) { - BT_LIB_LOGI("Stopping the graph: graph is canceled: " - "%!+g", graph); - status = BT_FUNC_STATUS_CANCELED; + if (G_UNLIKELY(bt_graph_is_interrupted(graph))) { + BT_LIB_LOGI("Stopping the graph: " + "graph was interrupted: %!+g", graph); + status = BT_FUNC_STATUS_AGAIN; goto end; } @@ -774,7 +789,7 @@ bt_graph_add_source_component_output_port_added_listener( struct bt_graph *graph, bt_graph_source_component_output_port_added_listener_func func, bt_graph_listener_removed_func listener_removed, void *data, - int *out_listener_id) + bt_listener_id *out_listener_id) { struct bt_graph_listener_port_added listener = { .base = { @@ -783,7 +798,7 @@ bt_graph_add_source_component_output_port_added_listener( }, .func = (port_added_func_t) func, }; - int listener_id; + bt_listener_id listener_id; BT_ASSERT_PRE_NON_NULL(graph, "Graph"); BT_ASSERT_PRE_NON_NULL(func, "Listener"); @@ -809,7 +824,7 @@ bt_graph_add_filter_component_output_port_added_listener( struct bt_graph *graph, bt_graph_filter_component_output_port_added_listener_func func, bt_graph_listener_removed_func listener_removed, void *data, - int *out_listener_id) + bt_listener_id *out_listener_id) { struct bt_graph_listener_port_added listener = { .base = { @@ -818,7 +833,7 @@ bt_graph_add_filter_component_output_port_added_listener( }, .func = (port_added_func_t) func, }; - int listener_id; + bt_listener_id listener_id; BT_ASSERT_PRE_NON_NULL(graph, "Graph"); BT_ASSERT_PRE_NON_NULL(func, "Listener"); @@ -844,7 +859,7 @@ bt_graph_add_filter_component_input_port_added_listener( struct bt_graph *graph, bt_graph_filter_component_input_port_added_listener_func func, bt_graph_listener_removed_func listener_removed, void *data, - int *out_listener_id) + bt_listener_id *out_listener_id) { struct bt_graph_listener_port_added listener = { .base = { @@ -853,7 +868,7 @@ bt_graph_add_filter_component_input_port_added_listener( }, .func = (port_added_func_t) func, }; - int listener_id; + bt_listener_id listener_id; BT_ASSERT_PRE_NON_NULL(graph, "Graph"); BT_ASSERT_PRE_NON_NULL(func, "Listener"); @@ -879,7 +894,7 @@ bt_graph_add_sink_component_input_port_added_listener( struct bt_graph *graph, bt_graph_sink_component_input_port_added_listener_func func, bt_graph_listener_removed_func listener_removed, void *data, - int *out_listener_id) + bt_listener_id *out_listener_id) { struct bt_graph_listener_port_added listener = { .base = { @@ -888,7 +903,7 @@ bt_graph_add_sink_component_input_port_added_listener( }, .func = (port_added_func_t) func, }; - int listener_id; + bt_listener_id listener_id; BT_ASSERT_PRE_NON_NULL(graph, "Graph"); BT_ASSERT_PRE_NON_NULL(func, "Listener"); @@ -914,7 +929,7 @@ bt_graph_add_source_filter_component_ports_connected_listener( struct bt_graph *graph, bt_graph_source_filter_component_ports_connected_listener_func func, bt_graph_listener_removed_func listener_removed, void *data, - int *out_listener_id) + bt_listener_id *out_listener_id) { struct bt_graph_listener_ports_connected listener = { .base = { @@ -923,7 +938,7 @@ bt_graph_add_source_filter_component_ports_connected_listener( }, .func = (ports_connected_func_t) func, }; - int listener_id; + bt_listener_id listener_id; BT_ASSERT_PRE_NON_NULL(graph, "Graph"); BT_ASSERT_PRE_NON_NULL(func, "Listener"); @@ -950,7 +965,7 @@ bt_graph_add_source_sink_component_ports_connected_listener( struct bt_graph *graph, bt_graph_source_sink_component_ports_connected_listener_func func, bt_graph_listener_removed_func listener_removed, void *data, - int *out_listener_id) + bt_listener_id *out_listener_id) { struct bt_graph_listener_ports_connected listener = { .base = { @@ -959,7 +974,7 @@ bt_graph_add_source_sink_component_ports_connected_listener( }, .func = (ports_connected_func_t) func, }; - int listener_id; + bt_listener_id listener_id; BT_ASSERT_PRE_NON_NULL(graph, "Graph"); BT_ASSERT_PRE_NON_NULL(func, "Listener"); @@ -986,7 +1001,7 @@ bt_graph_add_filter_filter_component_ports_connected_listener( struct bt_graph *graph, bt_graph_filter_filter_component_ports_connected_listener_func func, bt_graph_listener_removed_func listener_removed, void *data, - int *out_listener_id) + bt_listener_id *out_listener_id) { struct bt_graph_listener_ports_connected listener = { .base = { @@ -995,7 +1010,7 @@ bt_graph_add_filter_filter_component_ports_connected_listener( }, .func = (ports_connected_func_t) func, }; - int listener_id; + bt_listener_id listener_id; BT_ASSERT_PRE_NON_NULL(graph, "Graph"); BT_ASSERT_PRE_NON_NULL(func, "Listener"); @@ -1022,7 +1037,7 @@ bt_graph_add_filter_sink_component_ports_connected_listener( struct bt_graph *graph, bt_graph_filter_sink_component_ports_connected_listener_func func, bt_graph_listener_removed_func listener_removed, void *data, - int *out_listener_id) + bt_listener_id *out_listener_id) { struct bt_graph_listener_ports_connected listener = { .base = { @@ -1031,7 +1046,7 @@ bt_graph_add_filter_sink_component_ports_connected_listener( }, .func = (ports_connected_func_t) func, }; - int listener_id; + bt_listener_id listener_id; BT_ASSERT_PRE_NON_NULL(graph, "Graph"); BT_ASSERT_PRE_NON_NULL(func, "Listener"); @@ -1208,20 +1223,6 @@ end: return status; } -enum bt_graph_cancel_status bt_graph_cancel(struct bt_graph *graph) -{ - BT_ASSERT_PRE_NON_NULL(graph, "Graph"); - graph->canceled = true; - BT_LIB_LOGI("Canceled graph: %!+i", graph); - return BT_FUNC_STATUS_OK; -} - -bt_bool bt_graph_is_canceled(const struct bt_graph *graph) -{ - BT_ASSERT_PRE_NON_NULL(graph, "Graph"); - return graph->canceled ? BT_TRUE : BT_FALSE; -} - BT_HIDDEN void bt_graph_remove_connection(struct bt_graph *graph, struct bt_connection *connection) @@ -1233,7 +1234,6 @@ void bt_graph_remove_connection(struct bt_graph *graph, g_ptr_array_remove(graph->connections, connection); } -BT_ASSERT_PRE_FUNC static inline bool component_name_exists(struct bt_graph *graph, const char *name) { @@ -1263,10 +1263,10 @@ int add_component_with_init_method_data( comp_init_method_t init_method, const char *name, const struct bt_value *params, void *init_method_data, bt_logging_level log_level, - struct bt_component **user_component) + const struct bt_component **user_component) { int status = BT_FUNC_STATUS_OK; - enum bt_component_class_init_method_status init_status; + enum bt_component_class_initialize_method_status init_status; struct bt_component *component = NULL; int ret; bool init_can_consume; @@ -1275,7 +1275,6 @@ int add_component_with_init_method_data( BT_ASSERT(comp_cls); BT_ASSERT_PRE_NON_NULL(graph, "Graph"); BT_ASSERT_PRE_NON_NULL(name, "Name"); - BT_ASSERT_PRE(!graph->canceled, "Graph is canceled: %!+g", graph); BT_ASSERT_PRE( graph->config_state == BT_GRAPH_CONFIGURATION_STATE_CONFIGURING, "Graph is not in the \"configuring\" state: %!+g", graph); @@ -1323,8 +1322,12 @@ int add_component_with_init_method_data( bt_value_freeze(params); if (init_method) { + /* + * There is no use for config objects right now, so just pass + * NULL. + */ BT_LOGD_STR("Calling user's initialization method."); - init_status = init_method(component, params, init_method_data); + init_status = init_method(component, NULL, params, init_method_data); BT_LOGD("User method returned: status=%s", bt_common_func_status_string(init_status)); if (init_status != BT_FUNC_STATUS_OK) { @@ -1351,7 +1354,8 @@ int add_component_with_init_method_data( /* * If it's a sink component, it needs to be part of the graph's - * sink queue to be consumed by bt_graph_consume(). + * sink queue to be consumed by bt_graph_run() or + * bt_graph_run_once(). */ if (bt_component_is_sink(component)) { graph->has_sink = true; @@ -1390,7 +1394,7 @@ end: } enum bt_graph_add_component_status -bt_graph_add_source_component_with_init_method_data( +bt_graph_add_source_component_with_initialize_method_data( struct bt_graph *graph, const struct bt_component_class_source *comp_cls, const char *name, const struct bt_value *params, @@ -1407,19 +1411,19 @@ enum bt_graph_add_component_status bt_graph_add_source_component( struct bt_graph *graph, const struct bt_component_class_source *comp_cls, const char *name, const struct bt_value *params, - bt_logging_level log_level, + enum bt_logging_level log_level, const struct bt_component_source **component) { - return bt_graph_add_source_component_with_init_method_data( + return bt_graph_add_source_component_with_initialize_method_data( graph, comp_cls, name, params, NULL, log_level, component); } enum bt_graph_add_component_status -bt_graph_add_filter_component_with_init_method_data( +bt_graph_add_filter_component_with_initialize_method_data( struct bt_graph *graph, const struct bt_component_class_filter *comp_cls, const char *name, const struct bt_value *params, - void *init_method_data, bt_logging_level log_level, + void *init_method_data, enum bt_logging_level log_level, const struct bt_component_filter **component) { BT_ASSERT_PRE_NON_NULL(comp_cls, "Component class"); @@ -1432,19 +1436,19 @@ enum bt_graph_add_component_status bt_graph_add_filter_component( struct bt_graph *graph, const struct bt_component_class_filter *comp_cls, const char *name, const struct bt_value *params, - bt_logging_level log_level, + enum bt_logging_level log_level, const struct bt_component_filter **component) { - return bt_graph_add_filter_component_with_init_method_data( + return bt_graph_add_filter_component_with_initialize_method_data( graph, comp_cls, name, params, NULL, log_level, component); } enum bt_graph_add_component_status -bt_graph_add_sink_component_with_init_method_data( +bt_graph_add_sink_component_with_initialize_method_data( struct bt_graph *graph, const struct bt_component_class_sink *comp_cls, const char *name, const struct bt_value *params, - void *init_method_data, bt_logging_level log_level, + void *init_method_data, enum bt_logging_level log_level, const struct bt_component_sink **component) { BT_ASSERT_PRE_NON_NULL(comp_cls, "Component class"); @@ -1457,13 +1461,51 @@ enum bt_graph_add_component_status bt_graph_add_sink_component( struct bt_graph *graph, const struct bt_component_class_sink *comp_cls, const char *name, const struct bt_value *params, - bt_logging_level log_level, + enum bt_logging_level log_level, const struct bt_component_sink **component) { - return bt_graph_add_sink_component_with_init_method_data( + return bt_graph_add_sink_component_with_initialize_method_data( graph, comp_cls, name, params, NULL, log_level, component); } +enum bt_graph_add_component_status +bt_graph_add_simple_sink_component(struct bt_graph *graph, const char *name, + bt_graph_simple_sink_component_initialize_func init_func, + bt_graph_simple_sink_component_consume_func consume_func, + bt_graph_simple_sink_component_finalize_func finalize_func, + void *user_data, const bt_component_sink **component) +{ + enum bt_graph_add_component_status status; + struct bt_component_class_sink *comp_cls; + struct simple_sink_init_method_data init_method_data = { + .init_func = init_func, + .consume_func = consume_func, + .finalize_func = finalize_func, + .user_data = user_data, + }; + + /* + * Other preconditions are checked by + * bt_graph_add_sink_component_with_init_method_data(). + */ + BT_ASSERT_PRE_NON_NULL(consume_func, "Consume function"); + + comp_cls = bt_component_class_sink_simple_borrow(); + if (!comp_cls) { + BT_LIB_LOGE_APPEND_CAUSE( + "Cannot borrow simple sink component class."); + status = BT_FUNC_STATUS_MEMORY_ERROR; + goto end; + } + + status = bt_graph_add_sink_component_with_initialize_method_data(graph, + comp_cls, name, NULL, &init_method_data, + BT_LOGGING_LEVEL_NONE, component); + +end: + return status; +} + BT_HIDDEN int bt_graph_remove_unconnected_component(struct bt_graph *graph, struct bt_component *component) @@ -1559,6 +1601,32 @@ void bt_graph_add_message(struct bt_graph *graph, g_ptr_array_add(graph->messages, msg); } +BT_HIDDEN +bool bt_graph_is_interrupted(const struct bt_graph *graph) +{ + BT_ASSERT(graph); + return bt_interrupter_array_any_is_set(graph->interrupters); +} + +enum bt_graph_add_interrupter_status bt_graph_add_interrupter( + struct bt_graph *graph, const struct bt_interrupter *intr) +{ + BT_ASSERT_PRE_NON_NULL(graph, "Graph"); + BT_ASSERT_PRE_NON_NULL(intr, "Interrupter"); + g_ptr_array_add(graph->interrupters, (void *) intr); + bt_object_get_ref_no_null_check(intr); + BT_LIB_LOGD("Added interrupter to graph: %![graph-]+g, %![intr-]+z", + graph, intr); + return BT_FUNC_STATUS_OK; +} + +void bt_graph_interrupt(struct bt_graph *graph) +{ + BT_ASSERT_PRE_NON_NULL(graph, "Graph"); + bt_interrupter_set(graph->default_interrupter); + BT_LIB_LOGI("Interrupted graph: %!+g", graph); +} + void bt_graph_get_ref(const struct bt_graph *graph) { bt_object_get_ref(graph);