From d4393e0875e7b08f6ee97d617cc5f2c9286742a4 Mon Sep 17 00:00:00 2001 From: Philippe Proulx Date: Fri, 22 Jun 2018 16:54:27 -0400 Subject: [PATCH] lib: notification iterator: transfer a batch of notifications This patch makes each notification iterator's "next" operation return a batch of notifications intead of a single one. This has a positive effect on performance as we're decreasing the number of executed instructions because there are fewer function calls per notification. The batch size is arbitrarily set to 15 here as a value between 10 and 50 experimentally showed to be beneficial, and there's usually more than one active stream on a typical graph configuration. As the batch size increases, the performance eventually degrades, as I suspect that (pooled) notification objects are constantly getting into and out of CPU cache. The goal of hiding the batch size in the library's implementation is to eventually determine a value at run time depending on the situation, and even change it when needed between consuming sinks. Each notification iterator contains a fixed-sized array of notification pointers. This array is allocated when the iterator is created and freed when it is destroyed. For convenience, I'm adding this typedef: typedef struct bt_notification **bt_notification_array; Consumer side ============= The consuming side API is now: enum bt_notification_iterator_status bt_private_connection_notification_iterator_next( struct bt_notification_iterator *iterator, bt_notification_array *notifs, uint64_t *count); enum bt_notification_iterator_status bt_output_port_notification_iterator_next( struct bt_notification_iterator *iterator, bt_notification_array *notifs, uint64_t *count); In English: the function sets `notifs` to the address of a notification array containing `*count` notifications when the returned status is `BT_NOTIFICATION_ITERATOR_STATUS_OK`. In this case, the caller is responsible for putting each notification in the array (the array's content is owned by the caller, but not the array itself), as the functions above do not put what's already in the iterator's array before filling it. If the status is not `BT_NOTIFICATION_ITERATOR_STATUS_OK`, then the function does not set `*notifs` and `*count`. Because the `utils.flt.muxer` component class kept each upstream notification iterator's current notification, it now keeps a queue of notifications, and the upstream iterator is now considered invalid if its notification queue is empty. If less than `*count` notifications are required, the caller can either: * Discard (put) unneeded notifications: they are lost forever. * Keep a queue (or other data structure) of notifications by keeping the references. The caller must NOT keep the array itself in this case, but each contained notification object, copying the pointer values to its territory. Producer side ============= The "next" method signature is now: enum bt_notification_iterator_status next_method( struct bt_private_connection_private_notification_iterator *notif_iter, bt_notification_array notifs, uint64_t capacity, uint64_t *count); The callback must either: * Fill `notifs` with up to `capacity` notifications (notification's ownership is transfered to the array), set `*count` to how many notifications were transfered (at least one), and return `BT_NOTIFICATION_ITERATOR_STATUS_OK`. `capacity` is always greater than zero. * Return something else than `BT_NOTIFICATION_ITERATOR_STATUS_OK`, not touching `notifs` and `count`. IMPORTANT: The callback must NOT transfer one or more notifications to `notifs` and return something else than `BT_NOTIFICATION_ITERATOR_STATUS_OK`. Signed-off-by: Philippe Proulx --- .../component-class-sink-colander-internal.h | 4 +- include/babeltrace/graph/component-class.h | 14 +- .../graph/notification-iterator-internal.h | 8 +- include/babeltrace/graph/notification.h | 2 + .../graph/output-port-notification-iterator.h | 5 +- ...private-connection-notification-iterator.h | 2 +- lib/graph/component-class-sink-colander.c | 18 +- lib/graph/iterator.c | 150 ++++++++---- plugins/Makefile.am | 2 +- plugins/ctf/common/notif-iter/notif-iter.c | 3 +- plugins/ctf/fs-src/data-stream-file.c | 20 +- plugins/ctf/fs-src/data-stream-file.h | 5 +- plugins/ctf/fs-src/fs.c | 84 +++++-- plugins/ctf/fs-src/fs.h | 6 +- plugins/text/dmesg/dmesg.c | 84 +++++-- plugins/text/dmesg/dmesg.h | 6 +- plugins/text/pretty/pretty.c | 24 +- plugins/utils/Makefile.am | 2 +- plugins/utils/counter/counter.c | 109 +++++---- plugins/utils/counter/counter.h | 1 + plugins/utils/dummy/dummy.c | 21 +- plugins/utils/muxer/muxer.c | 207 ++++++++++------- plugins/utils/muxer/muxer.h | 7 +- tests/lib/test-plugin-plugins/sfs.c | 13 +- tests/lib/test_bt_notification_iterator.c | 219 +++++++++--------- tests/lib/test_graph_topo.c | 12 +- tests/plugins/test-utils-muxer.c | 201 ++++++++-------- 27 files changed, 733 insertions(+), 496 deletions(-) diff --git a/include/babeltrace/graph/component-class-sink-colander-internal.h b/include/babeltrace/graph/component-class-sink-colander-internal.h index d58a8ddf..d1069543 100644 --- a/include/babeltrace/graph/component-class-sink-colander-internal.h +++ b/include/babeltrace/graph/component-class-sink-colander-internal.h @@ -23,6 +23,7 @@ * SOFTWARE. */ +#include #include #ifdef __cplusplus @@ -33,7 +34,8 @@ struct bt_component_class; struct bt_notification; struct bt_component_class_sink_colander_data { - struct bt_notification **notification; + bt_notification_array notifs; + uint64_t *count_addr; }; extern struct bt_component_class *bt_component_class_sink_colander_get(void); diff --git a/include/babeltrace/graph/component-class.h b/include/babeltrace/graph/component-class.h index acad8eec..f7d24364 100644 --- a/include/babeltrace/graph/component-class.h +++ b/include/babeltrace/graph/component-class.h @@ -39,6 +39,9 @@ /* For bt_bool */ #include +/* For bt_notification_array */ +#include + #ifdef __cplusplus extern "C" { #endif @@ -68,11 +71,6 @@ enum bt_component_class_type { BT_COMPONENT_CLASS_TYPE_FILTER = 2, }; -struct bt_notification_iterator_next_method_return { - struct bt_notification *notification; - enum bt_notification_iterator_status status; -}; - struct bt_component_class_query_method_return { struct bt_value *result; enum bt_query_status status; @@ -93,9 +91,11 @@ typedef enum bt_notification_iterator_status typedef void (*bt_component_class_notification_iterator_finalize_method)( struct bt_private_connection_private_notification_iterator *notification_iterator); -typedef struct bt_notification_iterator_next_method_return +typedef enum bt_notification_iterator_status (*bt_component_class_notification_iterator_next_method)( - struct bt_private_connection_private_notification_iterator *notification_iterator); + struct bt_private_connection_private_notification_iterator *notification_iterator, + bt_notification_array notifs, uint64_t capacity, + uint64_t *count); typedef struct bt_component_class_query_method_return (*bt_component_class_query_method)( struct bt_component_class *component_class, diff --git a/include/babeltrace/graph/notification-iterator-internal.h b/include/babeltrace/graph/notification-iterator-internal.h index dd80a519..2cfad159 100644 --- a/include/babeltrace/graph/notification-iterator-internal.h +++ b/include/babeltrace/graph/notification-iterator-internal.h @@ -86,6 +86,7 @@ enum bt_private_connection_notification_iterator_state { struct bt_notification_iterator { struct bt_object base; enum bt_notification_iterator_type type; + GPtrArray *notifs; }; struct bt_notification_iterator_private_connection { @@ -124,8 +125,11 @@ struct bt_notification_iterator_output_port { struct bt_graph *graph; /* Owned by this */ struct bt_component *colander; /* Owned by this */ - /* Only used temporarily: should always be NULL */ - struct bt_notification *notif; + /* + * Only used temporarily as a bridge between a colander sink and + * the user. + */ + uint64_t count; }; static inline diff --git a/include/babeltrace/graph/notification.h b/include/babeltrace/graph/notification.h index ce4c374f..1f8b7a31 100644 --- a/include/babeltrace/graph/notification.h +++ b/include/babeltrace/graph/notification.h @@ -33,6 +33,8 @@ extern "C" { struct bt_notification; +typedef struct bt_notification **bt_notification_array; + /** * Notification types. Unhandled notification types should be ignored. */ diff --git a/include/babeltrace/graph/output-port-notification-iterator.h b/include/babeltrace/graph/output-port-notification-iterator.h index d5c897d3..ab6c7e4f 100644 --- a/include/babeltrace/graph/output-port-notification-iterator.h +++ b/include/babeltrace/graph/output-port-notification-iterator.h @@ -38,8 +38,9 @@ extern struct bt_notification_iterator *bt_output_port_notification_iterator_cre struct bt_port *port, const char *colander_component_name); extern enum bt_notification_iterator_status -bt_output_port_notification_iterator_next(struct bt_notification_iterator *iterator, - struct bt_notification **notification); +bt_output_port_notification_iterator_next( + struct bt_notification_iterator *iterator, + bt_notification_array *notifs, uint64_t *count); #ifdef __cplusplus } diff --git a/include/babeltrace/graph/private-connection-notification-iterator.h b/include/babeltrace/graph/private-connection-notification-iterator.h index b1a3dc2d..46e6d137 100644 --- a/include/babeltrace/graph/private-connection-notification-iterator.h +++ b/include/babeltrace/graph/private-connection-notification-iterator.h @@ -40,7 +40,7 @@ extern struct bt_component *bt_private_connection_notification_iterator_get_comp extern enum bt_notification_iterator_status bt_private_connection_notification_iterator_next( struct bt_notification_iterator *iterator, - struct bt_notification **notification); + bt_notification_array *notifs, uint64_t *count); #ifdef __cplusplus } diff --git a/lib/graph/component-class-sink-colander.c b/lib/graph/component-class-sink-colander.c index e9e23d54..c6c12879 100644 --- a/lib/graph/component-class-sink-colander.c +++ b/lib/graph/component-class-sink-colander.c @@ -39,8 +39,8 @@ static struct bt_component_class *colander_comp_cls; struct colander_data { - struct bt_notification **user_notif; - enum bt_notification_type *notif_types; + bt_notification_array notifs; + uint64_t *count_addr; struct bt_notification_iterator *notif_iter; }; @@ -67,7 +67,8 @@ enum bt_component_status colander_init( goto end; } - colander_data->user_notif = user_provided_data->notification; + colander_data->notifs = user_provided_data->notifs; + colander_data->count_addr = user_provided_data->count_addr; status = bt_private_component_sink_add_input_private_port( priv_comp, "in", NULL, NULL); if (status != BT_COMPONENT_STATUS_OK) { @@ -95,7 +96,6 @@ void colander_finalize(struct bt_private_component *priv_comp) bt_put(colander_data->notif_iter); } - g_free(colander_data->notif_types); g_free(colander_data); } @@ -131,9 +131,9 @@ enum bt_component_status colander_consume( { enum bt_component_status status = BT_COMPONENT_STATUS_OK; enum bt_notification_iterator_status notif_iter_status; - struct bt_notification *notif = NULL; struct colander_data *colander_data = bt_private_component_get_user_data(priv_comp); + bt_notification_array notifs; BT_ASSERT(colander_data); @@ -144,7 +144,7 @@ enum bt_component_status colander_consume( } notif_iter_status = bt_private_connection_notification_iterator_next( - colander_data->notif_iter, ¬if); + colander_data->notif_iter, ¬ifs, colander_data->count_addr); switch (notif_iter_status) { case BT_NOTIFICATION_ITERATOR_STATUS_CANCELED: status = BT_COMPONENT_STATUS_OK; @@ -156,17 +156,17 @@ enum bt_component_status colander_consume( status = BT_COMPONENT_STATUS_END; goto end; case BT_NOTIFICATION_ITERATOR_STATUS_OK: + /* Move notifications to user (count already set) */ + memcpy(colander_data->notifs, notifs, + sizeof(*notifs) * *colander_data->count_addr); break; default: status = BT_COMPONENT_STATUS_ERROR; goto end; } - BT_ASSERT(notif); - end: /* Move notification to user's pointer, even if NULL. */ - *colander_data->user_notif = notif; return status; } diff --git a/lib/graph/iterator.c b/lib/graph/iterator.c index 5135bcee..0deb40c7 100644 --- a/lib/graph/iterator.c +++ b/lib/graph/iterator.c @@ -63,6 +63,12 @@ #include #include +/* + * TODO: Use graph's state (number of active iterators, etc.) and + * possibly system specifications to make a better guess than this. + */ +#define NOTIF_BATCH_SIZE 15 + struct discarded_elements_state { struct bt_clock_value *cur_begin; uint64_t cur_count; @@ -130,8 +136,15 @@ end: static void destroy_base_notification_iterator(struct bt_object *obj) { - BT_ASSERT(obj); - g_free(obj); + struct bt_notification_iterator *iterator = (void *) obj; + + BT_ASSERT(iterator); + + if (iterator->notifs) { + g_ptr_array_free(iterator->notifs, TRUE); + } + + g_free(iterator); } static @@ -267,12 +280,25 @@ void bt_private_connection_notification_iterator_set_connection( } static -void init_notification_iterator(struct bt_notification_iterator *iterator, +int init_notification_iterator(struct bt_notification_iterator *iterator, enum bt_notification_iterator_type type, bt_object_release_func destroy) { + int ret = 0; + bt_object_init_shared(&iterator->base, destroy); iterator->type = type; + iterator->notifs = g_ptr_array_new(); + if (!iterator->notifs) { + BT_LOGE_STR("Failed to allocate a GPtrArray."); + ret = -1; + goto end; + } + + g_ptr_array_set_size(iterator->notifs, NOTIF_BATCH_SIZE); + +end: + return ret; } BT_HIDDEN @@ -285,6 +311,7 @@ enum bt_connection_status bt_private_connection_notification_iterator_create( enum bt_connection_status status = BT_CONNECTION_STATUS_OK; enum bt_component_class_type type; struct bt_notification_iterator_private_connection *iterator = NULL; + int ret; BT_ASSERT(upstream_comp); BT_ASSERT(upstream_port); @@ -307,9 +334,14 @@ enum bt_connection_status bt_private_connection_notification_iterator_create( goto end; } - init_notification_iterator((void *) iterator, + ret = init_notification_iterator((void *) iterator, BT_NOTIFICATION_ITERATOR_TYPE_PRIVATE_CONNECTION, bt_private_connection_notification_iterator_destroy); + if (ret) { + /* init_notification_iterator() logs errors */ + status = BT_CONNECTION_STATUS_NOMEM; + goto end; + } iterator->stream_states = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) destroy_stream_state); @@ -571,6 +603,26 @@ end: return is_valid; } +BT_ASSERT_PRE_FUNC +static inline +bool validate_notifications( + struct bt_notification_iterator_private_connection *iterator, + uint64_t count) +{ + bool ret = true; + bt_notification_array notifs = (void *) iterator->base.notifs->pdata; + uint64_t i; + + for (i = 0; i < count; i++) { + ret = validate_notification(iterator, notifs[i]); + if (!ret) { + break; + } + } + + return ret; +} + BT_ASSERT_PRE_FUNC static inline bool priv_conn_notif_iter_can_end( struct bt_notification_iterator_private_connection *iterator) @@ -609,22 +661,19 @@ end: enum bt_notification_iterator_status bt_private_connection_notification_iterator_next( struct bt_notification_iterator *user_iterator, - struct bt_notification **user_notif) + struct bt_notification ***user_notifs, uint64_t *user_count) { struct bt_notification_iterator_private_connection *iterator = (void *) user_iterator; struct bt_private_connection_private_notification_iterator *priv_iterator = bt_private_connection_private_notification_iterator_from_notification_iterator(iterator); bt_component_class_notification_iterator_next_method next_method = NULL; - struct bt_notification_iterator_next_method_return next_return = { - .status = BT_NOTIFICATION_ITERATOR_STATUS_OK, - .notification = NULL, - }; enum bt_notification_iterator_status status = BT_NOTIFICATION_ITERATOR_STATUS_OK; BT_ASSERT_PRE_NON_NULL(user_iterator, "Notification iterator"); - BT_ASSERT_PRE_NON_NULL(user_notif, "Notification"); + BT_ASSERT_PRE_NON_NULL(user_notifs, "Notification array"); + BT_ASSERT_PRE_NON_NULL(user_count, "Notification count"); BT_ASSERT_PRE(user_iterator->type == BT_NOTIFICATION_ITERATOR_TYPE_PRIVATE_CONNECTION, "Notification iterator was not created from a private connection: " @@ -670,12 +719,13 @@ bt_private_connection_notification_iterator_next( */ BT_ASSERT(next_method); BT_LOGD_STR("Calling user's \"next\" method."); - next_return = next_method(priv_iterator); + status = next_method(priv_iterator, + (void *) user_iterator->notifs->pdata, + NOTIF_BATCH_SIZE, user_count); BT_LOGD("User method returned: status=%s", - bt_notification_iterator_status_string(next_return.status)); - if (next_return.status < 0) { + bt_notification_iterator_status_string(status)); + if (status < 0) { BT_LOGW_STR("User method failed."); - status = next_return.status; goto end; } @@ -694,16 +744,31 @@ bt_private_connection_notification_iterator_next( * BT_NOTIFICATION_ITERATOR_STATUS_OK because * otherwise this field could be garbage. */ - if (next_return.status == - BT_NOTIFICATION_ITERATOR_STATUS_OK) { - bt_put(next_return.notification); + if (status == BT_NOTIFICATION_ITERATOR_STATUS_OK) { + uint64_t i; + bt_notification_array notifs = + (void *) user_iterator->notifs->pdata; + + for (i = 0; i < *user_count; i++) { + bt_put(notifs[i]); + } } status = BT_NOTIFICATION_ITERATOR_STATUS_CANCELED; goto end; } - switch (next_return.status) { + switch (status) { + case BT_NOTIFICATION_ITERATOR_STATUS_OK: + BT_ASSERT_PRE(validate_notifications(iterator, *user_count), + "Notifications are invalid at this point: " + "%![notif-iter-]+i, count=%" PRIu64, + iterator, *user_count); + *user_notifs = (void *) user_iterator->notifs->pdata; + break; + case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN: + status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN; + goto end; case BT_NOTIFICATION_ITERATOR_STATUS_END: BT_ASSERT_PRE(priv_conn_notif_iter_can_end(iterator), "Notification iterator cannot end at this point: " @@ -715,20 +780,6 @@ bt_private_connection_notification_iterator_next( BT_LOGD("Set new status: status=%s", bt_notification_iterator_status_string(status)); goto end; - case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN: - status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN; - goto end; - case BT_NOTIFICATION_ITERATOR_STATUS_OK: - BT_ASSERT_PRE(next_return.notification, - "User method returned BT_NOTIFICATION_ITERATOR_STATUS_OK, but notification is NULL: " - "%!+i", iterator); - BT_ASSERT_PRE(validate_notification(iterator, - next_return.notification), - "Notification is invalid at this point: " - "%![notif-iter-]+i, %![notif-]+n", - iterator, next_return.notification); - *user_notif = next_return.notification; - break; default: /* Unknown non-error status */ abort(); @@ -741,7 +792,8 @@ end: enum bt_notification_iterator_status bt_output_port_notification_iterator_next( struct bt_notification_iterator *iterator, - struct bt_notification **user_notif) + bt_notification_array *notifs_to_user, + uint64_t *count_to_user) { enum bt_notification_iterator_status status; struct bt_notification_iterator_output_port *out_port_iter = @@ -749,37 +801,40 @@ bt_output_port_notification_iterator_next( enum bt_graph_status graph_status; BT_ASSERT_PRE_NON_NULL(iterator, "Notification iterator"); - BT_ASSERT_PRE_NON_NULL(user_notif, "Notification"); + BT_ASSERT_PRE_NON_NULL(notifs_to_user, "Notification array"); + BT_ASSERT_PRE_NON_NULL(count_to_user, "Notification count"); BT_ASSERT_PRE(iterator->type == BT_NOTIFICATION_ITERATOR_TYPE_OUTPUT_PORT, "Notification iterator was not created from an output port: " "%!+i", iterator); BT_LIB_LOGD("Getting next output port notification iterator's notification: %!+i", iterator); + graph_status = bt_graph_consume_sink_no_check( out_port_iter->graph, out_port_iter->colander); switch (graph_status) { case BT_GRAPH_STATUS_CANCELED: - BT_ASSERT(!out_port_iter->notif); status = BT_NOTIFICATION_ITERATOR_STATUS_CANCELED; break; case BT_GRAPH_STATUS_AGAIN: - BT_ASSERT(!out_port_iter->notif); status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN; break; case BT_GRAPH_STATUS_END: - BT_ASSERT(!out_port_iter->notif); status = BT_NOTIFICATION_ITERATOR_STATUS_END; break; case BT_GRAPH_STATUS_NOMEM: - BT_ASSERT(!out_port_iter->notif); status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM; break; case BT_GRAPH_STATUS_OK: - BT_ASSERT(out_port_iter->notif); status = BT_NOTIFICATION_ITERATOR_STATUS_OK; - *user_notif = out_port_iter->notif; - out_port_iter->notif = NULL; + + /* + * On success, the colander sink moves the notifications + * to this iterator's array and sets this iterator's + * notification count: move them to the user. + */ + *notifs_to_user = (void *) iterator->notifs->pdata; + *count_to_user = out_port_iter->count; break; default: /* Other errors */ @@ -820,7 +875,6 @@ void bt_output_port_notification_iterator_destroy(struct bt_object *obj) BT_LOGD("Destroying output port notification iterator object: addr=%p", iterator); - BT_ASSERT(!iterator->notif); BT_LOGD_STR("Putting graph."); bt_put(iterator->graph); BT_LOGD_STR("Putting colander sink component."); @@ -841,6 +895,7 @@ struct bt_notification_iterator *bt_output_port_notification_iterator_create( const char *colander_comp_name; struct bt_port *colander_in_port = NULL; struct bt_component_class_sink_colander_data colander_data; + int ret; BT_ASSERT_PRE_NON_NULL(output_port, "Output port"); BT_ASSERT_PRE(bt_port_get_type(output_port) == BT_PORT_TYPE_OUTPUT, @@ -862,9 +917,14 @@ struct bt_notification_iterator *bt_output_port_notification_iterator_create( goto error; } - init_notification_iterator((void *) iterator, + ret = init_notification_iterator((void *) iterator, BT_NOTIFICATION_ITERATOR_TYPE_OUTPUT_PORT, bt_output_port_notification_iterator_destroy); + if (ret) { + /* init_notification_iterator() logs errors */ + BT_PUT(iterator); + goto end; + } /* Create colander component */ colander_comp_cls = bt_component_class_sink_colander_get(); @@ -876,7 +936,9 @@ struct bt_notification_iterator *bt_output_port_notification_iterator_create( BT_MOVE(iterator->graph, graph); colander_comp_name = colander_component_name ? colander_component_name : "colander"; - colander_data.notification = &iterator->notif; + colander_data.notifs = (void *) iterator->base.notifs->pdata; + colander_data.count_addr = &iterator->count; + graph_status = bt_graph_add_component_with_init_method_data( iterator->graph, colander_comp_cls, colander_comp_name, NULL, &colander_data, &iterator->colander); diff --git a/plugins/Makefile.am b/plugins/Makefile.am index 7b4ca384..4622c072 100644 --- a/plugins/Makefile.am +++ b/plugins/Makefile.am @@ -1,4 +1,4 @@ -SUBDIRS = text utils ctf +SUBDIRS = utils ctf text # libctfcopytrace if ENABLE_DEBUG_INFO diff --git a/plugins/ctf/common/notif-iter/notif-iter.c b/plugins/ctf/common/notif-iter/notif-iter.c index 4ab01c4e..fc9bccc2 100644 --- a/plugins/ctf/common/notif-iter/notif-iter.c +++ b/plugins/ctf/common/notif-iter/notif-iter.c @@ -3138,7 +3138,8 @@ enum bt_notif_iter_status bt_notif_iter_get_next_notification( goto end; } - BT_MOVE(*notification, notit->event_notif); + *notification = notit->event_notif; + notit->event_notif = NULL; goto end; case STATE_EMIT_NOTIF_END_OF_PACKET: /* Update clock with timestamp_end field. */ diff --git a/plugins/ctf/fs-src/data-stream-file.c b/plugins/ctf/fs-src/data-stream-file.c index 76f5688a..2fbc3c0c 100644 --- a/plugins/ctf/fs-src/data-stream-file.c +++ b/plugins/ctf/fs-src/data-stream-file.c @@ -818,25 +818,23 @@ void ctf_fs_ds_file_destroy(struct ctf_fs_ds_file *ds_file) } BT_HIDDEN -struct bt_notification_iterator_next_method_return ctf_fs_ds_file_next( - struct ctf_fs_ds_file *ds_file) +enum bt_notification_iterator_status ctf_fs_ds_file_next( + struct ctf_fs_ds_file *ds_file, + struct bt_notification **notif) { enum bt_notif_iter_status notif_iter_status; - struct bt_notification_iterator_next_method_return ret = { - .status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR, - .notification = NULL, - }; + enum bt_notification_iterator_status status; notif_iter_status = bt_notif_iter_get_next_notification( ds_file->notif_iter, ds_file->cc_prio_map, ds_file->graph, - &ret.notification); + notif); switch (notif_iter_status) { case BT_NOTIF_ITER_STATUS_EOF: - ret.status = BT_NOTIFICATION_ITERATOR_STATUS_END; + status = BT_NOTIFICATION_ITERATOR_STATUS_END; break; case BT_NOTIF_ITER_STATUS_OK: - ret.status = BT_NOTIFICATION_ITERATOR_STATUS_OK; + status = BT_NOTIFICATION_ITERATOR_STATUS_OK; break; case BT_NOTIF_ITER_STATUS_AGAIN: /* @@ -848,11 +846,11 @@ struct bt_notification_iterator_next_method_return ctf_fs_ds_file_next( case BT_NOTIF_ITER_STATUS_INVAL: case BT_NOTIF_ITER_STATUS_ERROR: default: - ret.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; + status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; break; } - return ret; + return status; } BT_HIDDEN diff --git a/plugins/ctf/fs-src/data-stream-file.h b/plugins/ctf/fs-src/data-stream-file.h index e7d8b04a..be035606 100644 --- a/plugins/ctf/fs-src/data-stream-file.h +++ b/plugins/ctf/fs-src/data-stream-file.h @@ -140,8 +140,9 @@ BT_HIDDEN void ctf_fs_ds_file_destroy(struct ctf_fs_ds_file *stream); BT_HIDDEN -struct bt_notification_iterator_next_method_return ctf_fs_ds_file_next( - struct ctf_fs_ds_file *stream); +enum bt_notification_iterator_status ctf_fs_ds_file_next( + struct ctf_fs_ds_file *ds_file, + struct bt_notification **notif); BT_HIDDEN struct ctf_fs_ds_index *ctf_fs_ds_file_build_index( diff --git a/plugins/ctf/fs-src/fs.c b/plugins/ctf/fs-src/fs.c index 04421d31..eac20e42 100644 --- a/plugins/ctf/fs-src/fs.c +++ b/plugins/ctf/fs-src/fs.c @@ -87,19 +87,19 @@ void ctf_fs_notif_iter_data_destroy( g_free(notif_iter_data); } -struct bt_notification_iterator_next_method_return ctf_fs_iterator_next( - struct bt_private_connection_private_notification_iterator *iterator) +static +enum bt_notification_iterator_status ctf_fs_iterator_next_one( + struct ctf_fs_notif_iter_data *notif_iter_data, + struct bt_notification **notif) { - struct bt_notification_iterator_next_method_return next_ret; - struct ctf_fs_notif_iter_data *notif_iter_data = - bt_private_connection_private_notification_iterator_get_user_data(iterator); + enum bt_notification_iterator_status status; int ret; BT_ASSERT(notif_iter_data->ds_file); - next_ret = ctf_fs_ds_file_next(notif_iter_data->ds_file); + status = ctf_fs_ds_file_next(notif_iter_data->ds_file, notif); - if (next_ret.status == BT_NOTIFICATION_ITERATOR_STATUS_OK && - bt_notification_get_type(next_ret.notification) == + if (status == BT_NOTIFICATION_ITERATOR_STATUS_OK && + bt_notification_get_type(*notif) == BT_NOTIFICATION_TYPE_STREAM_BEGIN) { if (notif_iter_data->skip_stream_begin_notifs) { /* @@ -107,9 +107,10 @@ struct bt_notification_iterator_next_method_return ctf_fs_iterator_next( * BT_NOTIFICATION_TYPE_STREAM_BEGIN * notification: skip this one, get a new one. */ - BT_PUT(next_ret.notification); - next_ret = ctf_fs_ds_file_next(notif_iter_data->ds_file); - BT_ASSERT(next_ret.status != BT_NOTIFICATION_ITERATOR_STATUS_END); + BT_PUT(*notif); + status = ctf_fs_ds_file_next(notif_iter_data->ds_file, + notif); + BT_ASSERT(status != BT_NOTIFICATION_ITERATOR_STATUS_END); goto end; } else { /* @@ -121,8 +122,8 @@ struct bt_notification_iterator_next_method_return ctf_fs_iterator_next( } } - if (next_ret.status == BT_NOTIFICATION_ITERATOR_STATUS_OK && - bt_notification_get_type(next_ret.notification) == + if (status == BT_NOTIFICATION_ITERATOR_STATUS_OK && + bt_notification_get_type(*notif) == BT_NOTIFICATION_TYPE_STREAM_END) { notif_iter_data->ds_file_info_index++; @@ -140,7 +141,7 @@ struct bt_notification_iterator_next_method_return ctf_fs_iterator_next( goto end; } - BT_PUT(next_ret.notification); + BT_PUT(*notif); bt_notif_iter_reset(notif_iter_data->notif_iter); /* @@ -149,11 +150,11 @@ struct bt_notification_iterator_next_method_return ctf_fs_iterator_next( */ ret = notif_iter_data_set_current_ds_file(notif_iter_data); if (ret) { - next_ret.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; + status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; goto end; } - next_ret = ctf_fs_ds_file_next(notif_iter_data->ds_file); + status = ctf_fs_ds_file_next(notif_iter_data->ds_file, notif); /* * If we get a notification, we expect to get a @@ -173,17 +174,56 @@ struct bt_notification_iterator_next_method_return ctf_fs_iterator_next( */ BT_ASSERT(notif_iter_data->skip_stream_begin_notifs); - if (next_ret.status == BT_NOTIFICATION_ITERATOR_STATUS_OK) { - BT_ASSERT(bt_notification_get_type(next_ret.notification) == + if (status == BT_NOTIFICATION_ITERATOR_STATUS_OK) { + BT_ASSERT(bt_notification_get_type(*notif) == BT_NOTIFICATION_TYPE_STREAM_BEGIN); - BT_PUT(next_ret.notification); - next_ret = ctf_fs_ds_file_next(notif_iter_data->ds_file); - BT_ASSERT(next_ret.status != BT_NOTIFICATION_ITERATOR_STATUS_END); + BT_PUT(*notif); + status = ctf_fs_ds_file_next(notif_iter_data->ds_file, + notif); + BT_ASSERT(status != BT_NOTIFICATION_ITERATOR_STATUS_END); } } end: - return next_ret; + return status; +} + +BT_HIDDEN +enum bt_notification_iterator_status ctf_fs_iterator_next( + struct bt_private_connection_private_notification_iterator *iterator, + bt_notification_array notifs, uint64_t capacity, + uint64_t *count) +{ + enum bt_notification_iterator_status status = + BT_NOTIFICATION_ITERATOR_STATUS_OK; + struct ctf_fs_notif_iter_data *notif_iter_data = + bt_private_connection_private_notification_iterator_get_user_data(iterator); + uint64_t i = 0; + + while (i < capacity && status == BT_NOTIFICATION_ITERATOR_STATUS_OK) { + status = ctf_fs_iterator_next_one(notif_iter_data, ¬ifs[i]); + if (status == BT_NOTIFICATION_ITERATOR_STATUS_OK) { + i++; + } + } + + if (i > 0) { + /* + * Even if ctf_fs_iterator_next_one() returned something + * else than BT_NOTIFICATION_ITERATOR_STATUS_OK, we + * accumulated notification objects in the output + * notification array, so we need to return + * BT_NOTIFICATION_ITERATOR_STATUS_OK so that they are + * transfered to downstream. This other status occurs + * again the next time muxer_notif_iter_do_next() is + * called, possibly without any accumulated + * notification, in which case we'll return it. + */ + *count = i; + status = BT_NOTIFICATION_ITERATOR_STATUS_OK; + } + + return status; } void ctf_fs_iterator_finalize(struct bt_private_connection_private_notification_iterator *it) diff --git a/plugins/ctf/fs-src/fs.h b/plugins/ctf/fs-src/fs.h index b54e4722..534cb6b5 100644 --- a/plugins/ctf/fs-src/fs.h +++ b/plugins/ctf/fs-src/fs.h @@ -179,7 +179,9 @@ BT_HIDDEN void ctf_fs_iterator_finalize(struct bt_private_connection_private_notification_iterator *it); BT_HIDDEN -struct bt_notification_iterator_next_method_return ctf_fs_iterator_next( - struct bt_private_connection_private_notification_iterator *iterator); +enum bt_notification_iterator_status ctf_fs_iterator_next( + struct bt_private_connection_private_notification_iterator *iterator, + bt_notification_array notifs, uint64_t capacity, + uint64_t *count); #endif /* BABELTRACE_PLUGIN_CTF_FS_H */ diff --git a/plugins/text/dmesg/dmesg.c b/plugins/text/dmesg/dmesg.c index 9f0d9e1c..4a3aca80 100644 --- a/plugins/text/dmesg/dmesg.c +++ b/plugins/text/dmesg/dmesg.c @@ -872,26 +872,22 @@ void dmesg_notif_iter_finalize( priv_notif_iter)); } -BT_HIDDEN -struct bt_notification_iterator_next_method_return dmesg_notif_iter_next( - struct bt_private_connection_private_notification_iterator *priv_notif_iter) +static +enum bt_notification_iterator_status dmesg_notif_iter_next_one( + struct dmesg_notif_iter *dmesg_notif_iter, + struct bt_notification **notif) { ssize_t len; - struct dmesg_notif_iter *dmesg_notif_iter = - bt_private_connection_private_notification_iterator_get_user_data( - priv_notif_iter); struct dmesg_component *dmesg_comp; - struct bt_notification_iterator_next_method_return next_ret = { - .status = BT_NOTIFICATION_ITERATOR_STATUS_OK, - .notification = NULL - }; + enum bt_notification_iterator_status status = + BT_NOTIFICATION_ITERATOR_STATUS_OK; BT_ASSERT(dmesg_notif_iter); dmesg_comp = dmesg_notif_iter->dmesg_comp; BT_ASSERT(dmesg_comp); if (dmesg_notif_iter->state == STATE_DONE) { - next_ret.status = BT_NOTIFICATION_ITERATOR_STATUS_END; + status = BT_NOTIFICATION_ITERATOR_STATUS_END; goto end; } @@ -909,16 +905,14 @@ struct bt_notification_iterator_next_method_return dmesg_notif_iter_next( &dmesg_notif_iter->linebuf_len, dmesg_notif_iter->fp); if (len < 0) { if (errno == EINVAL) { - next_ret.status = - BT_NOTIFICATION_ITERATOR_STATUS_ERROR; + status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; } else if (errno == ENOMEM) { - next_ret.status = + status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM; } else { if (dmesg_notif_iter->state == STATE_EMIT_STREAM_BEGINNING) { /* Stream did not even begin */ - next_ret.status = - BT_NOTIFICATION_ITERATOR_STATUS_END; + status = BT_NOTIFICATION_ITERATOR_STATUS_END; goto end; } else { /* End current packet now */ @@ -961,28 +955,28 @@ handle_state: switch (dmesg_notif_iter->state) { case STATE_EMIT_STREAM_BEGINNING: BT_ASSERT(dmesg_notif_iter->tmp_event_notif); - next_ret.notification = bt_notification_stream_begin_create( + *notif = bt_notification_stream_begin_create( dmesg_comp->graph, dmesg_comp->stream); dmesg_notif_iter->state = STATE_EMIT_PACKET_BEGINNING; break; case STATE_EMIT_PACKET_BEGINNING: BT_ASSERT(dmesg_notif_iter->tmp_event_notif); - next_ret.notification = bt_notification_packet_begin_create( + *notif = bt_notification_packet_begin_create( dmesg_comp->graph, dmesg_comp->packet); dmesg_notif_iter->state = STATE_EMIT_EVENT; break; case STATE_EMIT_EVENT: BT_ASSERT(dmesg_notif_iter->tmp_event_notif); - BT_MOVE(next_ret.notification, - dmesg_notif_iter->tmp_event_notif); + *notif = dmesg_notif_iter->tmp_event_notif; + dmesg_notif_iter->tmp_event_notif = NULL; break; case STATE_EMIT_PACKET_END: - next_ret.notification = bt_notification_packet_end_create( + *notif = bt_notification_packet_end_create( dmesg_comp->graph, dmesg_comp->packet); dmesg_notif_iter->state = STATE_EMIT_STREAM_END; break; case STATE_EMIT_STREAM_END: - next_ret.notification = bt_notification_stream_end_create( + *notif = bt_notification_stream_end_create( dmesg_comp->graph, dmesg_comp->stream); dmesg_notif_iter->state = STATE_DONE; break; @@ -990,12 +984,52 @@ handle_state: break; } - if (!next_ret.notification) { + if (!*notif) { BT_LOGE("Cannot create notification: dmesg-comp-addr=%p", dmesg_comp); - next_ret.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; + status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; } end: - return next_ret; + return status; +} + +BT_HIDDEN +enum bt_notification_iterator_status dmesg_notif_iter_next( + struct bt_private_connection_private_notification_iterator *priv_notif_iter, + bt_notification_array notifs, uint64_t capacity, + uint64_t *count) +{ + struct dmesg_notif_iter *dmesg_notif_iter = + bt_private_connection_private_notification_iterator_get_user_data( + priv_notif_iter); + enum bt_notification_iterator_status status = + BT_NOTIFICATION_ITERATOR_STATUS_OK; + uint64_t i = 0; + + while (i < capacity && status == BT_NOTIFICATION_ITERATOR_STATUS_OK) { + status = dmesg_notif_iter_next_one(dmesg_notif_iter, ¬ifs[i]); + if (status == BT_NOTIFICATION_ITERATOR_STATUS_OK) { + i++; + } + } + + if (i > 0) { + /* + * Even if dmesg_notif_iter_next_one() returned + * something else than + * BT_NOTIFICATION_ITERATOR_STATUS_OK, we accumulated + * notification objects in the output notification + * array, so we need to return + * BT_NOTIFICATION_ITERATOR_STATUS_OK so that they are + * transfered to downstream. This other status occurs + * again the next time muxer_notif_iter_do_next() is + * called, possibly without any accumulated + * notification, in which case we'll return it. + */ + *count = i; + status = BT_NOTIFICATION_ITERATOR_STATUS_OK; + } + + return status; } diff --git a/plugins/text/dmesg/dmesg.h b/plugins/text/dmesg/dmesg.h index e85a8906..3e2398cf 100644 --- a/plugins/text/dmesg/dmesg.h +++ b/plugins/text/dmesg/dmesg.h @@ -44,7 +44,9 @@ void dmesg_notif_iter_finalize( struct bt_private_connection_private_notification_iterator *priv_notif_iter); BT_HIDDEN -struct bt_notification_iterator_next_method_return dmesg_notif_iter_next( - struct bt_private_connection_private_notification_iterator *priv_notif_iter); +enum bt_notification_iterator_status dmesg_notif_iter_next( + struct bt_private_connection_private_notification_iterator *priv_notif_iter, + bt_notification_array notifs, uint64_t capacity, + uint64_t *count); #endif /* BABELTRACE_PLUGIN_TEXT_DMESG_DMESG_H */ diff --git a/plugins/text/pretty/pretty.c b/plugins/text/pretty/pretty.c index 77626676..1010917d 100644 --- a/plugins/text/pretty/pretty.c +++ b/plugins/text/pretty/pretty.c @@ -179,11 +179,13 @@ BT_HIDDEN enum bt_component_status pretty_consume(struct bt_private_component *component) { enum bt_component_status ret; - struct bt_notification *notif = NULL; + bt_notification_array notifs; struct bt_notification_iterator *it; struct pretty_component *pretty = bt_private_component_get_user_data(component); enum bt_notification_iterator_status it_ret; + uint64_t count = 0; + uint64_t i = 0; if (unlikely(pretty->error)) { ret = BT_COMPONENT_STATUS_ERROR; @@ -191,7 +193,8 @@ enum bt_component_status pretty_consume(struct bt_private_component *component) } it = pretty->input_iterator; - it_ret = bt_private_connection_notification_iterator_next(it, ¬if); + it_ret = bt_private_connection_notification_iterator_next(it, ¬ifs, + &count); switch (it_ret) { case BT_NOTIFICATION_ITERATOR_STATUS_END: @@ -208,11 +211,22 @@ enum bt_component_status pretty_consume(struct bt_private_component *component) goto end; } - BT_ASSERT(notif); - ret = handle_notification(pretty, notif); + BT_ASSERT(it_ret == BT_NOTIFICATION_ITERATOR_STATUS_OK); + + for (i = 0; i < count; i++) { + ret = handle_notification(pretty, notifs[i]); + if (ret) { + goto end; + } + + bt_put(notifs[i]); + } end: - bt_put(notif); + for (; i < count; i++) { + bt_put(notifs[i]); + } + return ret; } diff --git a/plugins/utils/Makefile.am b/plugins/utils/Makefile.am index b99ab206..d85364e2 100644 --- a/plugins/utils/Makefile.am +++ b/plugins/utils/Makefile.am @@ -1,6 +1,6 @@ AM_CPPFLAGS += -I$(top_srcdir)/plugins -SUBDIRS = dummy counter muxer . +SUBDIRS = dummy muxer counter . # trimmer plugindir = "$(PLUGINSDIR)" diff --git a/plugins/utils/counter/counter.c b/plugins/utils/counter/counter.c index 7ff3d339..bbcb8276 100644 --- a/plugins/utils/counter/counter.c +++ b/plugins/utils/counter/counter.c @@ -54,8 +54,10 @@ uint64_t get_total_count(struct counter *counter) } static -void print_count(struct counter *counter, uint64_t total) +void print_count(struct counter *counter) { + uint64_t total = get_total_count(counter); + PRINTF_COUNT("event", "events", event); PRINTF_COUNT("stream beginning", "stream beginnings", stream_begin); PRINTF_COUNT("stream end", "stream ends", stream_end); @@ -83,19 +85,18 @@ void print_count(struct counter *counter, uint64_t total) } static -void try_print_count(struct counter *counter) +void try_print_count(struct counter *counter, uint64_t notif_count) { - uint64_t total; - if (counter->step == 0) { /* No update */ return; } - total = get_total_count(counter); + counter->at += notif_count; - if (total % counter->step == 0) { - print_count(counter, total); + if (counter->at >= counter->step) { + counter->at = 0; + print_count(counter); putchar('\n'); } } @@ -106,7 +107,7 @@ void try_print_last(struct counter *counter) const uint64_t total = get_total_count(counter); if (total != counter->last_printed_total) { - print_count(counter, total); + print_count(counter); } } @@ -215,7 +216,8 @@ enum bt_component_status counter_consume(struct bt_private_component *component) struct counter *counter; enum bt_notification_iterator_status it_ret; int64_t count; - struct bt_notification *notif = NULL; + uint64_t notif_count; + bt_notification_array notifs; counter = bt_private_component_get_user_data(component); BT_ASSERT(counter); @@ -231,9 +233,9 @@ enum bt_component_status counter_consume(struct bt_private_component *component) goto end; } - /* Consume one notification */ + /* Consume notifications */ it_ret = bt_private_connection_notification_iterator_next( - counter->notif_iter, ¬if); + counter->notif_iter, ¬ifs, ¬if_count); if (it_ret < 0) { ret = BT_COMPONENT_STATUS_ERROR; goto end; @@ -249,53 +251,60 @@ enum bt_component_status counter_consume(struct bt_private_component *component) goto end; case BT_NOTIFICATION_ITERATOR_STATUS_OK: { - BT_ASSERT(notif); - switch (bt_notification_get_type(notif)) { - case BT_NOTIFICATION_TYPE_EVENT: - counter->count.event++; - break; - case BT_NOTIFICATION_TYPE_INACTIVITY: - counter->count.inactivity++; - break; - case BT_NOTIFICATION_TYPE_STREAM_BEGIN: - counter->count.stream_begin++; - break; - case BT_NOTIFICATION_TYPE_STREAM_END: - counter->count.stream_end++; - break; - case BT_NOTIFICATION_TYPE_PACKET_BEGIN: - counter->count.packet_begin++; - break; - case BT_NOTIFICATION_TYPE_PACKET_END: - counter->count.packet_end++; - break; - case BT_NOTIFICATION_TYPE_DISCARDED_EVENTS: - counter->count.discarded_events_notifs++; - count = bt_notification_discarded_events_get_count( - notif); - if (count >= 0) { - counter->count.discarded_events += count; + uint64_t i; + + for (i = 0; i < notif_count; i++) { + struct bt_notification *notif = notifs[i]; + + BT_ASSERT(notif); + switch (bt_notification_get_type(notif)) { + case BT_NOTIFICATION_TYPE_EVENT: + counter->count.event++; + break; + case BT_NOTIFICATION_TYPE_INACTIVITY: + counter->count.inactivity++; + break; + case BT_NOTIFICATION_TYPE_STREAM_BEGIN: + counter->count.stream_begin++; + break; + case BT_NOTIFICATION_TYPE_STREAM_END: + counter->count.stream_end++; + break; + case BT_NOTIFICATION_TYPE_PACKET_BEGIN: + counter->count.packet_begin++; + break; + case BT_NOTIFICATION_TYPE_PACKET_END: + counter->count.packet_end++; + break; + case BT_NOTIFICATION_TYPE_DISCARDED_EVENTS: + counter->count.discarded_events_notifs++; + count = bt_notification_discarded_events_get_count( + notif); + if (count >= 0) { + counter->count.discarded_events += count; + } + break; + case BT_NOTIFICATION_TYPE_DISCARDED_PACKETS: + counter->count.discarded_packets_notifs++; + count = bt_notification_discarded_packets_get_count( + notif); + if (count >= 0) { + counter->count.discarded_packets += count; + } + break; + default: + counter->count.other++; } - break; - case BT_NOTIFICATION_TYPE_DISCARDED_PACKETS: - counter->count.discarded_packets_notifs++; - count = bt_notification_discarded_packets_get_count( - notif); - if (count >= 0) { - counter->count.discarded_packets += count; - } - break; - default: - counter->count.other++; + + bt_put(notif); } } default: break; } - try_print_count(counter); + try_print_count(counter, notif_count); end: - bt_put(notif); return ret; } diff --git a/plugins/utils/counter/counter.h b/plugins/utils/counter/counter.h index 6915532b..2630f10e 100644 --- a/plugins/utils/counter/counter.h +++ b/plugins/utils/counter/counter.h @@ -44,6 +44,7 @@ struct counter { uint64_t other; } count; uint64_t last_printed_total; + uint64_t at; uint64_t step; bool hide_zero; bool error; diff --git a/plugins/utils/dummy/dummy.c b/plugins/utils/dummy/dummy.c index 5109259e..926a6dd5 100644 --- a/plugins/utils/dummy/dummy.c +++ b/plugins/utils/dummy/dummy.c @@ -103,9 +103,11 @@ end: enum bt_component_status dummy_consume(struct bt_private_component *component) { enum bt_component_status ret = BT_COMPONENT_STATUS_OK; - struct bt_notification *notif = NULL; + bt_notification_array notifs; + uint64_t count; struct dummy *dummy; enum bt_notification_iterator_status it_ret; + uint64_t i; dummy = bt_private_component_get_user_data(component); BT_ASSERT(dummy); @@ -122,22 +124,29 @@ enum bt_component_status dummy_consume(struct bt_private_component *component) /* Consume one notification */ it_ret = bt_private_connection_notification_iterator_next( - dummy->notif_iter, ¬if); + dummy->notif_iter, ¬ifs, &count); switch (it_ret) { - case BT_NOTIFICATION_ITERATOR_STATUS_ERROR: - ret = BT_COMPONENT_STATUS_ERROR; - goto end; + case BT_NOTIFICATION_ITERATOR_STATUS_OK: + ret = BT_COMPONENT_STATUS_OK; + + for (i = 0; i < count; i++) { + bt_put(notifs[i]); + } + + break; case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN: ret = BT_COMPONENT_STATUS_AGAIN; goto end; case BT_NOTIFICATION_ITERATOR_STATUS_END: ret = BT_COMPONENT_STATUS_END; goto end; + case BT_NOTIFICATION_ITERATOR_STATUS_ERROR: + ret = BT_COMPONENT_STATUS_ERROR; + goto end; default: break; } end: - bt_put(notif); return ret; } diff --git a/plugins/utils/muxer/muxer.c b/plugins/utils/muxer/muxer.c index 61cd4883..663bd346 100644 --- a/plugins/utils/muxer/muxer.c +++ b/plugins/utils/muxer/muxer.c @@ -61,21 +61,8 @@ struct muxer_upstream_notif_iter { /* Owned by this, NULL if ended */ struct bt_notification_iterator *notif_iter; - /* Owned by this */ - struct bt_notification *notif; - - /* - * This flag is true if the upstream notification iterator's - * current notification must be considered for the multiplexing - * operations. If the upstream iterator returns - * BT_NOTIFICATION_ITERATOR_STATUS_AGAIN, then this object - * is considered invalid, because its current notification is - * still the previous one, but we already took it into account. - * - * The value of this flag is not important if notif_iter above - * is NULL (which means the upstream iterator is finished). - */ - bool is_valid; + /* Contains `struct bt_notification *`, owned by this */ + GQueue *notifs; }; enum muxer_notif_iter_clock_class_expectation { @@ -107,9 +94,6 @@ struct muxer_notif_iter { */ GList *newly_connected_priv_ports; - /* Next thing to return by the "next" method */ - struct bt_notification_iterator_next_method_return next_next_return; - /* Last time returned in a notification */ int64_t last_returned_ts_ns; @@ -133,12 +117,23 @@ void destroy_muxer_upstream_notif_iter( } BT_LOGD("Destroying muxer's upstream notification iterator wrapper: " - "addr=%p, notif-iter-addr=%p, is-valid=%d", + "addr=%p, notif-iter-addr=%p, queue-len=%u", muxer_upstream_notif_iter, muxer_upstream_notif_iter->notif_iter, - muxer_upstream_notif_iter->is_valid); + muxer_upstream_notif_iter->notifs->length); bt_put(muxer_upstream_notif_iter->notif_iter); - bt_put(muxer_upstream_notif_iter->notif); + + if (muxer_upstream_notif_iter->notifs) { + struct bt_notification *notif; + + while ((notif = g_queue_pop_head( + muxer_upstream_notif_iter->notifs))) { + bt_put(notif); + } + + g_queue_free(muxer_upstream_notif_iter->notifs); + } + g_free(muxer_upstream_notif_iter); } @@ -157,7 +152,13 @@ struct muxer_upstream_notif_iter *muxer_notif_iter_add_upstream_notif_iter( } muxer_upstream_notif_iter->notif_iter = bt_get(notif_iter); - muxer_upstream_notif_iter->is_valid = false; + muxer_upstream_notif_iter->notifs = g_queue_new(); + if (!muxer_upstream_notif_iter->notifs) { + BT_LOGE_STR("Failed to allocate a GQueue."); + + goto end; + } + g_ptr_array_add(muxer_notif_iter->muxer_upstream_notif_iters, muxer_upstream_notif_iter); BT_LOGD("Added muxer's upstream notification iterator wrapper: " @@ -452,36 +453,45 @@ enum bt_notification_iterator_status muxer_upstream_notif_iter_next( struct muxer_upstream_notif_iter *muxer_upstream_notif_iter) { enum bt_notification_iterator_status status; - struct bt_notification *notif = NULL; + bt_notification_array notifs; + uint64_t i; + uint64_t count; BT_LOGV("Calling upstream notification iterator's \"next\" method: " "muxer-upstream-notif-iter-wrap-addr=%p, notif-iter-addr=%p", muxer_upstream_notif_iter, muxer_upstream_notif_iter->notif_iter); status = bt_private_connection_notification_iterator_next( - muxer_upstream_notif_iter->notif_iter, ¬if); + muxer_upstream_notif_iter->notif_iter, ¬ifs, &count); BT_LOGV("Upstream notification iterator's \"next\" method returned: " "status=%s", bt_notification_iterator_status_string(status)); switch (status) { case BT_NOTIFICATION_ITERATOR_STATUS_OK: /* - * Notification iterator's current notification is valid: - * it must be considered for muxing operations. + * Notification iterator's current notification is + * valid: it must be considered for muxing operations. */ BT_LOGV_STR("Validated upstream notification iterator wrapper."); - muxer_upstream_notif_iter->is_valid = true; - BT_MOVE(muxer_upstream_notif_iter->notif, notif); + BT_ASSERT(count > 0); + + /* Move notifications to our queue */ + for (i = 0; i < count; i++) { + /* + * Push to tail in order; other side + * (muxer_notif_iter_do_next_one()) consumes + * from the head first. + */ + g_queue_push_tail(muxer_upstream_notif_iter->notifs, + notifs[i]); + } break; case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN: /* * Notification iterator's current notification is not * valid anymore. Return - * BT_NOTIFICATION_ITERATOR_STATUS_AGAIN - * immediately. + * BT_NOTIFICATION_ITERATOR_STATUS_AGAIN immediately. */ - BT_LOGV_STR("Invalidated upstream notification iterator wrapper because of BT_NOTIFICATION_ITERATOR_STATUS_AGAIN."); - muxer_upstream_notif_iter->is_valid = false; break; case BT_NOTIFICATION_ITERATOR_STATUS_END: /* Fall-through. */ case BT_NOTIFICATION_ITERATOR_STATUS_CANCELED: @@ -490,9 +500,7 @@ enum bt_notification_iterator_status muxer_upstream_notif_iter_next( * won't be considered again to find the youngest * notification. */ - BT_LOGV_STR("Invalidated upstream notification iterator wrapper because of BT_NOTIFICATION_ITERATOR_STATUS_END or BT_NOTIFICATION_ITERATOR_STATUS_CANCELED."); BT_PUT(muxer_upstream_notif_iter->notif_iter); - muxer_upstream_notif_iter->is_valid = false; status = BT_NOTIFICATION_ITERATOR_STATUS_OK; break; default: @@ -503,7 +511,6 @@ enum bt_notification_iterator_status muxer_upstream_notif_iter_next( break; } - BT_ASSERT(!notif); return status; } @@ -518,7 +525,7 @@ int muxer_notif_iter_handle_newly_connected_ports( /* * Here we create one upstream notification iterator for each - * newly connected port. We do not perform an initial "next" on + * newly connected port. We do NOT perform an initial "next" on * those new upstream notification iterators: they are * invalidated, to be validated later. The list of newly * connected ports to handle here is updated by @@ -906,8 +913,8 @@ muxer_notif_iter_youngest_upstream_notif_iter( continue; } - BT_ASSERT(cur_muxer_upstream_notif_iter->is_valid); - notif = cur_muxer_upstream_notif_iter->notif; + BT_ASSERT(cur_muxer_upstream_notif_iter->notifs->length > 0); + notif = g_queue_peek_head(cur_muxer_upstream_notif_iter->notifs); BT_ASSERT(notif); ret = get_notif_ts_ns(muxer_comp, muxer_notif_iter, notif, muxer_notif_iter->last_returned_ts_ns, ¬if_ts_ns); @@ -946,8 +953,12 @@ enum bt_notification_iterator_status validate_muxer_upstream_notif_iter( "muxer-upstream-notif-iter-wrap-addr=%p", muxer_upstream_notif_iter); - if (muxer_upstream_notif_iter->is_valid || + if (muxer_upstream_notif_iter->notifs->length > 0 || !muxer_upstream_notif_iter->notif_iter) { + BT_LOGV("Already valid or not considered: " + "queue-len=%u, upstream-notif-iter-addr=%p", + muxer_upstream_notif_iter->notifs->length, + muxer_upstream_notif_iter->notif_iter); goto end; } @@ -960,7 +971,7 @@ end: static enum bt_notification_iterator_status validate_muxer_upstream_notif_iters( - struct muxer_notif_iter *muxer_notif_iter) + struct muxer_notif_iter *muxer_notif_iter) { enum bt_notification_iterator_status status = BT_NOTIFICATION_ITERATOR_STATUS_OK; @@ -1019,16 +1030,15 @@ end: return status; } -static -struct bt_notification_iterator_next_method_return muxer_notif_iter_do_next( +static inline +enum bt_notification_iterator_status muxer_notif_iter_do_next_one( struct muxer_comp *muxer_comp, - struct muxer_notif_iter *muxer_notif_iter) + struct muxer_notif_iter *muxer_notif_iter, + struct bt_notification **notif) { + enum bt_notification_iterator_status status = + BT_NOTIFICATION_ITERATOR_STATUS_OK; struct muxer_upstream_notif_iter *muxer_upstream_notif_iter = NULL; - struct bt_notification_iterator_next_method_return next_return = { - .notification = NULL, - .status = BT_NOTIFICATION_ITERATOR_STATUS_OK, - }; int64_t next_return_ts; while (true) { @@ -1040,14 +1050,12 @@ struct bt_notification_iterator_next_method_return muxer_notif_iter_do_next( "muxer-comp-addr=%p, muxer-notif-iter-addr=%p, " "ret=%d", muxer_comp, muxer_notif_iter, ret); - next_return.status = - BT_NOTIFICATION_ITERATOR_STATUS_ERROR; + status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; goto end; } - next_return.status = - validate_muxer_upstream_notif_iters(muxer_notif_iter); - if (next_return.status != BT_NOTIFICATION_ITERATOR_STATUS_OK) { + status = validate_muxer_upstream_notif_iters(muxer_notif_iter); + if (status != BT_NOTIFICATION_ITERATOR_STATUS_OK) { /* validate_muxer_upstream_notif_iters() logs details */ goto end; } @@ -1075,21 +1083,19 @@ struct bt_notification_iterator_next_method_return muxer_notif_iter_do_next( * amongst those, of which the current notification is the * youngest. */ - next_return.status = - muxer_notif_iter_youngest_upstream_notif_iter(muxer_comp, + status = muxer_notif_iter_youngest_upstream_notif_iter(muxer_comp, muxer_notif_iter, &muxer_upstream_notif_iter, &next_return_ts); - if (next_return.status < 0 || - next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_END || - next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_CANCELED) { - if (next_return.status < 0) { + if (status < 0 || status == BT_NOTIFICATION_ITERATOR_STATUS_END || + status == BT_NOTIFICATION_ITERATOR_STATUS_CANCELED) { + if (status < 0) { BT_LOGE("Cannot find the youngest upstream notification iterator wrapper: " "status=%s", - bt_notification_iterator_status_string(next_return.status)); + bt_notification_iterator_status_string(status)); } else { BT_LOGV("Cannot find the youngest upstream notification iterator wrapper: " "status=%s", - bt_notification_iterator_status_string(next_return.status)); + bt_notification_iterator_status_string(status)); } goto end; @@ -1101,7 +1107,7 @@ struct bt_notification_iterator_next_method_return muxer_notif_iter_do_next( "last-returned-ts=%" PRId64, muxer_notif_iter, next_return_ts, muxer_notif_iter->last_returned_ts_ns); - next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; + status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; goto end; } @@ -1110,21 +1116,58 @@ struct bt_notification_iterator_next_method_return muxer_notif_iter_do_next( "muxer-upstream-notif-iter-wrap-addr=%p, " "ts=%" PRId64, muxer_notif_iter, muxer_upstream_notif_iter, next_return_ts); - BT_ASSERT(next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_OK); + BT_ASSERT(status == BT_NOTIFICATION_ITERATOR_STATUS_OK); BT_ASSERT(muxer_upstream_notif_iter); - next_return.notification = bt_get(muxer_upstream_notif_iter->notif); - BT_ASSERT(next_return.notification); /* - * We invalidate the upstream notification iterator so that, the - * next time this function is called, - * validate_muxer_upstream_notif_iters() will make it valid. + * Consume from the queue's head: other side + * (muxer_upstream_notif_iter_next()) writes to the tail. */ - muxer_upstream_notif_iter->is_valid = false; + *notif = g_queue_pop_head(muxer_upstream_notif_iter->notifs); + BT_ASSERT(*notif); muxer_notif_iter->last_returned_ts_ns = next_return_ts; end: - return next_return; + return status; +} + +static +enum bt_notification_iterator_status muxer_notif_iter_do_next( + struct muxer_comp *muxer_comp, + struct muxer_notif_iter *muxer_notif_iter, + bt_notification_array notifs, uint64_t capacity, + uint64_t *count) +{ + enum bt_notification_iterator_status status = + BT_NOTIFICATION_ITERATOR_STATUS_OK; + uint64_t i = 0; + + while (i < capacity && status == BT_NOTIFICATION_ITERATOR_STATUS_OK) { + status = muxer_notif_iter_do_next_one(muxer_comp, + muxer_notif_iter, ¬ifs[i]); + if (status == BT_NOTIFICATION_ITERATOR_STATUS_OK) { + i++; + } + } + + if (i > 0) { + /* + * Even if muxer_notif_iter_do_next_one() returned + * something else than + * BT_NOTIFICATION_ITERATOR_STATUS_OK, we accumulated + * notification objects in the output notification + * array, so we need to return + * BT_NOTIFICATION_ITERATOR_STATUS_OK so that they are + * transfered to downstream. This other status occurs + * again the next time muxer_notif_iter_do_next() is + * called, possibly without any accumulated + * notification, in which case we'll return it. + */ + *count = i; + status = BT_NOTIFICATION_ITERATOR_STATUS_OK; + } + + return status; } static @@ -1338,10 +1381,12 @@ void muxer_notif_iter_finalize( } BT_HIDDEN -struct bt_notification_iterator_next_method_return muxer_notif_iter_next( - struct bt_private_connection_private_notification_iterator *priv_notif_iter) +enum bt_notification_iterator_status muxer_notif_iter_next( + struct bt_private_connection_private_notification_iterator *priv_notif_iter, + bt_notification_array notifs, uint64_t capacity, + uint64_t *count) { - struct bt_notification_iterator_next_method_return next_ret; + enum bt_notification_iterator_status status; struct muxer_notif_iter *muxer_notif_iter = bt_private_connection_private_notification_iterator_get_user_data(priv_notif_iter); struct bt_private_component *priv_comp = NULL; @@ -1353,7 +1398,6 @@ struct bt_notification_iterator_next_method_return muxer_notif_iter_next( BT_ASSERT(priv_comp); muxer_comp = bt_private_component_get_user_data(priv_comp); BT_ASSERT(muxer_comp); - BT_LOGV("Muxer component's notification iterator's \"next\" method called: " "comp-addr=%p, muxer-comp-addr=%p, muxer-notif-iter-addr=%p, " "notif-iter-addr=%p", @@ -1365,28 +1409,27 @@ struct bt_notification_iterator_next_method_return muxer_notif_iter_next( "comp-addr=%p, muxer-comp-addr=%p, muxer-notif-iter-addr=%p, " "notif-iter-addr=%p", priv_comp, muxer_comp, muxer_notif_iter, priv_notif_iter); - next_ret.notification = NULL; - next_ret.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; + status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; goto end; } - next_ret = muxer_notif_iter_do_next(muxer_comp, muxer_notif_iter); - if (next_ret.status < 0) { + status = muxer_notif_iter_do_next(muxer_comp, muxer_notif_iter, + notifs, capacity, count); + if (status < 0) { BT_LOGE("Cannot get next notification: " "comp-addr=%p, muxer-comp-addr=%p, muxer-notif-iter-addr=%p, " "notif-iter-addr=%p, status=%s", priv_comp, muxer_comp, muxer_notif_iter, priv_notif_iter, - bt_notification_iterator_status_string(next_ret.status)); + bt_notification_iterator_status_string(status)); } else { BT_LOGV("Returning from muxer component's notification iterator's \"next\" method: " - "status=%s, notif-addr=%p", - bt_notification_iterator_status_string(next_ret.status), - next_ret.notification); + "status=%s", + bt_notification_iterator_status_string(status)); } end: bt_put(priv_comp); - return next_ret; + return status; } BT_HIDDEN diff --git a/plugins/utils/muxer/muxer.h b/plugins/utils/muxer/muxer.h index b7e05ba3..5e7f1218 100644 --- a/plugins/utils/muxer/muxer.h +++ b/plugins/utils/muxer/muxer.h @@ -24,6 +24,7 @@ * SOFTWARE. */ +#include #include BT_HIDDEN @@ -45,8 +46,10 @@ void muxer_notif_iter_finalize( struct bt_private_connection_private_notification_iterator *priv_notif_iter); BT_HIDDEN -struct bt_notification_iterator_next_method_return muxer_notif_iter_next( - struct bt_private_connection_private_notification_iterator *priv_notif_iter); +enum bt_notification_iterator_status muxer_notif_iter_next( + struct bt_private_connection_private_notification_iterator *priv_notif_iter, + bt_notification_array notifs, uint64_t capacity, + uint64_t *count); BT_HIDDEN void muxer_port_connected( diff --git a/tests/lib/test-plugin-plugins/sfs.c b/tests/lib/test-plugin-plugins/sfs.c index f9d52130..4d0357c6 100644 --- a/tests/lib/test-plugin-plugins/sfs.c +++ b/tests/lib/test-plugin-plugins/sfs.c @@ -40,15 +40,12 @@ static void dummy_iterator_finalize_method( { } -static struct bt_notification_iterator_next_method_return dummy_iterator_next_method( - struct bt_private_connection_private_notification_iterator *private_iterator) +static enum bt_notification_iterator_status dummy_iterator_next_method( + struct bt_private_connection_private_notification_iterator *private_iterator, + bt_notification_array notifs, uint64_t capacity, + uint64_t *count) { - struct bt_notification_iterator_next_method_return next_return = { - .notification = NULL, - .status = BT_NOTIFICATION_ITERATOR_STATUS_OK, - }; - - return next_return; + return BT_NOTIFICATION_ITERATOR_STATUS_ERROR; } static struct bt_component_class_query_method_return query_method( diff --git a/tests/lib/test_bt_notification_iterator.c b/tests/lib/test_bt_notification_iterator.c index f076af51..4cecf362 100644 --- a/tests/lib/test_bt_notification_iterator.c +++ b/tests/lib/test_bt_notification_iterator.c @@ -404,96 +404,61 @@ enum bt_notification_iterator_status src_iter_init( } static -struct bt_notification_iterator_next_method_return src_iter_next_seq( - struct src_iter_user_data *user_data) +void src_iter_next_seq_one(struct src_iter_user_data *user_data, + struct bt_notification **notif) { - struct bt_notification_iterator_next_method_return next_return = { - .status = BT_NOTIFICATION_ITERATOR_STATUS_OK, - }; - int64_t cur_ts_ns; struct bt_packet *event_packet = NULL; - BT_ASSERT(user_data->seq); - cur_ts_ns = user_data->seq[user_data->at]; - - switch (cur_ts_ns) { - case SEQ_END: - next_return.status = - BT_NOTIFICATION_ITERATOR_STATUS_END; - break; + switch (user_data->seq[user_data->at]) { case SEQ_INACTIVITY: - next_return.notification = - bt_notification_inactivity_create(graph, + *notif = bt_notification_inactivity_create(graph, src_empty_cc_prio_map); - BT_ASSERT(next_return.notification); break; case SEQ_STREAM1_BEGIN: - next_return.notification = - bt_notification_stream_begin_create(graph, src_stream1); - BT_ASSERT(next_return.notification); + *notif = bt_notification_stream_begin_create(graph, + src_stream1); break; case SEQ_STREAM2_BEGIN: - next_return.notification = - bt_notification_stream_begin_create(graph, src_stream2); - BT_ASSERT(next_return.notification); + *notif = bt_notification_stream_begin_create(graph, + src_stream2); break; case SEQ_STREAM1_END: - next_return.notification = - bt_notification_stream_end_create(graph, src_stream1); - BT_ASSERT(next_return.notification); + *notif = bt_notification_stream_end_create(graph, src_stream1); break; case SEQ_STREAM2_END: - next_return.notification = - bt_notification_stream_end_create(graph, src_stream2); - BT_ASSERT(next_return.notification); + *notif = bt_notification_stream_end_create(graph, src_stream2); break; case SEQ_STREAM1_PACKET1_BEGIN: - next_return.notification = - bt_notification_packet_begin_create(graph, - src_stream1_packet1); - BT_ASSERT(next_return.notification); + *notif = bt_notification_packet_begin_create(graph, + src_stream1_packet1); break; case SEQ_STREAM1_PACKET2_BEGIN: - next_return.notification = - bt_notification_packet_begin_create(graph, - src_stream1_packet2); - BT_ASSERT(next_return.notification); + *notif = bt_notification_packet_begin_create(graph, + src_stream1_packet2); break; case SEQ_STREAM2_PACKET1_BEGIN: - next_return.notification = - bt_notification_packet_begin_create(graph, - src_stream2_packet1); - BT_ASSERT(next_return.notification); + *notif = bt_notification_packet_begin_create(graph, + src_stream2_packet1); break; case SEQ_STREAM2_PACKET2_BEGIN: - next_return.notification = - bt_notification_packet_begin_create(graph, - src_stream2_packet2); - BT_ASSERT(next_return.notification); + *notif = bt_notification_packet_begin_create(graph, + src_stream2_packet2); break; case SEQ_STREAM1_PACKET1_END: - next_return.notification = - bt_notification_packet_end_create(graph, - src_stream1_packet1); - BT_ASSERT(next_return.notification); + *notif = bt_notification_packet_end_create(graph, + src_stream1_packet1); break; case SEQ_STREAM1_PACKET2_END: - next_return.notification = - bt_notification_packet_end_create(graph, - src_stream1_packet2); - BT_ASSERT(next_return.notification); + *notif = bt_notification_packet_end_create(graph, + src_stream1_packet2); break; case SEQ_STREAM2_PACKET1_END: - next_return.notification = - bt_notification_packet_end_create(graph, - src_stream2_packet1); - BT_ASSERT(next_return.notification); + *notif = bt_notification_packet_end_create(graph, + src_stream2_packet1); break; case SEQ_STREAM2_PACKET2_END: - next_return.notification = - bt_notification_packet_end_create(graph, - src_stream2_packet2); - BT_ASSERT(next_return.notification); + *notif = bt_notification_packet_end_create(graph, + src_stream2_packet2); break; case SEQ_EVENT_STREAM1_PACKET1: event_packet = src_stream1_packet1; @@ -512,33 +477,54 @@ struct bt_notification_iterator_next_method_return src_iter_next_seq( } if (event_packet) { - next_return.notification = - bt_notification_event_create(graph, src_event_class, - event_packet, src_empty_cc_prio_map); - BT_ASSERT(next_return.notification); + *notif = bt_notification_event_create(graph, src_event_class, + event_packet, src_empty_cc_prio_map); + } + + BT_ASSERT(*notif); + user_data->at++; +} + +static +enum bt_notification_iterator_status src_iter_next_seq( + struct src_iter_user_data *user_data, + bt_notification_array notifs, uint64_t capacity, + uint64_t *count) +{ + enum bt_notification_iterator_status status = + BT_NOTIFICATION_ITERATOR_STATUS_OK; + uint64_t i = 0; + + BT_ASSERT(user_data->seq); + + if (user_data->seq[user_data->at] == SEQ_END) { + status = BT_NOTIFICATION_ITERATOR_STATUS_END; + goto end; } - if (next_return.status != BT_NOTIFICATION_ITERATOR_STATUS_END) { - user_data->at++; + while (i < capacity && user_data->seq[user_data->at] != SEQ_END) { + src_iter_next_seq_one(user_data, ¬ifs[i]); + i++; } - return next_return; + BT_ASSERT(i > 0 && i <= capacity); + *count = i; + +end: + return status; } static -struct bt_notification_iterator_next_method_return src_iter_next( - struct bt_private_connection_private_notification_iterator *priv_iterator) +enum bt_notification_iterator_status src_iter_next( + struct bt_private_connection_private_notification_iterator *priv_iterator, + bt_notification_array notifs, uint64_t capacity, + uint64_t *count) { - struct bt_notification_iterator_next_method_return next_return = { - .status = BT_NOTIFICATION_ITERATOR_STATUS_OK, - .notification = NULL, - }; struct src_iter_user_data *user_data = bt_private_connection_private_notification_iterator_get_user_data(priv_iterator); BT_ASSERT(user_data); - next_return = src_iter_next_seq(user_data); - return next_return; + return src_iter_next_seq(user_data, notifs, capacity, count); } static @@ -560,40 +546,9 @@ void src_finalize(struct bt_private_component *private_component) } static -enum bt_notification_iterator_status common_consume( - struct bt_notification_iterator *notif_iter, - bool is_output_port_notif_iter) +void append_test_events_from_notification(struct bt_notification *notification) { - enum bt_notification_iterator_status ret; - struct bt_notification *notification = NULL; struct test_event test_event = { 0 }; - bool do_append_test_event = true; - BT_ASSERT(notif_iter); - - if (is_output_port_notif_iter) { - ret = bt_output_port_notification_iterator_next(notif_iter, - ¬ification); - } else { - ret = bt_private_connection_notification_iterator_next( - notif_iter, ¬ification); - } - - if (ret < 0) { - do_append_test_event = false; - goto end; - } - - switch (ret) { - case BT_NOTIFICATION_ITERATOR_STATUS_END: - test_event.type = TEST_EV_TYPE_END; - goto end; - case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN: - abort(); - default: - break; - } - - BT_ASSERT(notification); switch (bt_notification_get_type(notification)) { case BT_NOTIFICATION_TYPE_EVENT: @@ -644,12 +599,54 @@ enum bt_notification_iterator_status common_consume( BT_ASSERT(test_event.stream); } -end: - if (do_append_test_event) { + append_test_event(&test_event); +} + +static +enum bt_notification_iterator_status common_consume( + struct bt_notification_iterator *notif_iter, + bool is_output_port_notif_iter) +{ + enum bt_notification_iterator_status ret; + bt_notification_array notifications = NULL; + uint64_t count = 0; + struct test_event test_event = { 0 }; + uint64_t i; + + BT_ASSERT(notif_iter); + + if (is_output_port_notif_iter) { + ret = bt_output_port_notification_iterator_next(notif_iter, + ¬ifications, &count); + } else { + ret = bt_private_connection_notification_iterator_next( + notif_iter, ¬ifications, &count); + } + + if (ret < 0) { + goto end; + } + + switch (ret) { + case BT_NOTIFICATION_ITERATOR_STATUS_END: + test_event.type = TEST_EV_TYPE_END; append_test_event(&test_event); + goto end; + case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN: + abort(); + default: + break; } - bt_put(notification); + BT_ASSERT(notifications); + BT_ASSERT(count > 0); + + for (i = 0; i < count; i++) { + append_test_events_from_notification(notifications[i]); + bt_put(notifications[i]); + } + +end: return ret; } diff --git a/tests/lib/test_graph_topo.c b/tests/lib/test_graph_topo.c index e69a61a8..efc2cc5f 100644 --- a/tests/lib/test_graph_topo.c +++ b/tests/lib/test_graph_topo.c @@ -285,14 +285,12 @@ size_t event_pos(struct event *event) } static -struct bt_notification_iterator_next_method_return src_iter_next( - struct bt_private_connection_private_notification_iterator *priv_iterator) +enum bt_notification_iterator_status src_iter_next( + struct bt_private_connection_private_notification_iterator *priv_iterator, + bt_notification_array notifs, uint64_t capacity, + uint64_t *count) { - struct bt_notification_iterator_next_method_return ret = { - .status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR, - }; - - return ret; + return BT_NOTIFICATION_ITERATOR_STATUS_ERROR; } static diff --git a/tests/plugins/test-utils-muxer.c b/tests/plugins/test-utils-muxer.c index 5cbe9fee..58a4331f 100644 --- a/tests/plugins/test-utils-muxer.c +++ b/tests/plugins/test-utils-muxer.c @@ -131,22 +131,22 @@ static int64_t seq4[] = { static int64_t seq1_with_again[] = { SEQ_STREAM_BEGIN, SEQ_PACKET_BEGIN, 24, 53, 97, 105, 119, 210, - SEQ_AGAIN, 222, 240, 292, 317, 353, 407, 433, 473, 487, 504, - 572, 615, 708, 766, 850, 852, 931, 951, 956, 996, + SEQ_AGAIN, SEQ_AGAIN, 222, 240, 292, 317, 353, 407, 433, 473, + 487, 504, 572, 615, 708, 766, 850, 852, 931, 951, 956, 996, SEQ_PACKET_END, SEQ_STREAM_END, SEQ_END, }; static int64_t seq2_with_again[] = { SEQ_STREAM_BEGIN, SEQ_PACKET_BEGIN, 51, 59, 68, 77, 91, 121, 139, 170, 179, 266, 352, 454, 478, 631, 644, 668, 714, 744, 750, - 778, 790, 836, SEQ_AGAIN, SEQ_PACKET_END, SEQ_STREAM_END, - SEQ_END, + 778, 790, 836, SEQ_AGAIN, SEQ_AGAIN, SEQ_PACKET_END, + SEQ_STREAM_END, SEQ_END, }; static int64_t seq3_with_again[] = { SEQ_STREAM_BEGIN, SEQ_PACKET_BEGIN, 8, 71, 209, 254, 298, 320, - 350, 393, 419, 624, 651, SEQ_AGAIN, 678, 717, 731, 733, 788, - 819, 820, 857, 892, 903, 944, 998, SEQ_PACKET_END, + 350, 393, 419, 624, 651, SEQ_AGAIN, SEQ_AGAIN, 678, 717, 731, + 733, 788, 819, 820, 857, 892, 903, 944, 998, SEQ_PACKET_END, SEQ_STREAM_END, SEQ_END, }; @@ -534,12 +534,12 @@ struct bt_notification *src_create_event_notif(struct bt_packet *packet, } static -struct bt_notification_iterator_next_method_return src_iter_next_seq( - struct src_iter_user_data *user_data) +enum bt_notification_iterator_status src_iter_next_seq( + struct src_iter_user_data *user_data, + bt_notification_array notifs) { - struct bt_notification_iterator_next_method_return next_return = { - .status = BT_NOTIFICATION_ITERATOR_STATUS_OK, - }; + enum bt_notification_iterator_status status = + BT_NOTIFICATION_ITERATOR_STATUS_OK; int64_t cur_ts_ns; struct bt_stream *stream; @@ -548,63 +548,57 @@ struct bt_notification_iterator_next_method_return src_iter_next_seq( switch (cur_ts_ns) { case SEQ_END: - next_return.status = - BT_NOTIFICATION_ITERATOR_STATUS_END; + status = BT_NOTIFICATION_ITERATOR_STATUS_END; break; case SEQ_AGAIN: - next_return.status = - BT_NOTIFICATION_ITERATOR_STATUS_AGAIN; + status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN; break; case SEQ_PACKET_BEGIN: - next_return.notification = - bt_notification_packet_begin_create(graph, + notifs[0] = bt_notification_packet_begin_create(graph, user_data->packet); - BT_ASSERT(next_return.notification); + BT_ASSERT(notifs[0]); break; case SEQ_PACKET_END: - next_return.notification = - bt_notification_packet_end_create(graph, + notifs[0] = bt_notification_packet_end_create(graph, user_data->packet); - BT_ASSERT(next_return.notification); + BT_ASSERT(notifs[0]); break; case SEQ_STREAM_BEGIN: stream = bt_packet_get_stream(user_data->packet); - next_return.notification = - bt_notification_stream_begin_create(graph, stream); - BT_ASSERT(next_return.notification); + notifs[0] = bt_notification_stream_begin_create(graph, stream); + BT_ASSERT(notifs[0]); bt_put(stream); break; case SEQ_STREAM_END: stream = bt_packet_get_stream(user_data->packet); - next_return.notification = - bt_notification_stream_end_create(graph, stream); - BT_ASSERT(next_return.notification); + notifs[0] = bt_notification_stream_end_create(graph, stream); + BT_ASSERT(notifs[0]); bt_put(stream); break; default: { - next_return.notification = src_create_event_notif( - user_data->packet, src_cc_prio_map, cur_ts_ns); - BT_ASSERT(next_return.notification); + notifs[0] = src_create_event_notif(user_data->packet, + src_cc_prio_map, cur_ts_ns); + BT_ASSERT(notifs[0]); break; } } - if (next_return.status != BT_NOTIFICATION_ITERATOR_STATUS_END) { + if (status != BT_NOTIFICATION_ITERATOR_STATUS_END) { user_data->at++; } - return next_return; + return status; } static -struct bt_notification_iterator_next_method_return src_iter_next( - struct bt_private_connection_private_notification_iterator *priv_iterator) +enum bt_notification_iterator_status src_iter_next( + struct bt_private_connection_private_notification_iterator *priv_iterator, + bt_notification_array notifs, uint64_t capacity, + uint64_t *count) { - struct bt_notification_iterator_next_method_return next_return = { - .notification = NULL, - .status = BT_NOTIFICATION_ITERATOR_STATUS_OK, - }; + enum bt_notification_iterator_status status = + BT_NOTIFICATION_ITERATOR_STATUS_OK; struct src_iter_user_data *user_data = bt_private_connection_private_notification_iterator_get_user_data(priv_iterator); struct bt_private_component *private_component = @@ -615,52 +609,59 @@ struct bt_notification_iterator_next_method_return src_iter_next( BT_ASSERT(user_data); BT_ASSERT(private_component); + /* + * We can always set it to 1: it's not going to be considered + * anyway if the status is not + * BT_NOTIFICATION_ITERATOR_STATUS_OK. + */ + *count = 1; + switch (current_test) { case TEST_NO_TS: if (user_data->iter_index == 0) { if (user_data->at == 0) { stream = bt_packet_get_stream(user_data->packet); - next_return.notification = + notifs[0] = bt_notification_stream_begin_create( graph, stream); bt_put(stream); - BT_ASSERT(next_return.notification); + BT_ASSERT(notifs[0]); } else if (user_data->at == 1) { - next_return.notification = + notifs[0] = bt_notification_packet_begin_create( graph, user_data->packet); - BT_ASSERT(next_return.notification); + BT_ASSERT(notifs[0]); } else if (user_data->at < 7) { - next_return.notification = + notifs[0] = src_create_event_notif( user_data->packet, src_empty_cc_prio_map, 0); - BT_ASSERT(next_return.notification); + BT_ASSERT(notifs[0]); } else if (user_data->at == 7) { - next_return.notification = + notifs[0] = bt_notification_packet_end_create( graph, user_data->packet); - BT_ASSERT(next_return.notification); + BT_ASSERT(notifs[0]); } else if (user_data->at == 8) { stream = bt_packet_get_stream(user_data->packet); - next_return.notification = + notifs[0] = bt_notification_stream_end_create( graph, stream); bt_put(stream); - BT_ASSERT(next_return.notification); + BT_ASSERT(notifs[0]); } else { - next_return.status = + status = BT_NOTIFICATION_ITERATOR_STATUS_END; } user_data->at++; } else { - next_return = src_iter_next_seq(user_data); + status = src_iter_next_seq(user_data, notifs); } break; case TEST_SIMPLE_4_PORTS: case TEST_4_PORTS_WITH_RETRIES: - next_return = src_iter_next_seq(user_data); + status = src_iter_next_seq(user_data, notifs); break; case TEST_SINGLE_END_THEN_MULTIPLE_FULL: if (user_data->iter_index == 0) { @@ -670,15 +671,15 @@ struct bt_notification_iterator_next_method_return src_iter_next( ret = bt_private_component_source_add_output_private_port( private_component, "out2", NULL, NULL); BT_ASSERT(ret == 0); - next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END; + status = BT_NOTIFICATION_ITERATOR_STATUS_END; } else { - next_return = src_iter_next_seq(user_data); + status = src_iter_next_seq(user_data, notifs); } break; case TEST_SINGLE_AGAIN_END_THEN_MULTIPLE_FULL: if (user_data->iter_index == 0) { if (user_data->at == 0) { - next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN; + status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN; user_data->at++; } else { ret = bt_private_component_source_add_output_private_port( @@ -687,10 +688,10 @@ struct bt_notification_iterator_next_method_return src_iter_next( ret = bt_private_component_source_add_output_private_port( private_component, "out2", NULL, NULL); BT_ASSERT(ret == 0); - next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END; + status = BT_NOTIFICATION_ITERATOR_STATUS_END; } } else { - next_return = src_iter_next_seq(user_data); + status = src_iter_next_seq(user_data, notifs); } break; default: @@ -698,7 +699,7 @@ struct bt_notification_iterator_next_method_return src_iter_next( } bt_put(private_component); - return next_return; + return status; } static @@ -755,42 +756,10 @@ void src_finalize(struct bt_private_component *private_component) } static -enum bt_component_status sink_consume( - struct bt_private_component *priv_component) +void append_test_event_from_notification(struct bt_notification *notification) { - enum bt_component_status ret = BT_COMPONENT_STATUS_OK; - struct bt_notification *notification = NULL; - struct sink_user_data *user_data = - bt_private_component_get_user_data(priv_component); - enum bt_notification_iterator_status it_ret; + int ret; struct test_event test_event; - bool do_append_test_event = true; - - BT_ASSERT(user_data && user_data->notif_iter); - it_ret = bt_private_connection_notification_iterator_next( - user_data->notif_iter, ¬ification); - - if (it_ret < 0) { - ret = BT_COMPONENT_STATUS_ERROR; - do_append_test_event = false; - goto end; - } - - switch (it_ret) { - case BT_NOTIFICATION_ITERATOR_STATUS_END: - test_event.type = TEST_EV_TYPE_END; - ret = BT_COMPONENT_STATUS_END; - BT_PUT(user_data->notif_iter); - goto end; - case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN: - test_event.type = TEST_EV_TYPE_AGAIN; - ret = BT_COMPONENT_STATUS_AGAIN; - goto end; - default: - break; - } - - BT_ASSERT(notification); switch (bt_notification_get_type(notification)) { case BT_NOTIFICATION_TYPE_EVENT: @@ -871,12 +840,60 @@ enum bt_component_status sink_consume( break; } + append_test_event(&test_event); +} + +static +enum bt_component_status sink_consume( + struct bt_private_component *priv_component) +{ + enum bt_component_status ret = BT_COMPONENT_STATUS_OK; + bt_notification_array notifications = NULL; + uint64_t count; + struct sink_user_data *user_data = + bt_private_component_get_user_data(priv_component); + enum bt_notification_iterator_status it_ret; + struct test_event test_event; + bool do_append_test_event = true; + uint64_t i; + + BT_ASSERT(user_data && user_data->notif_iter); + it_ret = bt_private_connection_notification_iterator_next( + user_data->notif_iter, ¬ifications, &count); + if (it_ret < 0) { + ret = BT_COMPONENT_STATUS_ERROR; + do_append_test_event = false; + goto end; + } + + switch (it_ret) { + case BT_NOTIFICATION_ITERATOR_STATUS_END: + test_event.type = TEST_EV_TYPE_END; + ret = BT_COMPONENT_STATUS_END; + BT_PUT(user_data->notif_iter); + goto end; + case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN: + test_event.type = TEST_EV_TYPE_AGAIN; + ret = BT_COMPONENT_STATUS_AGAIN; + goto end; + default: + break; + } + + BT_ASSERT(notifications); + + for (i = 0; i < count; i++) { + append_test_event_from_notification(notifications[i]); + bt_put(notifications[i]); + } + + do_append_test_event = false; + end: if (do_append_test_event) { append_test_event(&test_event); } - bt_put(notification); return ret; } -- 2.34.1