From fec2a9f220bdae0ce64716538c111348302f2696 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Mon, 19 Sep 2016 22:29:21 -0400 Subject: [PATCH] Sinks own their input iterators MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérémie Galarneau --- converter/babeltrace.c | 45 ++-- .../plugin/component-class-internal.h | 3 +- .../babeltrace/plugin/component-internal.h | 6 + include/babeltrace/plugin/component.h | 9 +- include/babeltrace/plugin/plugin-system.h | 70 ++++-- include/babeltrace/plugin/sink-internal.h | 10 +- include/babeltrace/plugin/sink.h | 17 +- lib/plugin-system/component-factory.c | 2 +- lib/plugin-system/component.c | 6 +- lib/plugin-system/sink.c | 221 ++++++++++++++++-- lib/plugin-system/source.c | 3 +- plugins/text/text.c | 47 +++- 12 files changed, 355 insertions(+), 84 deletions(-) diff --git a/converter/babeltrace.c b/converter/babeltrace.c index 5b381bf6..e3436afa 100644 --- a/converter/babeltrace.c +++ b/converter/babeltrace.c @@ -203,6 +203,7 @@ int main(int argc, char **argv) struct bt_component *source = NULL, *sink = NULL; struct bt_value *source_params = NULL, *sink_params = NULL; struct bt_notification_iterator *it = NULL; + enum bt_component_status sink_status; ret = parse_options(argc, argv); if (ret < 0) { @@ -289,32 +290,30 @@ int main(int argc, char **argv) goto end; } - do { - enum bt_component_status sink_status; - struct bt_notification *notification = - bt_notification_iterator_get_notification(it); - - if (!notification) { - /* - * Should never happen in final code except after next - * has returned BT_NOTIFICATION_ITERATOR_STATUS_END. - * - * Right now it happens at the first event since the - * iterator is not completely initialized and we don't - * have a notification "heap" in place. - */ - continue; - } + sink_status = bt_component_sink_add_iterator(sink, it); + if (sink_status != BT_COMPONENT_STATUS_OK) { + ret = -1; + goto end; + } - sink_status = bt_component_sink_handle_notification(sink, - notification); - BT_PUT(notification); - if (sink_status != BT_COMPONENT_STATUS_OK) { - fprintf(stderr, "Sink component returned an error, aborting...\n"); + while (true) { + sink_status = bt_component_sink_consume(sink); + + switch (sink_status) { + case BT_COMPONENT_STATUS_AGAIN: + /* Wait for an arbitraty 500 ms. */ + usleep(500000); break; + case BT_COMPONENT_STATUS_OK: + break; + case BT_COMPONENT_STATUS_END: + goto end; + default: + fprintf(stderr, "Sink component returned an error, aborting...\n"); + ret = -1; + goto end; } - } while (bt_notification_iterator_next(it) == - BT_NOTIFICATION_ITERATOR_STATUS_OK); + } /* teardown and exit */ end: BT_PUT(component_factory); diff --git a/include/babeltrace/plugin/component-class-internal.h b/include/babeltrace/plugin/component-class-internal.h index 351e5c06..132a4e8d 100644 --- a/include/babeltrace/plugin/component-class-internal.h +++ b/include/babeltrace/plugin/component-class-internal.h @@ -42,8 +42,7 @@ struct bt_component_class { }; BT_HIDDEN -int bt_component_class_init( - struct bt_component_class *class, enum bt_component_type type, +int bt_component_class_init(struct bt_component_class *class, enum bt_component_type type, const char *name); BT_HIDDEN diff --git a/include/babeltrace/plugin/component-internal.h b/include/babeltrace/plugin/component-internal.h index aacef39b..4239ac4d 100644 --- a/include/babeltrace/plugin/component-internal.h +++ b/include/babeltrace/plugin/component-internal.h @@ -45,6 +45,12 @@ struct bt_component { /** User-defined data and its destruction callback */ void *user_data; bt_component_destroy_cb user_destroy; + + /** + * Used to protect operations which may only be used during + * a component's initialization. + */ + bool initializing; }; BT_HIDDEN diff --git a/include/babeltrace/plugin/component.h b/include/babeltrace/plugin/component.h index 9655c00d..c1ad622e 100644 --- a/include/babeltrace/plugin/component.h +++ b/include/babeltrace/plugin/component.h @@ -40,7 +40,14 @@ extern "C" { */ enum bt_component_status { /** No error, okay. */ - BT_COMPONENT_STATUS_OK = 0, + BT_COMPONENT_STATUS_OK = 0, + /** No more work to be done by this component. **/ + BT_COMPONENT_STATUS_END = 1, + /** + * Component can't process a notification at this time + * (e.g. would block), try again later. + */ + BT_COMPONENT_STATUS_AGAIN = 2, /** General error. */ BT_COMPONENT_STATUS_ERROR = -1, /** Unsupported component feature. */ diff --git a/include/babeltrace/plugin/plugin-system.h b/include/babeltrace/plugin/plugin-system.h index 0c3618c2..ec84b872 100644 --- a/include/babeltrace/plugin/plugin-system.h +++ b/include/babeltrace/plugin/plugin-system.h @@ -31,6 +31,7 @@ */ #include +#include #ifdef __cplusplus extern "C" { @@ -121,45 +122,68 @@ bt_component_source_set_iterator_init_cb(struct bt_component *source, /** bt_component_sink */ /** - * Notification handling function type. + * Notification consumption function type. * - * A reference must be taken on the notification if the component has to - * keep ownership of the notification beyond the invocation of the callback. + * @param sink Sink component instance + * @returns One of #bt_component_status values + */ +typedef enum bt_component_status (*bt_component_sink_consume_cb)( + struct bt_component *); + +/** + * Iterator addition function type. + * + * A sink component may choose to refuse the addition of an iterator + * by not returning BT_COMPONENT_STATUS_OK. * * @param sink Sink component instance - * @param notification Notification to handle * @returns One of #bt_component_status values */ -typedef enum bt_component_status (*bt_component_sink_handle_notification_cb)( - struct bt_component *, struct bt_notification *); +typedef enum bt_component_status (*bt_component_sink_add_iterator_cb)( + struct bt_component *, struct bt_notification_iterator *); /** - * Set a sink component's notification handling callback. + * Set a sink component's consumption callback. * - * @param sink Sink component instance - * @param handle_notification Notification handling callback - * @returns One of #bt_component_status values + * @param sink Sink component instance + * @param consume Consumption callback + * @returns One of #bt_component_status values */ extern enum bt_component_status -bt_component_sink_set_handle_notification_cb(struct bt_component *sink, - bt_component_sink_handle_notification_cb handle_notification); +bt_component_sink_set_consume_cb(struct bt_component *sink, + bt_component_sink_consume_cb consume); /** - * Register a sink to a given notification type. - * - * A sink is always registered to notifications of type - * BT_NOTIFICATION_TYPE_EVENT. However, it may opt to receive any (or all) - * other notification type(s). + * Set a sink component's iterator addition callback. * - * @param sink Sink component instance. - * @param type One of #bt_notification_type - * @returns One of #bt_component_status + * @param sink Sink component instance + * @param add_iterator Iterator addition callback + * @returns One of #bt_component_status values */ extern enum bt_component_status -bt_component_sink_register_notification_type(struct bt_component *sink, - enum bt_notification_type type); +bt_component_sink_set_add_iterator_cb(struct bt_component *sink, + bt_component_sink_add_iterator_cb add_iterator); + +/* Defaults to 1. */ +extern enum bt_component_status +bt_component_sink_set_minimum_input_count(struct bt_component *sink, + unsigned int minimum); + +/* Defaults to 1. */ +extern enum bt_component_status +bt_component_sink_set_maximum_input_count(struct bt_component *sink, + unsigned int maximum); + +extern enum bt_component_status +bt_component_sink_get_input_count(struct bt_component *sink, + unsigned int *count); + +/* May return NULL after an interator has reached its end. */ +extern enum bt_component_status +bt_component_sink_get_input_iterator(struct bt_component *sink, + unsigned int input, struct bt_notification_iterator **iterator); -/** bt_component_notification_iterator */ +/** bt_notification_iterator */ /** * Function returning an iterator's current notification. * diff --git a/include/babeltrace/plugin/sink-internal.h b/include/babeltrace/plugin/sink-internal.h index 047d6d1c..b3a56f95 100644 --- a/include/babeltrace/plugin/sink-internal.h +++ b/include/babeltrace/plugin/sink-internal.h @@ -42,9 +42,13 @@ typedef uint32_t notification_mask_t; struct bt_component_sink { struct bt_component parent; - /* Component implementation callbacks */ - bt_component_sink_handle_notification_cb handle_notification; - notification_mask_t registered_notifications_mask; + bt_component_sink_consume_cb consume; + bt_component_sink_add_iterator_cb add_iterator; + GPtrArray *inputs; + unsigned int min_input_count; + unsigned int max_input_count; + bool validated_inputs; +/* notification_mask_t registered_notifications_mask;*/ }; /** diff --git a/include/babeltrace/plugin/sink.h b/include/babeltrace/plugin/sink.h index ae563f67..3ea16458 100644 --- a/include/babeltrace/plugin/sink.h +++ b/include/babeltrace/plugin/sink.h @@ -37,15 +37,24 @@ struct bt_component; struct bt_notification; /** - * Hand-off a notification to a sink component. + * Add a notification iterator to a sink component. * * @param component Component instance - * @param notification Notification instance to handle + * @param iterator Notification iterator to add * @returns One of #bt_component_status values */ -enum bt_component_status bt_component_sink_handle_notification( +enum bt_component_status bt_component_sink_add_iterator( struct bt_component *component, - struct bt_notification *notification); + struct bt_notification_iterator *iterator); + +/** + * Process one event, consuming from sources as needed. + * + * @param component Component instance + * @returns One of #bt_component_status values + */ +enum bt_component_status bt_component_sink_consume( + struct bt_component *component); #ifdef __cplusplus } diff --git a/lib/plugin-system/component-factory.c b/lib/plugin-system/component-factory.c index fbf36d68..3fb02057 100644 --- a/lib/plugin-system/component-factory.c +++ b/lib/plugin-system/component-factory.c @@ -435,7 +435,7 @@ add_component_class(struct bt_component_factory *factory, const char *name, } component_class = bt_component_class_create(type, name, description, - init, factory->current_plugin); + init, factory->current_plugin); g_ptr_array_add(factory->component_classes, component_class); end: return ret; diff --git a/lib/plugin-system/component.c b/lib/plugin-system/component.c index b580bb5d..643e17ad 100644 --- a/lib/plugin-system/component.c +++ b/lib/plugin-system/component.c @@ -131,7 +131,9 @@ struct bt_component *bt_component_create( goto end; } + component->initializing = true; component_class->init(component, params); + component->initializing = false; ret = component_validation_funcs[type](component); if (ret != BT_COMPONENT_STATUS_OK) { BT_PUT(component); @@ -186,7 +188,7 @@ bt_component_set_private_data(struct bt_component *component, { enum bt_component_status ret = BT_COMPONENT_STATUS_OK; - if (!component) { + if (!component || !component->initializing) { ret = BT_COMPONENT_STATUS_INVALID; goto end; } @@ -201,7 +203,7 @@ enum bt_component_status bt_component_set_destroy_cb( { enum bt_component_status ret = BT_COMPONENT_STATUS_OK; - if (!component) { + if (!component || !component->initializing) { ret = BT_COMPONENT_STATUS_INVALID; goto end; } diff --git a/lib/plugin-system/sink.c b/lib/plugin-system/sink.c index 7067e06e..d9b33616 100644 --- a/lib/plugin-system/sink.c +++ b/lib/plugin-system/sink.c @@ -27,6 +27,7 @@ */ #include +#include #include #include #include @@ -39,17 +40,14 @@ enum bt_component_status bt_component_sink_validate( struct bt_component_sink *sink; sink = container_of(component, struct bt_component_sink, parent); - if (sink->registered_notifications_mask == 0) { - /* - * A sink must be registered to at least one notification type. - */ - printf_error("Invalid sink component; not registered to any notification"); + if (!sink->consume) { + printf_error("Invalid sink component; no notification consumption callback defined."); ret = BT_COMPONENT_STATUS_INVALID; goto end; } - if (!sink->handle_notification) { - printf_error("Invalid sink component; no notification handling callback defined."); + if (sink->min_input_count > sink->max_input_count) { + printf_error("Invalid sink component; minimum input count > maximum input count."); ret = BT_COMPONENT_STATUS_INVALID; goto end; } @@ -57,6 +55,15 @@ end: return ret; } +static +void bt_component_sink_destroy(struct bt_component *component) +{ + struct bt_component_sink *sink = container_of(component, + struct bt_component_sink, parent); + + g_ptr_array_free(sink->inputs, TRUE); +} + BT_HIDDEN struct bt_component *bt_component_sink_create( struct bt_component_class *class, struct bt_value *params) @@ -70,16 +77,24 @@ struct bt_component *bt_component_sink_create( } sink->parent.class = bt_get(class); - ret = bt_component_init(&sink->parent, NULL); + ret = bt_component_init(&sink->parent, bt_component_sink_destroy); if (ret != BT_COMPONENT_STATUS_OK) { goto error; } + sink->min_input_count = 1; + sink->max_input_count = 1; +/* ret = bt_component_sink_register_notification_type(&sink->parent, BT_NOTIFICATION_TYPE_EVENT); if (ret != BT_COMPONENT_STATUS_OK) { goto error; } +*/ + sink->inputs = g_ptr_array_new_with_free_func(bt_put); + if (!sink->inputs) { + goto error; + } end: return sink ? &sink->parent : NULL; error: @@ -87,14 +102,26 @@ error: return NULL; } -enum bt_component_status bt_component_sink_handle_notification( - struct bt_component *component, - struct bt_notification *notification) +static +enum bt_component_status validate_inputs(struct bt_component_sink *sink) +{ + size_t array_size = sink->inputs->len; + + if (array_size < sink->min_input_count || + array_size > sink->max_input_count) { + return BT_COMPONENT_STATUS_INVALID; + } + + return BT_COMPONENT_STATUS_OK; +} + +enum bt_component_status bt_component_sink_consume( + struct bt_component *component) { enum bt_component_status ret = BT_COMPONENT_STATUS_OK; struct bt_component_sink *sink = NULL; - if (!component || !notification) { + if (!component) { ret = BT_COMPONENT_STATUS_INVALID; goto end; } @@ -105,12 +132,21 @@ enum bt_component_status bt_component_sink_handle_notification( } sink = container_of(component, struct bt_component_sink, parent); - assert(sink->handle_notification); - ret = sink->handle_notification(component, notification); + if (!sink->validated_inputs) { + ret = validate_inputs(sink); + if (ret != BT_COMPONENT_STATUS_OK) { + goto end; + } + sink->validated_inputs = true; + } + + assert(sink->consume); + ret = sink->consume(component); end: return ret; } - +/* +static enum bt_component_status bt_component_sink_register_notification_type( struct bt_component *component, enum bt_notification_type type) { @@ -142,13 +178,41 @@ enum bt_component_status bt_component_sink_register_notification_type( end: return ret; } +*/ +enum bt_component_status bt_component_sink_set_consume_cb( + struct bt_component *component, + bt_component_sink_consume_cb consume) +{ + struct bt_component_sink *sink; + enum bt_component_status ret = BT_COMPONENT_STATUS_OK; -enum bt_component_status bt_component_sink_set_handle_notification_cb( + if (!component) { + ret = BT_COMPONENT_STATUS_INVALID; + goto end; + } + + if (bt_component_get_type(component) != BT_COMPONENT_TYPE_SINK) { + ret = BT_COMPONENT_STATUS_UNSUPPORTED; + goto end; + } + + if (!component->initializing) { + ret = BT_COMPONENT_STATUS_INVALID; + goto end; + } + + sink = container_of(component, struct bt_component_sink, parent); + sink->consume = consume; +end: + return ret; +} + +enum bt_component_status bt_component_sink_set_minimum_input_count( struct bt_component *component, - bt_component_sink_handle_notification_cb handle_notification) + unsigned int minimum) { + struct bt_component_sink *sink; enum bt_component_status ret = BT_COMPONENT_STATUS_OK; - struct bt_component_sink *sink = NULL; if (!component) { ret = BT_COMPONENT_STATUS_INVALID; @@ -160,8 +224,127 @@ enum bt_component_status bt_component_sink_set_handle_notification_cb( goto end; } + if (!component->initializing) { + ret = BT_COMPONENT_STATUS_INVALID; + goto end; + } + sink = container_of(component, struct bt_component_sink, parent); - sink->handle_notification = handle_notification; + sink->min_input_count = minimum; +end: + return ret; +} + +enum bt_component_status bt_component_sink_set_maximum_input_count( + struct bt_component *component, + unsigned int maximum) +{ + struct bt_component_sink *sink; + enum bt_component_status ret = BT_COMPONENT_STATUS_OK; + + if (!component) { + ret = BT_COMPONENT_STATUS_INVALID; + goto end; + } + + if (bt_component_get_type(component) != BT_COMPONENT_TYPE_SINK) { + ret = BT_COMPONENT_STATUS_UNSUPPORTED; + goto end; + } + + if (!component->initializing) { + ret = BT_COMPONENT_STATUS_INVALID; + goto end; + } + + sink = container_of(component, struct bt_component_sink, parent); + sink->max_input_count = maximum; +end: + return ret; +} + +enum bt_component_status +bt_component_sink_get_input_count(struct bt_component *component, + unsigned int *count) +{ + struct bt_component_sink *sink; + enum bt_component_status ret = BT_COMPONENT_STATUS_OK; + + if (!component || !count) { + ret = BT_COMPONENT_STATUS_INVALID; + goto end; + } + + if (bt_component_get_type(component) != BT_COMPONENT_TYPE_SINK) { + ret = BT_COMPONENT_STATUS_UNSUPPORTED; + goto end; + } + + sink = container_of(component, struct bt_component_sink, parent); + *count = (unsigned int) sink->inputs->len; +end: + return ret; +} + +enum bt_component_status +bt_component_sink_get_input_iterator(struct bt_component *component, + unsigned int input, struct bt_notification_iterator **iterator) +{ + struct bt_component_sink *sink; + enum bt_component_status ret = BT_COMPONENT_STATUS_OK; + + if (!component || !iterator) { + ret = BT_COMPONENT_STATUS_INVALID; + goto end; + } + + if (bt_component_get_type(component) != BT_COMPONENT_TYPE_SINK) { + ret = BT_COMPONENT_STATUS_UNSUPPORTED; + goto end; + } + + sink = container_of(component, struct bt_component_sink, parent); + if (input >= (unsigned int) sink->inputs->len) { + ret = BT_COMPONENT_STATUS_INVALID; + goto end; + } + + *iterator = bt_get(g_ptr_array_index(sink->inputs, input)); +end: + return ret; +} + +enum bt_component_status +bt_component_sink_add_iterator(struct bt_component *component, + struct bt_notification_iterator *iterator) +{ + struct bt_component_sink *sink; + enum bt_component_status ret = BT_COMPONENT_STATUS_OK; + + if (!component || !iterator) { + ret = BT_COMPONENT_STATUS_INVALID; + goto end; + } + + if (bt_component_get_type(component) != BT_COMPONENT_TYPE_SINK) { + ret = BT_COMPONENT_STATUS_UNSUPPORTED; + goto end; + } + + sink = container_of(component, struct bt_component_sink, parent); + if (sink->inputs->len == sink->max_input_count) { + ret = BT_COMPONENT_STATUS_UNSUPPORTED; + goto end; + } + + if (sink->add_iterator) { + ret = sink->add_iterator(component, iterator); + if (ret != BT_COMPONENT_STATUS_OK) { + goto end; + } + } + + g_ptr_array_add(sink->inputs, bt_get(iterator)); end: return ret; } diff --git a/lib/plugin-system/source.c b/lib/plugin-system/source.c index 333139f5..51b32f83 100644 --- a/lib/plugin-system/source.c +++ b/lib/plugin-system/source.c @@ -93,7 +93,8 @@ bt_component_source_set_iterator_init_cb(struct bt_component *component, struct bt_component_source *source; enum bt_component_status ret = BT_COMPONENT_STATUS_OK; - if (component->class->type != BT_COMPONENT_TYPE_SOURCE) { + if (component->class->type != BT_COMPONENT_TYPE_SOURCE || + !component->initializing) { ret = BT_COMPONENT_STATUS_INVALID; goto end; } diff --git a/plugins/text/text.c b/plugins/text/text.c index a318bfbf..f1744289 100644 --- a/plugins/text/text.c +++ b/plugins/text/text.c @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -74,7 +75,8 @@ end: return text; } -static void destroy_text(struct bt_component *component) +static +void destroy_text(struct bt_component *component) { void *data = bt_component_get_private_data(component); @@ -82,11 +84,10 @@ static void destroy_text(struct bt_component *component) } static -enum bt_component_status handle_notification(struct bt_component *component, +enum bt_component_status handle_notification(struct text_component *text, struct bt_notification *notification) { enum bt_component_status ret = BT_COMPONENT_STATUS_OK; - struct text_component *text = bt_component_get_private_data(component); if (!text) { ret = BT_COMPONENT_STATUS_ERROR; @@ -110,6 +111,7 @@ enum bt_component_status handle_notification(struct bt_component *component, goto end; } ret = text_print_event(text, event); + bt_put(event); if (ret != BT_COMPONENT_STATUS_OK) { goto end; } @@ -125,6 +127,41 @@ end: return ret; } +static +enum bt_component_status run(struct bt_component *component) +{ + enum bt_component_status ret; + struct bt_notification *notification = NULL; + struct bt_notification_iterator *it; + struct text_component *text = bt_component_get_private_data(component); + + ret = bt_component_sink_get_input_iterator(component, 0, &it); + if (ret != BT_COMPONENT_STATUS_OK) { + goto end; + } + + if (!text->processed_first_event) { + ret = bt_notification_iterator_next(it); + if (ret != BT_COMPONENT_STATUS_OK) { + goto end; + } + } else { + text->processed_first_event = true; + } + + notification = bt_notification_iterator_get_notification(it); + if (!notification) { + ret = BT_COMPONENT_STATUS_ERROR; + goto end; + } + + ret = handle_notification(text, notification); +end: + bt_put(it); + bt_put(notification); + return ret; +} + static enum bt_component_status text_component_init( struct bt_component *component, struct bt_value *params) @@ -148,8 +185,8 @@ enum bt_component_status text_component_init( goto error; } - ret = bt_component_sink_set_handle_notification_cb(component, - handle_notification); + ret = bt_component_sink_set_consume_cb(component, + run); if (ret != BT_COMPONENT_STATUS_OK) { goto error; } -- 2.34.1