lib: notification iterator: transfer a batch of notifications
authorPhilippe Proulx <eeppeliteloop@gmail.com>
Fri, 22 Jun 2018 20:54:27 +0000 (16:54 -0400)
committerPhilippe Proulx <eeppeliteloop@gmail.com>
Thu, 2 May 2019 04:05:45 +0000 (00:05 -0400)
This patch makes each notification iterator's "next" operation return a
batch of notifications intead of a single one. This has a positive
effect on performance as we're decreasing the number of executed
instructions because there are fewer function calls per notification.

The batch size is arbitrarily set to 15 here as a value between 10 and
50 experimentally showed to be beneficial, and there's usually more than
one active stream on a typical graph configuration. As the batch size
increases, the performance eventually degrades, as I suspect that
(pooled) notification objects are constantly getting into and out of CPU
cache. The goal of hiding the batch size in the library's implementation
is to eventually determine a value at run time depending on the
situation, and even change it when needed between consuming sinks.

Each notification iterator contains a fixed-sized array of notification
pointers. This array is allocated when the iterator is created and freed
when it is destroyed. For convenience, I'm adding this typedef:

    typedef struct bt_notification **bt_notification_array;

Consumer side
=============
The consuming side API is now:

    enum bt_notification_iterator_status
    bt_private_connection_notification_iterator_next(
        struct bt_notification_iterator *iterator,
        bt_notification_array *notifs, uint64_t *count);

    enum bt_notification_iterator_status
    bt_output_port_notification_iterator_next(
        struct bt_notification_iterator *iterator,
        bt_notification_array *notifs, uint64_t *count);

In English: the function sets `notifs` to the address of a notification
array containing `*count` notifications when the returned status is
`BT_NOTIFICATION_ITERATOR_STATUS_OK`. In this case, the caller is
responsible for putting each notification in the array (the array's
content is owned by the caller, but not the array itself), as the
functions above do not put what's already in the iterator's array before
filling it. If the status is not `BT_NOTIFICATION_ITERATOR_STATUS_OK`,
then the function does not set `*notifs` and `*count`.

Because the `utils.flt.muxer` component class kept each upstream
notification iterator's current notification, it now keeps a queue of
notifications, and the upstream iterator is now considered invalid if
its notification queue is empty.

If less than `*count` notifications are required, the caller can either:

* Discard (put) unneeded notifications: they are lost forever.

* Keep a queue (or other data structure) of notifications by keeping the
  references. The caller must NOT keep the array itself in this case,
  but each contained notification object, copying the pointer values to
  its territory.

Producer side
=============
The "next" method signature is now:

    enum bt_notification_iterator_status next_method(
        struct bt_private_connection_private_notification_iterator *notif_iter,
        bt_notification_array notifs, uint64_t capacity,
        uint64_t *count);

The callback must either:

* Fill `notifs` with up to `capacity` notifications (notification's
  ownership is transfered to the array), set `*count` to how many
  notifications were transfered (at least one), and return
  `BT_NOTIFICATION_ITERATOR_STATUS_OK`.

  `capacity` is always greater than zero.

* Return something else than `BT_NOTIFICATION_ITERATOR_STATUS_OK`,
  not touching `notifs` and `count`.

IMPORTANT: The callback must NOT transfer one or more notifications to
`notifs` and return something else than
`BT_NOTIFICATION_ITERATOR_STATUS_OK`.

Signed-off-by: Philippe Proulx <eeppeliteloop@gmail.com>
27 files changed:
include/babeltrace/graph/component-class-sink-colander-internal.h
include/babeltrace/graph/component-class.h
include/babeltrace/graph/notification-iterator-internal.h
include/babeltrace/graph/notification.h
include/babeltrace/graph/output-port-notification-iterator.h
include/babeltrace/graph/private-connection-notification-iterator.h
lib/graph/component-class-sink-colander.c
lib/graph/iterator.c
plugins/Makefile.am
plugins/ctf/common/notif-iter/notif-iter.c
plugins/ctf/fs-src/data-stream-file.c
plugins/ctf/fs-src/data-stream-file.h
plugins/ctf/fs-src/fs.c
plugins/ctf/fs-src/fs.h
plugins/text/dmesg/dmesg.c
plugins/text/dmesg/dmesg.h
plugins/text/pretty/pretty.c
plugins/utils/Makefile.am
plugins/utils/counter/counter.c
plugins/utils/counter/counter.h
plugins/utils/dummy/dummy.c
plugins/utils/muxer/muxer.c
plugins/utils/muxer/muxer.h
tests/lib/test-plugin-plugins/sfs.c
tests/lib/test_bt_notification_iterator.c
tests/lib/test_graph_topo.c
tests/plugins/test-utils-muxer.c

index d58a8ddf8344ce8d231034a97be1931f2269f5e5..d1069543c176bfdb0d0e42bf73090f27c6c8e787 100644 (file)
@@ -23,6 +23,7 @@
  * SOFTWARE.
  */
 
+#include <stdint.h>
 #include <babeltrace/types.h>
 
 #ifdef __cplusplus
@@ -33,7 +34,8 @@ struct bt_component_class;
 struct bt_notification;
 
 struct bt_component_class_sink_colander_data {
-       struct bt_notification **notification;
+       bt_notification_array notifs;
+       uint64_t *count_addr;
 };
 
 extern struct bt_component_class *bt_component_class_sink_colander_get(void);
index acad8eec14389191a247a275fb1d1d020615707c..f7d243649f02a690f9f544048f88c792c0ae1d37 100644 (file)
@@ -39,6 +39,9 @@
 /* For bt_bool */
 #include <babeltrace/types.h>
 
+/* For bt_notification_array */
+#include <babeltrace/graph/notification.h>
+
 #ifdef __cplusplus
 extern "C" {
 #endif
@@ -68,11 +71,6 @@ enum bt_component_class_type {
        BT_COMPONENT_CLASS_TYPE_FILTER =        2,
 };
 
-struct bt_notification_iterator_next_method_return {
-       struct bt_notification *notification;
-       enum bt_notification_iterator_status status;
-};
-
 struct bt_component_class_query_method_return {
        struct bt_value *result;
        enum bt_query_status status;
@@ -93,9 +91,11 @@ typedef enum bt_notification_iterator_status
 typedef void (*bt_component_class_notification_iterator_finalize_method)(
                struct bt_private_connection_private_notification_iterator *notification_iterator);
 
-typedef struct bt_notification_iterator_next_method_return
+typedef enum bt_notification_iterator_status
 (*bt_component_class_notification_iterator_next_method)(
-               struct bt_private_connection_private_notification_iterator *notification_iterator);
+               struct bt_private_connection_private_notification_iterator *notification_iterator,
+               bt_notification_array notifs, uint64_t capacity,
+               uint64_t *count);
 
 typedef struct bt_component_class_query_method_return (*bt_component_class_query_method)(
                struct bt_component_class *component_class,
index dd80a5192b595f1199415df706ce6c057f87bde2..2cfad159836e00dab62b653c9038c9895127078d 100644 (file)
@@ -86,6 +86,7 @@ enum bt_private_connection_notification_iterator_state {
 struct bt_notification_iterator {
        struct bt_object base;
        enum bt_notification_iterator_type type;
+       GPtrArray *notifs;
 };
 
 struct bt_notification_iterator_private_connection {
@@ -124,8 +125,11 @@ struct bt_notification_iterator_output_port {
        struct bt_graph *graph; /* Owned by this */
        struct bt_component *colander; /* Owned by this */
 
-       /* Only used temporarily: should always be NULL */
-       struct bt_notification *notif;
+       /*
+        * Only used temporarily as a bridge between a colander sink and
+        * the user.
+        */
+       uint64_t count;
 };
 
 static inline
index ce4c374f3ad05d2be77f63765aa0cf0b1a95c52a..1f8b7a3150eaf7ae42e8d33fa0a9efc6febe6cfe 100644 (file)
@@ -33,6 +33,8 @@ extern "C" {
 
 struct bt_notification;
 
+typedef struct bt_notification **bt_notification_array;
+
 /**
  * Notification types. Unhandled notification types should be ignored.
  */
index d5c897d385caa61fbb453af4d10d345c728e281a..ab6c7e4f3dc7ab949b10b4ccf1721483563baeb1 100644 (file)
@@ -38,8 +38,9 @@ extern struct bt_notification_iterator *bt_output_port_notification_iterator_cre
                struct bt_port *port, const char *colander_component_name);
 
 extern enum bt_notification_iterator_status
-bt_output_port_notification_iterator_next(struct bt_notification_iterator *iterator,
-               struct bt_notification **notification);
+bt_output_port_notification_iterator_next(
+               struct bt_notification_iterator *iterator,
+               bt_notification_array *notifs, uint64_t *count);
 
 #ifdef __cplusplus
 }
index b1a3dc2d20f2448c48c5aced366064381cf6b9e9..46e6d137308fe78feb89b2f475f6ce3a41184db1 100644 (file)
@@ -40,7 +40,7 @@ extern struct bt_component *bt_private_connection_notification_iterator_get_comp
 extern enum bt_notification_iterator_status
 bt_private_connection_notification_iterator_next(
                struct bt_notification_iterator *iterator,
-               struct bt_notification **notification);
+               bt_notification_array *notifs, uint64_t *count);
 
 #ifdef __cplusplus
 }
index e9e23d54ac065a069fa5dc917f48e12b7aa98f67..c6c128791bb14d27617e7d25f2cfa0ac192c69f4 100644 (file)
@@ -39,8 +39,8 @@ static
 struct bt_component_class *colander_comp_cls;
 
 struct colander_data {
-       struct bt_notification **user_notif;
-       enum bt_notification_type *notif_types;
+       bt_notification_array notifs;
+       uint64_t *count_addr;
        struct bt_notification_iterator *notif_iter;
 };
 
@@ -67,7 +67,8 @@ enum bt_component_status colander_init(
                goto end;
        }
 
-       colander_data->user_notif = user_provided_data->notification;
+       colander_data->notifs = user_provided_data->notifs;
+       colander_data->count_addr = user_provided_data->count_addr;
        status = bt_private_component_sink_add_input_private_port(
                priv_comp, "in", NULL, NULL);
        if (status != BT_COMPONENT_STATUS_OK) {
@@ -95,7 +96,6 @@ void colander_finalize(struct bt_private_component *priv_comp)
                bt_put(colander_data->notif_iter);
        }
 
-       g_free(colander_data->notif_types);
        g_free(colander_data);
 }
 
@@ -131,9 +131,9 @@ enum bt_component_status colander_consume(
 {
        enum bt_component_status status = BT_COMPONENT_STATUS_OK;
        enum bt_notification_iterator_status notif_iter_status;
-       struct bt_notification *notif = NULL;
        struct colander_data *colander_data =
                bt_private_component_get_user_data(priv_comp);
+       bt_notification_array notifs;
 
        BT_ASSERT(colander_data);
 
@@ -144,7 +144,7 @@ enum bt_component_status colander_consume(
        }
 
        notif_iter_status = bt_private_connection_notification_iterator_next(
-               colander_data->notif_iter, &notif);
+               colander_data->notif_iter, &notifs, colander_data->count_addr);
        switch (notif_iter_status) {
        case BT_NOTIFICATION_ITERATOR_STATUS_CANCELED:
                status = BT_COMPONENT_STATUS_OK;
@@ -156,17 +156,17 @@ enum bt_component_status colander_consume(
                status = BT_COMPONENT_STATUS_END;
                goto end;
        case BT_NOTIFICATION_ITERATOR_STATUS_OK:
+               /* Move notifications to user (count already set) */
+               memcpy(colander_data->notifs, notifs,
+                       sizeof(*notifs) * *colander_data->count_addr);
                break;
        default:
                status = BT_COMPONENT_STATUS_ERROR;
                goto end;
        }
 
-       BT_ASSERT(notif);
-
 end:
        /* Move notification to user's pointer, even if NULL. */
-       *colander_data->user_notif = notif;
        return status;
 }
 
index 5135bceedc66b3bb7357518cc2afbba0b8333403..0deb40c7721467bd3f40b3b5ca5d93cd1bbc2306 100644 (file)
 #include <inttypes.h>
 #include <stdlib.h>
 
+/*
+ * TODO: Use graph's state (number of active iterators, etc.) and
+ * possibly system specifications to make a better guess than this.
+ */
+#define NOTIF_BATCH_SIZE       15
+
 struct discarded_elements_state {
        struct bt_clock_value *cur_begin;
        uint64_t cur_count;
@@ -130,8 +136,15 @@ end:
 static
 void destroy_base_notification_iterator(struct bt_object *obj)
 {
-       BT_ASSERT(obj);
-       g_free(obj);
+       struct bt_notification_iterator *iterator = (void *) obj;
+
+       BT_ASSERT(iterator);
+
+       if (iterator->notifs) {
+               g_ptr_array_free(iterator->notifs, TRUE);
+       }
+
+       g_free(iterator);
 }
 
 static
@@ -267,12 +280,25 @@ void bt_private_connection_notification_iterator_set_connection(
 }
 
 static
-void init_notification_iterator(struct bt_notification_iterator *iterator,
+int init_notification_iterator(struct bt_notification_iterator *iterator,
                enum bt_notification_iterator_type type,
                bt_object_release_func destroy)
 {
+       int ret = 0;
+
        bt_object_init_shared(&iterator->base, destroy);
        iterator->type = type;
+       iterator->notifs = g_ptr_array_new();
+       if (!iterator->notifs) {
+               BT_LOGE_STR("Failed to allocate a GPtrArray.");
+               ret = -1;
+               goto end;
+       }
+
+       g_ptr_array_set_size(iterator->notifs, NOTIF_BATCH_SIZE);
+
+end:
+       return ret;
 }
 
 BT_HIDDEN
@@ -285,6 +311,7 @@ enum bt_connection_status bt_private_connection_notification_iterator_create(
        enum bt_connection_status status = BT_CONNECTION_STATUS_OK;
        enum bt_component_class_type type;
        struct bt_notification_iterator_private_connection *iterator = NULL;
+       int ret;
 
        BT_ASSERT(upstream_comp);
        BT_ASSERT(upstream_port);
@@ -307,9 +334,14 @@ enum bt_connection_status bt_private_connection_notification_iterator_create(
                goto end;
        }
 
-       init_notification_iterator((void *) iterator,
+       ret = init_notification_iterator((void *) iterator,
                BT_NOTIFICATION_ITERATOR_TYPE_PRIVATE_CONNECTION,
                bt_private_connection_notification_iterator_destroy);
+       if (ret) {
+               /* init_notification_iterator() logs errors */
+               status = BT_CONNECTION_STATUS_NOMEM;
+               goto end;
+       }
 
        iterator->stream_states = g_hash_table_new_full(g_direct_hash,
                g_direct_equal, NULL, (GDestroyNotify) destroy_stream_state);
@@ -571,6 +603,26 @@ end:
        return is_valid;
 }
 
+BT_ASSERT_PRE_FUNC
+static inline
+bool validate_notifications(
+               struct bt_notification_iterator_private_connection *iterator,
+               uint64_t count)
+{
+       bool ret = true;
+       bt_notification_array notifs = (void *) iterator->base.notifs->pdata;
+       uint64_t i;
+
+       for (i = 0; i < count; i++) {
+               ret = validate_notification(iterator, notifs[i]);
+               if (!ret) {
+                       break;
+               }
+       }
+
+       return ret;
+}
+
 BT_ASSERT_PRE_FUNC
 static inline bool priv_conn_notif_iter_can_end(
                struct bt_notification_iterator_private_connection *iterator)
@@ -609,22 +661,19 @@ end:
 enum bt_notification_iterator_status
 bt_private_connection_notification_iterator_next(
                struct bt_notification_iterator *user_iterator,
-               struct bt_notification **user_notif)
+               struct bt_notification ***user_notifs, uint64_t *user_count)
 {
        struct bt_notification_iterator_private_connection *iterator =
                (void *) user_iterator;
        struct bt_private_connection_private_notification_iterator *priv_iterator =
                bt_private_connection_private_notification_iterator_from_notification_iterator(iterator);
        bt_component_class_notification_iterator_next_method next_method = NULL;
-       struct bt_notification_iterator_next_method_return next_return = {
-               .status = BT_NOTIFICATION_ITERATOR_STATUS_OK,
-               .notification = NULL,
-       };
        enum bt_notification_iterator_status status =
                BT_NOTIFICATION_ITERATOR_STATUS_OK;
 
        BT_ASSERT_PRE_NON_NULL(user_iterator, "Notification iterator");
-       BT_ASSERT_PRE_NON_NULL(user_notif, "Notification");
+       BT_ASSERT_PRE_NON_NULL(user_notifs, "Notification array");
+       BT_ASSERT_PRE_NON_NULL(user_count, "Notification count");
        BT_ASSERT_PRE(user_iterator->type ==
                BT_NOTIFICATION_ITERATOR_TYPE_PRIVATE_CONNECTION,
                "Notification iterator was not created from a private connection: "
@@ -670,12 +719,13 @@ bt_private_connection_notification_iterator_next(
         */
        BT_ASSERT(next_method);
        BT_LOGD_STR("Calling user's \"next\" method.");
-       next_return = next_method(priv_iterator);
+       status = next_method(priv_iterator,
+               (void *) user_iterator->notifs->pdata,
+               NOTIF_BATCH_SIZE, user_count);
        BT_LOGD("User method returned: status=%s",
-               bt_notification_iterator_status_string(next_return.status));
-       if (next_return.status < 0) {
+               bt_notification_iterator_status_string(status));
+       if (status < 0) {
                BT_LOGW_STR("User method failed.");
-               status = next_return.status;
                goto end;
        }
 
@@ -694,16 +744,31 @@ bt_private_connection_notification_iterator_next(
                 * BT_NOTIFICATION_ITERATOR_STATUS_OK because
                 * otherwise this field could be garbage.
                 */
-               if (next_return.status ==
-                               BT_NOTIFICATION_ITERATOR_STATUS_OK) {
-                       bt_put(next_return.notification);
+               if (status == BT_NOTIFICATION_ITERATOR_STATUS_OK) {
+                       uint64_t i;
+                       bt_notification_array notifs =
+                               (void *) user_iterator->notifs->pdata;
+
+                       for (i = 0; i < *user_count; i++) {
+                               bt_put(notifs[i]);
+                       }
                }
 
                status = BT_NOTIFICATION_ITERATOR_STATUS_CANCELED;
                goto end;
        }
 
-       switch (next_return.status) {
+       switch (status) {
+       case BT_NOTIFICATION_ITERATOR_STATUS_OK:
+               BT_ASSERT_PRE(validate_notifications(iterator, *user_count),
+                       "Notifications are invalid at this point: "
+                       "%![notif-iter-]+i, count=%" PRIu64,
+                       iterator, *user_count);
+               *user_notifs = (void *) user_iterator->notifs->pdata;
+               break;
+       case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
+               status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
+               goto end;
        case BT_NOTIFICATION_ITERATOR_STATUS_END:
                BT_ASSERT_PRE(priv_conn_notif_iter_can_end(iterator),
                        "Notification iterator cannot end at this point: "
@@ -715,20 +780,6 @@ bt_private_connection_notification_iterator_next(
                BT_LOGD("Set new status: status=%s",
                        bt_notification_iterator_status_string(status));
                goto end;
-       case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
-               status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
-               goto end;
-       case BT_NOTIFICATION_ITERATOR_STATUS_OK:
-               BT_ASSERT_PRE(next_return.notification,
-                       "User method returned BT_NOTIFICATION_ITERATOR_STATUS_OK, but notification is NULL: "
-                       "%!+i", iterator);
-               BT_ASSERT_PRE(validate_notification(iterator,
-                       next_return.notification),
-                       "Notification is invalid at this point: "
-                       "%![notif-iter-]+i, %![notif-]+n",
-                       iterator, next_return.notification);
-               *user_notif = next_return.notification;
-               break;
        default:
                /* Unknown non-error status */
                abort();
@@ -741,7 +792,8 @@ end:
 enum bt_notification_iterator_status
 bt_output_port_notification_iterator_next(
                struct bt_notification_iterator *iterator,
-               struct bt_notification **user_notif)
+               bt_notification_array *notifs_to_user,
+               uint64_t *count_to_user)
 {
        enum bt_notification_iterator_status status;
        struct bt_notification_iterator_output_port *out_port_iter =
@@ -749,37 +801,40 @@ bt_output_port_notification_iterator_next(
        enum bt_graph_status graph_status;
 
        BT_ASSERT_PRE_NON_NULL(iterator, "Notification iterator");
-       BT_ASSERT_PRE_NON_NULL(user_notif, "Notification");
+       BT_ASSERT_PRE_NON_NULL(notifs_to_user, "Notification array");
+       BT_ASSERT_PRE_NON_NULL(count_to_user, "Notification count");
        BT_ASSERT_PRE(iterator->type ==
                BT_NOTIFICATION_ITERATOR_TYPE_OUTPUT_PORT,
                "Notification iterator was not created from an output port: "
                "%!+i", iterator);
        BT_LIB_LOGD("Getting next output port notification iterator's notification: %!+i",
                iterator);
+
        graph_status = bt_graph_consume_sink_no_check(
                out_port_iter->graph, out_port_iter->colander);
        switch (graph_status) {
        case BT_GRAPH_STATUS_CANCELED:
-               BT_ASSERT(!out_port_iter->notif);
                status = BT_NOTIFICATION_ITERATOR_STATUS_CANCELED;
                break;
        case BT_GRAPH_STATUS_AGAIN:
-               BT_ASSERT(!out_port_iter->notif);
                status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
                break;
        case BT_GRAPH_STATUS_END:
-               BT_ASSERT(!out_port_iter->notif);
                status = BT_NOTIFICATION_ITERATOR_STATUS_END;
                break;
        case BT_GRAPH_STATUS_NOMEM:
-               BT_ASSERT(!out_port_iter->notif);
                status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
                break;
        case BT_GRAPH_STATUS_OK:
-               BT_ASSERT(out_port_iter->notif);
                status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
-               *user_notif = out_port_iter->notif;
-               out_port_iter->notif = NULL;
+
+               /*
+                * On success, the colander sink moves the notifications
+                * to this iterator's array and sets this iterator's
+                * notification count: move them to the user.
+                */
+               *notifs_to_user = (void *) iterator->notifs->pdata;
+               *count_to_user = out_port_iter->count;
                break;
        default:
                /* Other errors */
@@ -820,7 +875,6 @@ void bt_output_port_notification_iterator_destroy(struct bt_object *obj)
 
        BT_LOGD("Destroying output port notification iterator object: addr=%p",
                iterator);
-       BT_ASSERT(!iterator->notif);
        BT_LOGD_STR("Putting graph.");
        bt_put(iterator->graph);
        BT_LOGD_STR("Putting colander sink component.");
@@ -841,6 +895,7 @@ struct bt_notification_iterator *bt_output_port_notification_iterator_create(
        const char *colander_comp_name;
        struct bt_port *colander_in_port = NULL;
        struct bt_component_class_sink_colander_data colander_data;
+       int ret;
 
        BT_ASSERT_PRE_NON_NULL(output_port, "Output port");
        BT_ASSERT_PRE(bt_port_get_type(output_port) == BT_PORT_TYPE_OUTPUT,
@@ -862,9 +917,14 @@ struct bt_notification_iterator *bt_output_port_notification_iterator_create(
                goto error;
        }
 
-       init_notification_iterator((void *) iterator,
+       ret = init_notification_iterator((void *) iterator,
                BT_NOTIFICATION_ITERATOR_TYPE_OUTPUT_PORT,
                bt_output_port_notification_iterator_destroy);
+       if (ret) {
+               /* init_notification_iterator() logs errors */
+               BT_PUT(iterator);
+               goto end;
+       }
 
        /* Create colander component */
        colander_comp_cls = bt_component_class_sink_colander_get();
@@ -876,7 +936,9 @@ struct bt_notification_iterator *bt_output_port_notification_iterator_create(
        BT_MOVE(iterator->graph, graph);
        colander_comp_name =
                colander_component_name ? colander_component_name : "colander";
-       colander_data.notification = &iterator->notif;
+       colander_data.notifs = (void *) iterator->base.notifs->pdata;
+       colander_data.count_addr = &iterator->count;
+
        graph_status = bt_graph_add_component_with_init_method_data(
                iterator->graph, colander_comp_cls, colander_comp_name,
                NULL, &colander_data, &iterator->colander);
index 7b4ca384e00fbee90e3ff05ab4d4180255d71ee6..4622c0729c88c94a23c9748956126468f897010e 100644 (file)
@@ -1,4 +1,4 @@
-SUBDIRS = text utils ctf
+SUBDIRS = utils ctf text
 # libctfcopytrace
 
 if ENABLE_DEBUG_INFO
index 4ab01c4ea1b501a99944498511dc4edacba8f6b0..fc9bccc2c1eef88d9e7c93feb9df32c5d590d3ef 100644 (file)
@@ -3138,7 +3138,8 @@ enum bt_notif_iter_status bt_notif_iter_get_next_notification(
                                goto end;
                        }
 
-                       BT_MOVE(*notification, notit->event_notif);
+                       *notification = notit->event_notif;
+                       notit->event_notif = NULL;
                        goto end;
                case STATE_EMIT_NOTIF_END_OF_PACKET:
                        /* Update clock with timestamp_end field. */
index 76f5688a412a7670942477adb19b8a8122d51c86..2fbc3c0c0aba614ba9961b258d2072f9dddf02ea 100644 (file)
@@ -818,25 +818,23 @@ void ctf_fs_ds_file_destroy(struct ctf_fs_ds_file *ds_file)
 }
 
 BT_HIDDEN
-struct bt_notification_iterator_next_method_return ctf_fs_ds_file_next(
-               struct ctf_fs_ds_file *ds_file)
+enum bt_notification_iterator_status ctf_fs_ds_file_next(
+               struct ctf_fs_ds_file *ds_file,
+               struct bt_notification **notif)
 {
        enum bt_notif_iter_status notif_iter_status;
-       struct bt_notification_iterator_next_method_return ret = {
-               .status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR,
-               .notification = NULL,
-       };
+       enum bt_notification_iterator_status status;
 
        notif_iter_status = bt_notif_iter_get_next_notification(
                ds_file->notif_iter, ds_file->cc_prio_map, ds_file->graph,
-               &ret.notification);
+               notif);
 
        switch (notif_iter_status) {
        case BT_NOTIF_ITER_STATUS_EOF:
-               ret.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
+               status = BT_NOTIFICATION_ITERATOR_STATUS_END;
                break;
        case BT_NOTIF_ITER_STATUS_OK:
-               ret.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
+               status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
                break;
        case BT_NOTIF_ITER_STATUS_AGAIN:
                /*
@@ -848,11 +846,11 @@ struct bt_notification_iterator_next_method_return ctf_fs_ds_file_next(
        case BT_NOTIF_ITER_STATUS_INVAL:
        case BT_NOTIF_ITER_STATUS_ERROR:
        default:
-               ret.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+               status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
                break;
        }
 
-       return ret;
+       return status;
 }
 
 BT_HIDDEN
index e7d8b04acb5ac8f0ad8ed09735b9ff32577a113d..be035606e293870757bda19f6dc586b4364a9f89 100644 (file)
@@ -140,8 +140,9 @@ BT_HIDDEN
 void ctf_fs_ds_file_destroy(struct ctf_fs_ds_file *stream);
 
 BT_HIDDEN
-struct bt_notification_iterator_next_method_return ctf_fs_ds_file_next(
-               struct ctf_fs_ds_file *stream);
+enum bt_notification_iterator_status ctf_fs_ds_file_next(
+               struct ctf_fs_ds_file *ds_file,
+               struct bt_notification **notif);
 
 BT_HIDDEN
 struct ctf_fs_ds_index *ctf_fs_ds_file_build_index(
index 04421d3147a8f3b5f70ea5e9b555181dab9a1c8c..eac20e420d2c2221106065eb4eed7d7c47b5d502 100644 (file)
@@ -87,19 +87,19 @@ void ctf_fs_notif_iter_data_destroy(
        g_free(notif_iter_data);
 }
 
-struct bt_notification_iterator_next_method_return ctf_fs_iterator_next(
-               struct bt_private_connection_private_notification_iterator *iterator)
+static
+enum bt_notification_iterator_status ctf_fs_iterator_next_one(
+               struct ctf_fs_notif_iter_data *notif_iter_data,
+               struct bt_notification **notif)
 {
-       struct bt_notification_iterator_next_method_return next_ret;
-       struct ctf_fs_notif_iter_data *notif_iter_data =
-               bt_private_connection_private_notification_iterator_get_user_data(iterator);
+       enum bt_notification_iterator_status status;
        int ret;
 
        BT_ASSERT(notif_iter_data->ds_file);
-       next_ret = ctf_fs_ds_file_next(notif_iter_data->ds_file);
+       status = ctf_fs_ds_file_next(notif_iter_data->ds_file, notif);
 
-       if (next_ret.status == BT_NOTIFICATION_ITERATOR_STATUS_OK &&
-                       bt_notification_get_type(next_ret.notification) ==
+       if (status == BT_NOTIFICATION_ITERATOR_STATUS_OK &&
+                       bt_notification_get_type(*notif) ==
                        BT_NOTIFICATION_TYPE_STREAM_BEGIN) {
                if (notif_iter_data->skip_stream_begin_notifs) {
                        /*
@@ -107,9 +107,10 @@ struct bt_notification_iterator_next_method_return ctf_fs_iterator_next(
                         * BT_NOTIFICATION_TYPE_STREAM_BEGIN
                         * notification: skip this one, get a new one.
                         */
-                       BT_PUT(next_ret.notification);
-                       next_ret = ctf_fs_ds_file_next(notif_iter_data->ds_file);
-                       BT_ASSERT(next_ret.status != BT_NOTIFICATION_ITERATOR_STATUS_END);
+                       BT_PUT(*notif);
+                       status = ctf_fs_ds_file_next(notif_iter_data->ds_file,
+                               notif);
+                       BT_ASSERT(status != BT_NOTIFICATION_ITERATOR_STATUS_END);
                        goto end;
                } else {
                        /*
@@ -121,8 +122,8 @@ struct bt_notification_iterator_next_method_return ctf_fs_iterator_next(
                }
        }
 
-       if (next_ret.status == BT_NOTIFICATION_ITERATOR_STATUS_OK &&
-                       bt_notification_get_type(next_ret.notification) ==
+       if (status == BT_NOTIFICATION_ITERATOR_STATUS_OK &&
+                       bt_notification_get_type(*notif) ==
                        BT_NOTIFICATION_TYPE_STREAM_END) {
                notif_iter_data->ds_file_info_index++;
 
@@ -140,7 +141,7 @@ struct bt_notification_iterator_next_method_return ctf_fs_iterator_next(
                        goto end;
                }
 
-               BT_PUT(next_ret.notification);
+               BT_PUT(*notif);
                bt_notif_iter_reset(notif_iter_data->notif_iter);
 
                /*
@@ -149,11 +150,11 @@ struct bt_notification_iterator_next_method_return ctf_fs_iterator_next(
                 */
                ret = notif_iter_data_set_current_ds_file(notif_iter_data);
                if (ret) {
-                       next_ret.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+                       status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
                        goto end;
                }
 
-               next_ret = ctf_fs_ds_file_next(notif_iter_data->ds_file);
+               status = ctf_fs_ds_file_next(notif_iter_data->ds_file, notif);
 
                /*
                 * If we get a notification, we expect to get a
@@ -173,17 +174,56 @@ struct bt_notification_iterator_next_method_return ctf_fs_iterator_next(
                 */
                BT_ASSERT(notif_iter_data->skip_stream_begin_notifs);
 
-               if (next_ret.status == BT_NOTIFICATION_ITERATOR_STATUS_OK) {
-                       BT_ASSERT(bt_notification_get_type(next_ret.notification) ==
+               if (status == BT_NOTIFICATION_ITERATOR_STATUS_OK) {
+                       BT_ASSERT(bt_notification_get_type(*notif) ==
                                BT_NOTIFICATION_TYPE_STREAM_BEGIN);
-                       BT_PUT(next_ret.notification);
-                       next_ret = ctf_fs_ds_file_next(notif_iter_data->ds_file);
-                       BT_ASSERT(next_ret.status != BT_NOTIFICATION_ITERATOR_STATUS_END);
+                       BT_PUT(*notif);
+                       status = ctf_fs_ds_file_next(notif_iter_data->ds_file,
+                               notif);
+                       BT_ASSERT(status != BT_NOTIFICATION_ITERATOR_STATUS_END);
                }
        }
 
 end:
-       return next_ret;
+       return status;
+}
+
+BT_HIDDEN
+enum bt_notification_iterator_status ctf_fs_iterator_next(
+               struct bt_private_connection_private_notification_iterator *iterator,
+               bt_notification_array notifs, uint64_t capacity,
+               uint64_t *count)
+{
+       enum bt_notification_iterator_status status =
+               BT_NOTIFICATION_ITERATOR_STATUS_OK;
+       struct ctf_fs_notif_iter_data *notif_iter_data =
+               bt_private_connection_private_notification_iterator_get_user_data(iterator);
+       uint64_t i = 0;
+
+       while (i < capacity && status == BT_NOTIFICATION_ITERATOR_STATUS_OK) {
+               status = ctf_fs_iterator_next_one(notif_iter_data, &notifs[i]);
+               if (status == BT_NOTIFICATION_ITERATOR_STATUS_OK) {
+                       i++;
+               }
+       }
+
+       if (i > 0) {
+               /*
+                * Even if ctf_fs_iterator_next_one() returned something
+                * else than BT_NOTIFICATION_ITERATOR_STATUS_OK, we
+                * accumulated notification objects in the output
+                * notification array, so we need to return
+                * BT_NOTIFICATION_ITERATOR_STATUS_OK so that they are
+                * transfered to downstream. This other status occurs
+                * again the next time muxer_notif_iter_do_next() is
+                * called, possibly without any accumulated
+                * notification, in which case we'll return it.
+                */
+               *count = i;
+               status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
+       }
+
+       return status;
 }
 
 void ctf_fs_iterator_finalize(struct bt_private_connection_private_notification_iterator *it)
index b54e4722d323caa1a3746e9a8228ac1cd13fc1dd..534cb6b513cc2ec7afeaa6a485970d31e31ecc3a 100644 (file)
@@ -179,7 +179,9 @@ BT_HIDDEN
 void ctf_fs_iterator_finalize(struct bt_private_connection_private_notification_iterator *it);
 
 BT_HIDDEN
-struct bt_notification_iterator_next_method_return ctf_fs_iterator_next(
-               struct bt_private_connection_private_notification_iterator *iterator);
+enum bt_notification_iterator_status ctf_fs_iterator_next(
+               struct bt_private_connection_private_notification_iterator *iterator,
+               bt_notification_array notifs, uint64_t capacity,
+               uint64_t *count);
 
 #endif /* BABELTRACE_PLUGIN_CTF_FS_H */
index 9f0d9e1cbe8f6ca8b426e72ac5884bba445f9b9a..4a3aca8015f33f61e2738b5acff8e82ece0ef648 100644 (file)
@@ -872,26 +872,22 @@ void dmesg_notif_iter_finalize(
                priv_notif_iter));
 }
 
-BT_HIDDEN
-struct bt_notification_iterator_next_method_return dmesg_notif_iter_next(
-               struct bt_private_connection_private_notification_iterator *priv_notif_iter)
+static
+enum bt_notification_iterator_status dmesg_notif_iter_next_one(
+               struct dmesg_notif_iter *dmesg_notif_iter,
+               struct bt_notification **notif)
 {
        ssize_t len;
-       struct dmesg_notif_iter *dmesg_notif_iter =
-               bt_private_connection_private_notification_iterator_get_user_data(
-                       priv_notif_iter);
        struct dmesg_component *dmesg_comp;
-       struct bt_notification_iterator_next_method_return next_ret = {
-               .status = BT_NOTIFICATION_ITERATOR_STATUS_OK,
-               .notification = NULL
-       };
+       enum bt_notification_iterator_status status =
+               BT_NOTIFICATION_ITERATOR_STATUS_OK;
 
        BT_ASSERT(dmesg_notif_iter);
        dmesg_comp = dmesg_notif_iter->dmesg_comp;
        BT_ASSERT(dmesg_comp);
 
        if (dmesg_notif_iter->state == STATE_DONE) {
-               next_ret.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
+               status = BT_NOTIFICATION_ITERATOR_STATUS_END;
                goto end;
        }
 
@@ -909,16 +905,14 @@ struct bt_notification_iterator_next_method_return dmesg_notif_iter_next(
                        &dmesg_notif_iter->linebuf_len, dmesg_notif_iter->fp);
                if (len < 0) {
                        if (errno == EINVAL) {
-                               next_ret.status =
-                                       BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+                               status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
                        } else if (errno == ENOMEM) {
-                               next_ret.status =
+                               status =
                                        BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
                        } else {
                                if (dmesg_notif_iter->state == STATE_EMIT_STREAM_BEGINNING) {
                                        /* Stream did not even begin */
-                                       next_ret.status =
-                                               BT_NOTIFICATION_ITERATOR_STATUS_END;
+                                       status = BT_NOTIFICATION_ITERATOR_STATUS_END;
                                        goto end;
                                } else {
                                        /* End current packet now */
@@ -961,28 +955,28 @@ handle_state:
        switch (dmesg_notif_iter->state) {
        case STATE_EMIT_STREAM_BEGINNING:
                BT_ASSERT(dmesg_notif_iter->tmp_event_notif);
-               next_ret.notification = bt_notification_stream_begin_create(
+               *notif = bt_notification_stream_begin_create(
                        dmesg_comp->graph, dmesg_comp->stream);
                dmesg_notif_iter->state = STATE_EMIT_PACKET_BEGINNING;
                break;
        case STATE_EMIT_PACKET_BEGINNING:
                BT_ASSERT(dmesg_notif_iter->tmp_event_notif);
-               next_ret.notification = bt_notification_packet_begin_create(
+               *notif = bt_notification_packet_begin_create(
                        dmesg_comp->graph, dmesg_comp->packet);
                dmesg_notif_iter->state = STATE_EMIT_EVENT;
                break;
        case STATE_EMIT_EVENT:
                BT_ASSERT(dmesg_notif_iter->tmp_event_notif);
-               BT_MOVE(next_ret.notification,
-                       dmesg_notif_iter->tmp_event_notif);
+               *notif = dmesg_notif_iter->tmp_event_notif;
+               dmesg_notif_iter->tmp_event_notif = NULL;
                break;
        case STATE_EMIT_PACKET_END:
-               next_ret.notification = bt_notification_packet_end_create(
+               *notif = bt_notification_packet_end_create(
                        dmesg_comp->graph, dmesg_comp->packet);
                dmesg_notif_iter->state = STATE_EMIT_STREAM_END;
                break;
        case STATE_EMIT_STREAM_END:
-               next_ret.notification = bt_notification_stream_end_create(
+               *notif = bt_notification_stream_end_create(
                        dmesg_comp->graph, dmesg_comp->stream);
                dmesg_notif_iter->state = STATE_DONE;
                break;
@@ -990,12 +984,52 @@ handle_state:
                break;
        }
 
-       if (!next_ret.notification) {
+       if (!*notif) {
                BT_LOGE("Cannot create notification: dmesg-comp-addr=%p",
                        dmesg_comp);
-               next_ret.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+               status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
        }
 
 end:
-       return next_ret;
+       return status;
+}
+
+BT_HIDDEN
+enum bt_notification_iterator_status dmesg_notif_iter_next(
+               struct bt_private_connection_private_notification_iterator *priv_notif_iter,
+               bt_notification_array notifs, uint64_t capacity,
+               uint64_t *count)
+{
+       struct dmesg_notif_iter *dmesg_notif_iter =
+               bt_private_connection_private_notification_iterator_get_user_data(
+                       priv_notif_iter);
+       enum bt_notification_iterator_status status =
+               BT_NOTIFICATION_ITERATOR_STATUS_OK;
+       uint64_t i = 0;
+
+       while (i < capacity && status == BT_NOTIFICATION_ITERATOR_STATUS_OK) {
+               status = dmesg_notif_iter_next_one(dmesg_notif_iter, &notifs[i]);
+               if (status == BT_NOTIFICATION_ITERATOR_STATUS_OK) {
+                       i++;
+               }
+       }
+
+       if (i > 0) {
+               /*
+                * Even if dmesg_notif_iter_next_one() returned
+                * something else than
+                * BT_NOTIFICATION_ITERATOR_STATUS_OK, we accumulated
+                * notification objects in the output notification
+                * array, so we need to return
+                * BT_NOTIFICATION_ITERATOR_STATUS_OK so that they are
+                * transfered to downstream. This other status occurs
+                * again the next time muxer_notif_iter_do_next() is
+                * called, possibly without any accumulated
+                * notification, in which case we'll return it.
+                */
+               *count = i;
+               status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
+       }
+
+       return status;
 }
index e85a890603b5ddcfcc438a217afd2a99fe5852f2..3e2398cf359a466800bde96892a13c4c14ba1831 100644 (file)
@@ -44,7 +44,9 @@ void dmesg_notif_iter_finalize(
                struct bt_private_connection_private_notification_iterator *priv_notif_iter);
 
 BT_HIDDEN
-struct bt_notification_iterator_next_method_return dmesg_notif_iter_next(
-               struct bt_private_connection_private_notification_iterator *priv_notif_iter);
+enum bt_notification_iterator_status dmesg_notif_iter_next(
+               struct bt_private_connection_private_notification_iterator *priv_notif_iter,
+               bt_notification_array notifs, uint64_t capacity,
+               uint64_t *count);
 
 #endif /* BABELTRACE_PLUGIN_TEXT_DMESG_DMESG_H */
index 776266765e25a99656ce799813de158459070012..1010917d09faada2fa8ef6510b53f3fc8f1bed4b 100644 (file)
@@ -179,11 +179,13 @@ BT_HIDDEN
 enum bt_component_status pretty_consume(struct bt_private_component *component)
 {
        enum bt_component_status ret;
-       struct bt_notification *notif = NULL;
+       bt_notification_array notifs;
        struct bt_notification_iterator *it;
        struct pretty_component *pretty =
                bt_private_component_get_user_data(component);
        enum bt_notification_iterator_status it_ret;
+       uint64_t count = 0;
+       uint64_t i = 0;
 
        if (unlikely(pretty->error)) {
                ret = BT_COMPONENT_STATUS_ERROR;
@@ -191,7 +193,8 @@ enum bt_component_status pretty_consume(struct bt_private_component *component)
        }
 
        it = pretty->input_iterator;
-       it_ret = bt_private_connection_notification_iterator_next(it, &notif);
+       it_ret = bt_private_connection_notification_iterator_next(it, &notifs,
+               &count);
 
        switch (it_ret) {
        case BT_NOTIFICATION_ITERATOR_STATUS_END:
@@ -208,11 +211,22 @@ enum bt_component_status pretty_consume(struct bt_private_component *component)
                goto end;
        }
 
-       BT_ASSERT(notif);
-       ret = handle_notification(pretty, notif);
+       BT_ASSERT(it_ret == BT_NOTIFICATION_ITERATOR_STATUS_OK);
+
+       for (i = 0; i < count; i++) {
+               ret = handle_notification(pretty, notifs[i]);
+               if (ret) {
+                       goto end;
+               }
+
+               bt_put(notifs[i]);
+       }
 
 end:
-       bt_put(notif);
+       for (; i < count; i++) {
+               bt_put(notifs[i]);
+       }
+
        return ret;
 }
 
index b99ab20694e165ccbd9d7e8f18fe5c2428bb4d82..d85364e2b2e2b97641eee80b1e91d447fee82f06 100644 (file)
@@ -1,6 +1,6 @@
 AM_CPPFLAGS += -I$(top_srcdir)/plugins
 
-SUBDIRS = dummy counter muxer .
+SUBDIRS = dummy muxer counter .
 # trimmer
 
 plugindir = "$(PLUGINSDIR)"
index 7ff3d33978759a7a058f607b801db529b91bbb5d..bbcb82769590b4dd7f4bdecf676970c417f8d8e7 100644 (file)
@@ -54,8 +54,10 @@ uint64_t get_total_count(struct counter *counter)
 }
 
 static
-void print_count(struct counter *counter, uint64_t total)
+void print_count(struct counter *counter)
 {
+       uint64_t total = get_total_count(counter);
+
        PRINTF_COUNT("event", "events", event);
        PRINTF_COUNT("stream beginning", "stream beginnings", stream_begin);
        PRINTF_COUNT("stream end", "stream ends", stream_end);
@@ -83,19 +85,18 @@ void print_count(struct counter *counter, uint64_t total)
 }
 
 static
-void try_print_count(struct counter *counter)
+void try_print_count(struct counter *counter, uint64_t notif_count)
 {
-       uint64_t total;
-
        if (counter->step == 0) {
                /* No update */
                return;
        }
 
-       total = get_total_count(counter);
+       counter->at += notif_count;
 
-       if (total % counter->step == 0) {
-               print_count(counter, total);
+       if (counter->at >= counter->step) {
+               counter->at = 0;
+               print_count(counter);
                putchar('\n');
        }
 }
@@ -106,7 +107,7 @@ void try_print_last(struct counter *counter)
        const uint64_t total = get_total_count(counter);
 
        if (total != counter->last_printed_total) {
-               print_count(counter, total);
+               print_count(counter);
        }
 }
 
@@ -215,7 +216,8 @@ enum bt_component_status counter_consume(struct bt_private_component *component)
        struct counter *counter;
        enum bt_notification_iterator_status it_ret;
        int64_t count;
-       struct bt_notification *notif = NULL;
+       uint64_t notif_count;
+       bt_notification_array notifs;
 
        counter = bt_private_component_get_user_data(component);
        BT_ASSERT(counter);
@@ -231,9 +233,9 @@ enum bt_component_status counter_consume(struct bt_private_component *component)
                goto end;
        }
 
-       /* Consume one notification  */
+       /* Consume notifications */
        it_ret = bt_private_connection_notification_iterator_next(
-               counter->notif_iter, &notif);
+               counter->notif_iter, &notifs, &notif_count);
        if (it_ret < 0) {
                ret = BT_COMPONENT_STATUS_ERROR;
                goto end;
@@ -249,53 +251,60 @@ enum bt_component_status counter_consume(struct bt_private_component *component)
                goto end;
        case BT_NOTIFICATION_ITERATOR_STATUS_OK:
        {
-               BT_ASSERT(notif);
-               switch (bt_notification_get_type(notif)) {
-               case BT_NOTIFICATION_TYPE_EVENT:
-                       counter->count.event++;
-                       break;
-               case BT_NOTIFICATION_TYPE_INACTIVITY:
-                       counter->count.inactivity++;
-                       break;
-               case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
-                       counter->count.stream_begin++;
-                       break;
-               case BT_NOTIFICATION_TYPE_STREAM_END:
-                       counter->count.stream_end++;
-                       break;
-               case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
-                       counter->count.packet_begin++;
-                       break;
-               case BT_NOTIFICATION_TYPE_PACKET_END:
-                       counter->count.packet_end++;
-                       break;
-               case BT_NOTIFICATION_TYPE_DISCARDED_EVENTS:
-                       counter->count.discarded_events_notifs++;
-                       count = bt_notification_discarded_events_get_count(
-                               notif);
-                       if (count >= 0) {
-                               counter->count.discarded_events += count;
+               uint64_t i;
+
+               for (i = 0; i < notif_count; i++) {
+                       struct bt_notification *notif = notifs[i];
+
+                       BT_ASSERT(notif);
+                       switch (bt_notification_get_type(notif)) {
+                       case BT_NOTIFICATION_TYPE_EVENT:
+                               counter->count.event++;
+                               break;
+                       case BT_NOTIFICATION_TYPE_INACTIVITY:
+                               counter->count.inactivity++;
+                               break;
+                       case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
+                               counter->count.stream_begin++;
+                               break;
+                       case BT_NOTIFICATION_TYPE_STREAM_END:
+                               counter->count.stream_end++;
+                               break;
+                       case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
+                               counter->count.packet_begin++;
+                               break;
+                       case BT_NOTIFICATION_TYPE_PACKET_END:
+                               counter->count.packet_end++;
+                               break;
+                       case BT_NOTIFICATION_TYPE_DISCARDED_EVENTS:
+                               counter->count.discarded_events_notifs++;
+                               count = bt_notification_discarded_events_get_count(
+                                       notif);
+                               if (count >= 0) {
+                                       counter->count.discarded_events += count;
+                               }
+                               break;
+                       case BT_NOTIFICATION_TYPE_DISCARDED_PACKETS:
+                               counter->count.discarded_packets_notifs++;
+                               count = bt_notification_discarded_packets_get_count(
+                                       notif);
+                               if (count >= 0) {
+                                       counter->count.discarded_packets += count;
+                               }
+                               break;
+                       default:
+                               counter->count.other++;
                        }
-                       break;
-               case BT_NOTIFICATION_TYPE_DISCARDED_PACKETS:
-                       counter->count.discarded_packets_notifs++;
-                       count = bt_notification_discarded_packets_get_count(
-                               notif);
-                       if (count >= 0) {
-                               counter->count.discarded_packets += count;
-                       }
-                       break;
-               default:
-                       counter->count.other++;
+
+                       bt_put(notif);
                }
        }
        default:
                break;
        }
 
-       try_print_count(counter);
+       try_print_count(counter, notif_count);
 
 end:
-       bt_put(notif);
        return ret;
 }
index 6915532bebc39703f629affd3ff24a913f777558..2630f10ecbbd82f55e6022e1fbfa40d378155dc8 100644 (file)
@@ -44,6 +44,7 @@ struct counter {
                uint64_t other;
        } count;
        uint64_t last_printed_total;
+       uint64_t at;
        uint64_t step;
        bool hide_zero;
        bool error;
index 5109259eff34033d172c923f8b8e707baa03d41b..926a6dd56728ef9fc54449c4f448568402ed6097 100644 (file)
@@ -103,9 +103,11 @@ end:
 enum bt_component_status dummy_consume(struct bt_private_component *component)
 {
        enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
-       struct bt_notification *notif = NULL;
+       bt_notification_array notifs;
+       uint64_t count;
        struct dummy *dummy;
        enum bt_notification_iterator_status it_ret;
+       uint64_t i;
 
        dummy = bt_private_component_get_user_data(component);
        BT_ASSERT(dummy);
@@ -122,22 +124,29 @@ enum bt_component_status dummy_consume(struct bt_private_component *component)
 
        /* Consume one notification  */
        it_ret = bt_private_connection_notification_iterator_next(
-               dummy->notif_iter, &notif);
+               dummy->notif_iter, &notifs, &count);
        switch (it_ret) {
-       case BT_NOTIFICATION_ITERATOR_STATUS_ERROR:
-               ret = BT_COMPONENT_STATUS_ERROR;
-               goto end;
+       case BT_NOTIFICATION_ITERATOR_STATUS_OK:
+               ret = BT_COMPONENT_STATUS_OK;
+
+               for (i = 0; i < count; i++) {
+                       bt_put(notifs[i]);
+               }
+
+               break;
        case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
                ret = BT_COMPONENT_STATUS_AGAIN;
                goto end;
        case BT_NOTIFICATION_ITERATOR_STATUS_END:
                ret = BT_COMPONENT_STATUS_END;
                goto end;
+       case BT_NOTIFICATION_ITERATOR_STATUS_ERROR:
+               ret = BT_COMPONENT_STATUS_ERROR;
+               goto end;
        default:
                break;
        }
 
 end:
-       bt_put(notif);
        return ret;
 }
index 61cd4883319ff3332037d844df015e75d93fada7..663bd3460d0a7fe741631eba2a02aba1e9bdf7c4 100644 (file)
@@ -61,21 +61,8 @@ struct muxer_upstream_notif_iter {
        /* Owned by this, NULL if ended */
        struct bt_notification_iterator *notif_iter;
 
-       /* Owned by this */
-       struct bt_notification *notif;
-
-       /*
-        * This flag is true if the upstream notification iterator's
-        * current notification must be considered for the multiplexing
-        * operations. If the upstream iterator returns
-        * BT_NOTIFICATION_ITERATOR_STATUS_AGAIN, then this object
-        * is considered invalid, because its current notification is
-        * still the previous one, but we already took it into account.
-        *
-        * The value of this flag is not important if notif_iter above
-        * is NULL (which means the upstream iterator is finished).
-        */
-       bool is_valid;
+       /* Contains `struct bt_notification *`, owned by this */
+       GQueue *notifs;
 };
 
 enum muxer_notif_iter_clock_class_expectation {
@@ -107,9 +94,6 @@ struct muxer_notif_iter {
         */
        GList *newly_connected_priv_ports;
 
-       /* Next thing to return by the "next" method */
-       struct bt_notification_iterator_next_method_return next_next_return;
-
        /* Last time returned in a notification */
        int64_t last_returned_ts_ns;
 
@@ -133,12 +117,23 @@ void destroy_muxer_upstream_notif_iter(
        }
 
        BT_LOGD("Destroying muxer's upstream notification iterator wrapper: "
-               "addr=%p, notif-iter-addr=%p, is-valid=%d",
+               "addr=%p, notif-iter-addr=%p, queue-len=%u",
                muxer_upstream_notif_iter,
                muxer_upstream_notif_iter->notif_iter,
-               muxer_upstream_notif_iter->is_valid);
+               muxer_upstream_notif_iter->notifs->length);
        bt_put(muxer_upstream_notif_iter->notif_iter);
-       bt_put(muxer_upstream_notif_iter->notif);
+
+       if (muxer_upstream_notif_iter->notifs) {
+               struct bt_notification *notif;
+
+               while ((notif = g_queue_pop_head(
+                               muxer_upstream_notif_iter->notifs))) {
+                       bt_put(notif);
+               }
+
+               g_queue_free(muxer_upstream_notif_iter->notifs);
+       }
+
        g_free(muxer_upstream_notif_iter);
 }
 
@@ -157,7 +152,13 @@ struct muxer_upstream_notif_iter *muxer_notif_iter_add_upstream_notif_iter(
        }
 
        muxer_upstream_notif_iter->notif_iter = bt_get(notif_iter);
-       muxer_upstream_notif_iter->is_valid = false;
+       muxer_upstream_notif_iter->notifs = g_queue_new();
+       if (!muxer_upstream_notif_iter->notifs) {
+               BT_LOGE_STR("Failed to allocate a GQueue.");
+
+               goto end;
+       }
+
        g_ptr_array_add(muxer_notif_iter->muxer_upstream_notif_iters,
                muxer_upstream_notif_iter);
        BT_LOGD("Added muxer's upstream notification iterator wrapper: "
@@ -452,36 +453,45 @@ enum bt_notification_iterator_status muxer_upstream_notif_iter_next(
                struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
 {
        enum bt_notification_iterator_status status;
-       struct bt_notification *notif = NULL;
+       bt_notification_array notifs;
+       uint64_t i;
+       uint64_t count;
 
        BT_LOGV("Calling upstream notification iterator's \"next\" method: "
                "muxer-upstream-notif-iter-wrap-addr=%p, notif-iter-addr=%p",
                muxer_upstream_notif_iter,
                muxer_upstream_notif_iter->notif_iter);
        status = bt_private_connection_notification_iterator_next(
-               muxer_upstream_notif_iter->notif_iter, &notif);
+               muxer_upstream_notif_iter->notif_iter, &notifs, &count);
        BT_LOGV("Upstream notification iterator's \"next\" method returned: "
                "status=%s", bt_notification_iterator_status_string(status));
 
        switch (status) {
        case BT_NOTIFICATION_ITERATOR_STATUS_OK:
                /*
-                * Notification iterator's current notification is valid:
-                * it must be considered for muxing operations.
+                * Notification iterator's current notification is
+                * valid: it must be considered for muxing operations.
                 */
                BT_LOGV_STR("Validated upstream notification iterator wrapper.");
-               muxer_upstream_notif_iter->is_valid = true;
-               BT_MOVE(muxer_upstream_notif_iter->notif, notif);
+               BT_ASSERT(count > 0);
+
+               /* Move notifications to our queue */
+               for (i = 0; i < count; i++) {
+                       /*
+                        * Push to tail in order; other side
+                        * (muxer_notif_iter_do_next_one()) consumes
+                        * from the head first.
+                        */
+                       g_queue_push_tail(muxer_upstream_notif_iter->notifs,
+                               notifs[i]);
+               }
                break;
        case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
                /*
                 * Notification iterator's current notification is not
                 * valid anymore. Return
-                * BT_NOTIFICATION_ITERATOR_STATUS_AGAIN
-                * immediately.
+                * BT_NOTIFICATION_ITERATOR_STATUS_AGAIN immediately.
                 */
-               BT_LOGV_STR("Invalidated upstream notification iterator wrapper because of BT_NOTIFICATION_ITERATOR_STATUS_AGAIN.");
-               muxer_upstream_notif_iter->is_valid = false;
                break;
        case BT_NOTIFICATION_ITERATOR_STATUS_END:       /* Fall-through. */
        case BT_NOTIFICATION_ITERATOR_STATUS_CANCELED:
@@ -490,9 +500,7 @@ enum bt_notification_iterator_status muxer_upstream_notif_iter_next(
                 * won't be considered again to find the youngest
                 * notification.
                 */
-               BT_LOGV_STR("Invalidated upstream notification iterator wrapper because of BT_NOTIFICATION_ITERATOR_STATUS_END or BT_NOTIFICATION_ITERATOR_STATUS_CANCELED.");
                BT_PUT(muxer_upstream_notif_iter->notif_iter);
-               muxer_upstream_notif_iter->is_valid = false;
                status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
                break;
        default:
@@ -503,7 +511,6 @@ enum bt_notification_iterator_status muxer_upstream_notif_iter_next(
                break;
        }
 
-       BT_ASSERT(!notif);
        return status;
 }
 
@@ -518,7 +525,7 @@ int muxer_notif_iter_handle_newly_connected_ports(
 
        /*
         * Here we create one upstream notification iterator for each
-        * newly connected port. We do not perform an initial "next" on
+        * newly connected port. We do NOT perform an initial "next" on
         * those new upstream notification iterators: they are
         * invalidated, to be validated later. The list of newly
         * connected ports to handle here is updated by
@@ -906,8 +913,8 @@ muxer_notif_iter_youngest_upstream_notif_iter(
                        continue;
                }
 
-               BT_ASSERT(cur_muxer_upstream_notif_iter->is_valid);
-               notif = cur_muxer_upstream_notif_iter->notif;
+               BT_ASSERT(cur_muxer_upstream_notif_iter->notifs->length > 0);
+               notif = g_queue_peek_head(cur_muxer_upstream_notif_iter->notifs);
                BT_ASSERT(notif);
                ret = get_notif_ts_ns(muxer_comp, muxer_notif_iter, notif,
                        muxer_notif_iter->last_returned_ts_ns, &notif_ts_ns);
@@ -946,8 +953,12 @@ enum bt_notification_iterator_status validate_muxer_upstream_notif_iter(
                "muxer-upstream-notif-iter-wrap-addr=%p",
                muxer_upstream_notif_iter);
 
-       if (muxer_upstream_notif_iter->is_valid ||
+       if (muxer_upstream_notif_iter->notifs->length > 0 ||
                        !muxer_upstream_notif_iter->notif_iter) {
+               BT_LOGV("Already valid or not considered: "
+                       "queue-len=%u, upstream-notif-iter-addr=%p",
+                       muxer_upstream_notif_iter->notifs->length,
+                       muxer_upstream_notif_iter->notif_iter);
                goto end;
        }
 
@@ -960,7 +971,7 @@ end:
 
 static
 enum bt_notification_iterator_status validate_muxer_upstream_notif_iters(
-       struct muxer_notif_iter *muxer_notif_iter)
+               struct muxer_notif_iter *muxer_notif_iter)
 {
        enum bt_notification_iterator_status status =
                BT_NOTIFICATION_ITERATOR_STATUS_OK;
@@ -1019,16 +1030,15 @@ end:
        return status;
 }
 
-static
-struct bt_notification_iterator_next_method_return muxer_notif_iter_do_next(
+static inline
+enum bt_notification_iterator_status muxer_notif_iter_do_next_one(
                struct muxer_comp *muxer_comp,
-               struct muxer_notif_iter *muxer_notif_iter)
+               struct muxer_notif_iter *muxer_notif_iter,
+               struct bt_notification **notif)
 {
+       enum bt_notification_iterator_status status =
+               BT_NOTIFICATION_ITERATOR_STATUS_OK;
        struct muxer_upstream_notif_iter *muxer_upstream_notif_iter = NULL;
-       struct bt_notification_iterator_next_method_return next_return = {
-               .notification = NULL,
-               .status = BT_NOTIFICATION_ITERATOR_STATUS_OK,
-       };
        int64_t next_return_ts;
 
        while (true) {
@@ -1040,14 +1050,12 @@ struct bt_notification_iterator_next_method_return muxer_notif_iter_do_next(
                                "muxer-comp-addr=%p, muxer-notif-iter-addr=%p, "
                                "ret=%d",
                                muxer_comp, muxer_notif_iter, ret);
-                       next_return.status =
-                               BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+                       status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
                        goto end;
                }
 
-               next_return.status =
-                       validate_muxer_upstream_notif_iters(muxer_notif_iter);
-               if (next_return.status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
+               status = validate_muxer_upstream_notif_iters(muxer_notif_iter);
+               if (status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
                        /* validate_muxer_upstream_notif_iters() logs details */
                        goto end;
                }
@@ -1075,21 +1083,19 @@ struct bt_notification_iterator_next_method_return muxer_notif_iter_do_next(
         * amongst those, of which the current notification is the
         * youngest.
         */
-       next_return.status =
-               muxer_notif_iter_youngest_upstream_notif_iter(muxer_comp,
+       status = muxer_notif_iter_youngest_upstream_notif_iter(muxer_comp,
                        muxer_notif_iter, &muxer_upstream_notif_iter,
                        &next_return_ts);
-       if (next_return.status < 0 ||
-                       next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_END ||
-                       next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_CANCELED) {
-               if (next_return.status < 0) {
+       if (status < 0 || status == BT_NOTIFICATION_ITERATOR_STATUS_END ||
+                       status == BT_NOTIFICATION_ITERATOR_STATUS_CANCELED) {
+               if (status < 0) {
                        BT_LOGE("Cannot find the youngest upstream notification iterator wrapper: "
                                "status=%s",
-                               bt_notification_iterator_status_string(next_return.status));
+                               bt_notification_iterator_status_string(status));
                } else {
                        BT_LOGV("Cannot find the youngest upstream notification iterator wrapper: "
                                "status=%s",
-                               bt_notification_iterator_status_string(next_return.status));
+                               bt_notification_iterator_status_string(status));
                }
 
                goto end;
@@ -1101,7 +1107,7 @@ struct bt_notification_iterator_next_method_return muxer_notif_iter_do_next(
                        "last-returned-ts=%" PRId64,
                        muxer_notif_iter, next_return_ts,
                        muxer_notif_iter->last_returned_ts_ns);
-               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+               status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
                goto end;
        }
 
@@ -1110,21 +1116,58 @@ struct bt_notification_iterator_next_method_return muxer_notif_iter_do_next(
                "muxer-upstream-notif-iter-wrap-addr=%p, "
                "ts=%" PRId64,
                muxer_notif_iter, muxer_upstream_notif_iter, next_return_ts);
-       BT_ASSERT(next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_OK);
+       BT_ASSERT(status == BT_NOTIFICATION_ITERATOR_STATUS_OK);
        BT_ASSERT(muxer_upstream_notif_iter);
-       next_return.notification = bt_get(muxer_upstream_notif_iter->notif);
-       BT_ASSERT(next_return.notification);
 
        /*
-        * We invalidate the upstream notification iterator so that, the
-        * next time this function is called,
-        * validate_muxer_upstream_notif_iters() will make it valid.
+        * Consume from the queue's head: other side
+        * (muxer_upstream_notif_iter_next()) writes to the tail.
         */
-       muxer_upstream_notif_iter->is_valid = false;
+       *notif = g_queue_pop_head(muxer_upstream_notif_iter->notifs);
+       BT_ASSERT(*notif);
        muxer_notif_iter->last_returned_ts_ns = next_return_ts;
 
 end:
-       return next_return;
+       return status;
+}
+
+static
+enum bt_notification_iterator_status muxer_notif_iter_do_next(
+               struct muxer_comp *muxer_comp,
+               struct muxer_notif_iter *muxer_notif_iter,
+               bt_notification_array notifs, uint64_t capacity,
+               uint64_t *count)
+{
+       enum bt_notification_iterator_status status =
+               BT_NOTIFICATION_ITERATOR_STATUS_OK;
+       uint64_t i = 0;
+
+       while (i < capacity && status == BT_NOTIFICATION_ITERATOR_STATUS_OK) {
+               status = muxer_notif_iter_do_next_one(muxer_comp,
+                       muxer_notif_iter, &notifs[i]);
+               if (status == BT_NOTIFICATION_ITERATOR_STATUS_OK) {
+                       i++;
+               }
+       }
+
+       if (i > 0) {
+               /*
+                * Even if muxer_notif_iter_do_next_one() returned
+                * something else than
+                * BT_NOTIFICATION_ITERATOR_STATUS_OK, we accumulated
+                * notification objects in the output notification
+                * array, so we need to return
+                * BT_NOTIFICATION_ITERATOR_STATUS_OK so that they are
+                * transfered to downstream. This other status occurs
+                * again the next time muxer_notif_iter_do_next() is
+                * called, possibly without any accumulated
+                * notification, in which case we'll return it.
+                */
+               *count = i;
+               status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
+       }
+
+       return status;
 }
 
 static
@@ -1338,10 +1381,12 @@ void muxer_notif_iter_finalize(
 }
 
 BT_HIDDEN
-struct bt_notification_iterator_next_method_return muxer_notif_iter_next(
-               struct bt_private_connection_private_notification_iterator *priv_notif_iter)
+enum bt_notification_iterator_status muxer_notif_iter_next(
+               struct bt_private_connection_private_notification_iterator *priv_notif_iter,
+               bt_notification_array notifs, uint64_t capacity,
+               uint64_t *count)
 {
-       struct bt_notification_iterator_next_method_return next_ret;
+       enum bt_notification_iterator_status status;
        struct muxer_notif_iter *muxer_notif_iter =
                bt_private_connection_private_notification_iterator_get_user_data(priv_notif_iter);
        struct bt_private_component *priv_comp = NULL;
@@ -1353,7 +1398,6 @@ struct bt_notification_iterator_next_method_return muxer_notif_iter_next(
        BT_ASSERT(priv_comp);
        muxer_comp = bt_private_component_get_user_data(priv_comp);
        BT_ASSERT(muxer_comp);
-
        BT_LOGV("Muxer component's notification iterator's \"next\" method called: "
                "comp-addr=%p, muxer-comp-addr=%p, muxer-notif-iter-addr=%p, "
                "notif-iter-addr=%p",
@@ -1365,28 +1409,27 @@ struct bt_notification_iterator_next_method_return muxer_notif_iter_next(
                        "comp-addr=%p, muxer-comp-addr=%p, muxer-notif-iter-addr=%p, "
                        "notif-iter-addr=%p",
                        priv_comp, muxer_comp, muxer_notif_iter, priv_notif_iter);
-               next_ret.notification = NULL;
-               next_ret.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+               status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
                goto end;
        }
 
-       next_ret = muxer_notif_iter_do_next(muxer_comp, muxer_notif_iter);
-       if (next_ret.status < 0) {
+       status = muxer_notif_iter_do_next(muxer_comp, muxer_notif_iter,
+               notifs, capacity, count);
+       if (status < 0) {
                BT_LOGE("Cannot get next notification: "
                        "comp-addr=%p, muxer-comp-addr=%p, muxer-notif-iter-addr=%p, "
                        "notif-iter-addr=%p, status=%s",
                        priv_comp, muxer_comp, muxer_notif_iter, priv_notif_iter,
-                       bt_notification_iterator_status_string(next_ret.status));
+                       bt_notification_iterator_status_string(status));
        } else {
                BT_LOGV("Returning from muxer component's notification iterator's \"next\" method: "
-                       "status=%s, notif-addr=%p",
-                       bt_notification_iterator_status_string(next_ret.status),
-                       next_ret.notification);
+                       "status=%s",
+                       bt_notification_iterator_status_string(status));
        }
 
 end:
        bt_put(priv_comp);
-       return next_ret;
+       return status;
 }
 
 BT_HIDDEN
index b7e05ba3a0af84081c887e434e55b3f06719988b..5e7f121849525430e2fed1693e4b0612f18e3780 100644 (file)
@@ -24,6 +24,7 @@
  * SOFTWARE.
  */
 
+#include <stdint.h>
 #include <babeltrace/babeltrace-internal.h>
 
 BT_HIDDEN
@@ -45,8 +46,10 @@ void muxer_notif_iter_finalize(
                struct bt_private_connection_private_notification_iterator *priv_notif_iter);
 
 BT_HIDDEN
-struct bt_notification_iterator_next_method_return muxer_notif_iter_next(
-               struct bt_private_connection_private_notification_iterator *priv_notif_iter);
+enum bt_notification_iterator_status muxer_notif_iter_next(
+               struct bt_private_connection_private_notification_iterator *priv_notif_iter,
+               bt_notification_array notifs, uint64_t capacity,
+               uint64_t *count);
 
 BT_HIDDEN
 void muxer_port_connected(
index f9d521300818638ed4a633659efadb163ce00e44..4d0357c604a76f81beae6108e4a9bb7c2f023e35 100644 (file)
@@ -40,15 +40,12 @@ static void dummy_iterator_finalize_method(
 {
 }
 
-static struct bt_notification_iterator_next_method_return dummy_iterator_next_method(
-               struct bt_private_connection_private_notification_iterator *private_iterator)
+static enum bt_notification_iterator_status dummy_iterator_next_method(
+               struct bt_private_connection_private_notification_iterator *private_iterator,
+               bt_notification_array notifs, uint64_t capacity,
+               uint64_t *count)
 {
-       struct bt_notification_iterator_next_method_return next_return = {
-               .notification = NULL,
-               .status = BT_NOTIFICATION_ITERATOR_STATUS_OK,
-       };
-
-       return next_return;
+       return BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
 }
 
 static struct bt_component_class_query_method_return query_method(
index f076af51095e93dd808e873ce877e783bc24428d..4cecf36282cc68bf678f9fbad61141d0d7a320f8 100644 (file)
@@ -404,96 +404,61 @@ enum bt_notification_iterator_status src_iter_init(
 }
 
 static
-struct bt_notification_iterator_next_method_return src_iter_next_seq(
-               struct src_iter_user_data *user_data)
+void src_iter_next_seq_one(struct src_iter_user_data *user_data,
+               struct bt_notification **notif)
 {
-       struct bt_notification_iterator_next_method_return next_return = {
-               .status = BT_NOTIFICATION_ITERATOR_STATUS_OK,
-       };
-       int64_t cur_ts_ns;
        struct bt_packet *event_packet = NULL;
 
-       BT_ASSERT(user_data->seq);
-       cur_ts_ns = user_data->seq[user_data->at];
-
-       switch (cur_ts_ns) {
-       case SEQ_END:
-               next_return.status =
-                       BT_NOTIFICATION_ITERATOR_STATUS_END;
-               break;
+       switch (user_data->seq[user_data->at]) {
        case SEQ_INACTIVITY:
-               next_return.notification =
-                       bt_notification_inactivity_create(graph,
+               *notif = bt_notification_inactivity_create(graph,
                                src_empty_cc_prio_map);
-               BT_ASSERT(next_return.notification);
                break;
        case SEQ_STREAM1_BEGIN:
-               next_return.notification =
-                       bt_notification_stream_begin_create(graph, src_stream1);
-               BT_ASSERT(next_return.notification);
+               *notif = bt_notification_stream_begin_create(graph,
+                       src_stream1);
                break;
        case SEQ_STREAM2_BEGIN:
-               next_return.notification =
-                       bt_notification_stream_begin_create(graph, src_stream2);
-               BT_ASSERT(next_return.notification);
+               *notif = bt_notification_stream_begin_create(graph,
+                       src_stream2);
                break;
        case SEQ_STREAM1_END:
-               next_return.notification =
-                       bt_notification_stream_end_create(graph, src_stream1);
-               BT_ASSERT(next_return.notification);
+               *notif = bt_notification_stream_end_create(graph, src_stream1);
                break;
        case SEQ_STREAM2_END:
-               next_return.notification =
-                       bt_notification_stream_end_create(graph, src_stream2);
-               BT_ASSERT(next_return.notification);
+               *notif = bt_notification_stream_end_create(graph, src_stream2);
                break;
        case SEQ_STREAM1_PACKET1_BEGIN:
-               next_return.notification =
-                       bt_notification_packet_begin_create(graph,
-                               src_stream1_packet1);
-               BT_ASSERT(next_return.notification);
+               *notif = bt_notification_packet_begin_create(graph,
+                       src_stream1_packet1);
                break;
        case SEQ_STREAM1_PACKET2_BEGIN:
-               next_return.notification =
-                       bt_notification_packet_begin_create(graph,
-                               src_stream1_packet2);
-               BT_ASSERT(next_return.notification);
+               *notif = bt_notification_packet_begin_create(graph,
+                       src_stream1_packet2);
                break;
        case SEQ_STREAM2_PACKET1_BEGIN:
-               next_return.notification =
-                       bt_notification_packet_begin_create(graph,
-                               src_stream2_packet1);
-               BT_ASSERT(next_return.notification);
+               *notif = bt_notification_packet_begin_create(graph,
+                       src_stream2_packet1);
                break;
        case SEQ_STREAM2_PACKET2_BEGIN:
-               next_return.notification =
-                       bt_notification_packet_begin_create(graph,
-                               src_stream2_packet2);
-               BT_ASSERT(next_return.notification);
+               *notif = bt_notification_packet_begin_create(graph,
+                       src_stream2_packet2);
                break;
        case SEQ_STREAM1_PACKET1_END:
-               next_return.notification =
-                       bt_notification_packet_end_create(graph,
-                               src_stream1_packet1);
-               BT_ASSERT(next_return.notification);
+               *notif = bt_notification_packet_end_create(graph,
+                       src_stream1_packet1);
                break;
        case SEQ_STREAM1_PACKET2_END:
-               next_return.notification =
-                       bt_notification_packet_end_create(graph,
-                               src_stream1_packet2);
-               BT_ASSERT(next_return.notification);
+               *notif = bt_notification_packet_end_create(graph,
+                       src_stream1_packet2);
                break;
        case SEQ_STREAM2_PACKET1_END:
-               next_return.notification =
-                       bt_notification_packet_end_create(graph,
-                               src_stream2_packet1);
-               BT_ASSERT(next_return.notification);
+               *notif = bt_notification_packet_end_create(graph,
+                       src_stream2_packet1);
                break;
        case SEQ_STREAM2_PACKET2_END:
-               next_return.notification =
-                       bt_notification_packet_end_create(graph,
-                               src_stream2_packet2);
-               BT_ASSERT(next_return.notification);
+               *notif = bt_notification_packet_end_create(graph,
+                       src_stream2_packet2);
                break;
        case SEQ_EVENT_STREAM1_PACKET1:
                event_packet = src_stream1_packet1;
@@ -512,33 +477,54 @@ struct bt_notification_iterator_next_method_return src_iter_next_seq(
        }
 
        if (event_packet) {
-               next_return.notification =
-                       bt_notification_event_create(graph, src_event_class,
-                               event_packet, src_empty_cc_prio_map);
-               BT_ASSERT(next_return.notification);
+               *notif = bt_notification_event_create(graph, src_event_class,
+                       event_packet, src_empty_cc_prio_map);
+       }
+
+       BT_ASSERT(*notif);
+       user_data->at++;
+}
+
+static
+enum bt_notification_iterator_status src_iter_next_seq(
+               struct src_iter_user_data *user_data,
+               bt_notification_array notifs, uint64_t capacity,
+               uint64_t *count)
+{
+       enum bt_notification_iterator_status status =
+               BT_NOTIFICATION_ITERATOR_STATUS_OK;
+       uint64_t i = 0;
+
+       BT_ASSERT(user_data->seq);
+
+       if (user_data->seq[user_data->at] == SEQ_END) {
+               status = BT_NOTIFICATION_ITERATOR_STATUS_END;
+               goto end;
        }
 
-       if (next_return.status != BT_NOTIFICATION_ITERATOR_STATUS_END) {
-               user_data->at++;
+       while (i < capacity && user_data->seq[user_data->at] != SEQ_END) {
+               src_iter_next_seq_one(user_data, &notifs[i]);
+               i++;
        }
 
-       return next_return;
+       BT_ASSERT(i > 0 && i <= capacity);
+       *count = i;
+
+end:
+       return status;
 }
 
 static
-struct bt_notification_iterator_next_method_return src_iter_next(
-               struct bt_private_connection_private_notification_iterator *priv_iterator)
+enum bt_notification_iterator_status src_iter_next(
+               struct bt_private_connection_private_notification_iterator *priv_iterator,
+               bt_notification_array notifs, uint64_t capacity,
+               uint64_t *count)
 {
-       struct bt_notification_iterator_next_method_return next_return = {
-               .status = BT_NOTIFICATION_ITERATOR_STATUS_OK,
-               .notification = NULL,
-       };
        struct src_iter_user_data *user_data =
                bt_private_connection_private_notification_iterator_get_user_data(priv_iterator);
 
        BT_ASSERT(user_data);
-       next_return = src_iter_next_seq(user_data);
-       return next_return;
+       return src_iter_next_seq(user_data, notifs, capacity, count);
 }
 
 static
@@ -560,40 +546,9 @@ void src_finalize(struct bt_private_component *private_component)
 }
 
 static
-enum bt_notification_iterator_status common_consume(
-               struct bt_notification_iterator *notif_iter,
-               bool is_output_port_notif_iter)
+void append_test_events_from_notification(struct bt_notification *notification)
 {
-       enum bt_notification_iterator_status ret;
-       struct bt_notification *notification = NULL;
        struct test_event test_event = { 0 };
-       bool do_append_test_event = true;
-       BT_ASSERT(notif_iter);
-
-       if (is_output_port_notif_iter) {
-               ret = bt_output_port_notification_iterator_next(notif_iter,
-                       &notification);
-       } else {
-               ret = bt_private_connection_notification_iterator_next(
-                       notif_iter, &notification);
-       }
-
-       if (ret < 0) {
-               do_append_test_event = false;
-               goto end;
-       }
-
-       switch (ret) {
-       case BT_NOTIFICATION_ITERATOR_STATUS_END:
-               test_event.type = TEST_EV_TYPE_END;
-               goto end;
-       case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
-               abort();
-       default:
-               break;
-       }
-
-       BT_ASSERT(notification);
 
        switch (bt_notification_get_type(notification)) {
        case BT_NOTIFICATION_TYPE_EVENT:
@@ -644,12 +599,54 @@ enum bt_notification_iterator_status common_consume(
                BT_ASSERT(test_event.stream);
        }
 
-end:
-       if (do_append_test_event) {
+       append_test_event(&test_event);
+}
+
+static
+enum bt_notification_iterator_status common_consume(
+               struct bt_notification_iterator *notif_iter,
+               bool is_output_port_notif_iter)
+{
+       enum bt_notification_iterator_status ret;
+       bt_notification_array notifications = NULL;
+       uint64_t count = 0;
+       struct test_event test_event = { 0 };
+       uint64_t i;
+
+       BT_ASSERT(notif_iter);
+
+       if (is_output_port_notif_iter) {
+               ret = bt_output_port_notification_iterator_next(notif_iter,
+                       &notifications, &count);
+       } else {
+               ret = bt_private_connection_notification_iterator_next(
+                       notif_iter, &notifications, &count);
+       }
+
+       if (ret < 0) {
+               goto end;
+       }
+
+       switch (ret) {
+       case BT_NOTIFICATION_ITERATOR_STATUS_END:
+               test_event.type = TEST_EV_TYPE_END;
                append_test_event(&test_event);
+               goto end;
+       case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
+               abort();
+       default:
+               break;
        }
 
-       bt_put(notification);
+       BT_ASSERT(notifications);
+       BT_ASSERT(count > 0);
+
+       for (i = 0; i < count; i++) {
+               append_test_events_from_notification(notifications[i]);
+               bt_put(notifications[i]);
+       }
+
+end:
        return ret;
 }
 
index e69a61a860f128133106c1cc2a92193636c705b3..efc2cc5f30acb5299f8f1c7e3781790492165731 100644 (file)
@@ -285,14 +285,12 @@ size_t event_pos(struct event *event)
 }
 
 static
-struct bt_notification_iterator_next_method_return src_iter_next(
-               struct bt_private_connection_private_notification_iterator *priv_iterator)
+enum bt_notification_iterator_status src_iter_next(
+               struct bt_private_connection_private_notification_iterator *priv_iterator,
+               bt_notification_array notifs, uint64_t capacity,
+               uint64_t *count)
 {
-       struct bt_notification_iterator_next_method_return ret = {
-               .status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR,
-       };
-
-       return ret;
+       return BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
 }
 
 static
index 5cbe9fee7ef6165d2a20540c3da9e080973bf475..58a4331f3fa743de4a942b16a74542626928f043 100644 (file)
@@ -131,22 +131,22 @@ static int64_t seq4[] = {
 
 static int64_t seq1_with_again[] = {
        SEQ_STREAM_BEGIN, SEQ_PACKET_BEGIN, 24, 53, 97, 105, 119, 210,
-       SEQ_AGAIN, 222, 240, 292, 317, 353, 407, 433, 473, 487, 504,
-       572, 615, 708, 766, 850, 852, 931, 951, 956, 996,
+       SEQ_AGAIN, SEQ_AGAIN, 222, 240, 292, 317, 353, 407, 433, 473,
+       487, 504, 572, 615, 708, 766, 850, 852, 931, 951, 956, 996,
        SEQ_PACKET_END, SEQ_STREAM_END, SEQ_END,
 };
 
 static int64_t seq2_with_again[] = {
        SEQ_STREAM_BEGIN, SEQ_PACKET_BEGIN, 51, 59, 68, 77, 91, 121,
        139, 170, 179, 266, 352, 454, 478, 631, 644, 668, 714, 744, 750,
-       778, 790, 836, SEQ_AGAIN, SEQ_PACKET_END, SEQ_STREAM_END,
-       SEQ_END,
+       778, 790, 836, SEQ_AGAIN, SEQ_AGAIN, SEQ_PACKET_END,
+       SEQ_STREAM_END, SEQ_END,
 };
 
 static int64_t seq3_with_again[] = {
        SEQ_STREAM_BEGIN, SEQ_PACKET_BEGIN, 8, 71, 209, 254, 298, 320,
-       350, 393, 419, 624, 651, SEQ_AGAIN, 678, 717, 731, 733, 788,
-       819, 820, 857, 892, 903, 944, 998, SEQ_PACKET_END,
+       350, 393, 419, 624, 651, SEQ_AGAIN, SEQ_AGAIN, 678, 717, 731,
+       733, 788, 819, 820, 857, 892, 903, 944, 998, SEQ_PACKET_END,
        SEQ_STREAM_END, SEQ_END,
 };
 
@@ -534,12 +534,12 @@ struct bt_notification *src_create_event_notif(struct bt_packet *packet,
 }
 
 static
-struct bt_notification_iterator_next_method_return src_iter_next_seq(
-               struct src_iter_user_data *user_data)
+enum bt_notification_iterator_status src_iter_next_seq(
+               struct src_iter_user_data *user_data,
+               bt_notification_array notifs)
 {
-       struct bt_notification_iterator_next_method_return next_return = {
-               .status = BT_NOTIFICATION_ITERATOR_STATUS_OK,
-       };
+       enum bt_notification_iterator_status status =
+               BT_NOTIFICATION_ITERATOR_STATUS_OK;
        int64_t cur_ts_ns;
        struct bt_stream *stream;
 
@@ -548,63 +548,57 @@ struct bt_notification_iterator_next_method_return src_iter_next_seq(
 
        switch (cur_ts_ns) {
        case SEQ_END:
-               next_return.status =
-                       BT_NOTIFICATION_ITERATOR_STATUS_END;
+               status = BT_NOTIFICATION_ITERATOR_STATUS_END;
                break;
        case SEQ_AGAIN:
-               next_return.status =
-                       BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
+               status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
                break;
        case SEQ_PACKET_BEGIN:
-               next_return.notification =
-                       bt_notification_packet_begin_create(graph,
+               notifs[0] = bt_notification_packet_begin_create(graph,
                                user_data->packet);
-               BT_ASSERT(next_return.notification);
+               BT_ASSERT(notifs[0]);
                break;
        case SEQ_PACKET_END:
-               next_return.notification =
-                       bt_notification_packet_end_create(graph,
+               notifs[0] = bt_notification_packet_end_create(graph,
                                user_data->packet);
-               BT_ASSERT(next_return.notification);
+               BT_ASSERT(notifs[0]);
                break;
        case SEQ_STREAM_BEGIN:
                stream = bt_packet_get_stream(user_data->packet);
-               next_return.notification =
-                       bt_notification_stream_begin_create(graph, stream);
-               BT_ASSERT(next_return.notification);
+               notifs[0] = bt_notification_stream_begin_create(graph, stream);
+               BT_ASSERT(notifs[0]);
                bt_put(stream);
                break;
        case SEQ_STREAM_END:
                stream = bt_packet_get_stream(user_data->packet);
-               next_return.notification =
-                       bt_notification_stream_end_create(graph, stream);
-               BT_ASSERT(next_return.notification);
+               notifs[0] = bt_notification_stream_end_create(graph, stream);
+               BT_ASSERT(notifs[0]);
                bt_put(stream);
                break;
        default:
        {
-               next_return.notification = src_create_event_notif(
-                       user_data->packet, src_cc_prio_map, cur_ts_ns);
-               BT_ASSERT(next_return.notification);
+               notifs[0] = src_create_event_notif(user_data->packet,
+                       src_cc_prio_map, cur_ts_ns);
+               BT_ASSERT(notifs[0]);
                break;
        }
        }
 
-       if (next_return.status != BT_NOTIFICATION_ITERATOR_STATUS_END) {
+       if (status != BT_NOTIFICATION_ITERATOR_STATUS_END) {
                user_data->at++;
        }
 
-       return next_return;
+       return status;
 }
 
 static
-struct bt_notification_iterator_next_method_return src_iter_next(
-               struct bt_private_connection_private_notification_iterator *priv_iterator)
+enum bt_notification_iterator_status src_iter_next(
+               struct bt_private_connection_private_notification_iterator *priv_iterator,
+               bt_notification_array notifs, uint64_t capacity,
+               uint64_t *count)
 {
-       struct bt_notification_iterator_next_method_return next_return = {
-               .notification = NULL,
-               .status = BT_NOTIFICATION_ITERATOR_STATUS_OK,
-       };
+       enum bt_notification_iterator_status status =
+               BT_NOTIFICATION_ITERATOR_STATUS_OK;
        struct src_iter_user_data *user_data =
                bt_private_connection_private_notification_iterator_get_user_data(priv_iterator);
        struct bt_private_component *private_component =
@@ -615,52 +609,59 @@ struct bt_notification_iterator_next_method_return src_iter_next(
        BT_ASSERT(user_data);
        BT_ASSERT(private_component);
 
+       /*
+        * We can always set it to 1: it's not going to be considered
+        * anyway if the status is not
+        * BT_NOTIFICATION_ITERATOR_STATUS_OK.
+        */
+       *count = 1;
+
        switch (current_test) {
        case TEST_NO_TS:
                if (user_data->iter_index == 0) {
                        if (user_data->at == 0) {
                                stream = bt_packet_get_stream(user_data->packet);
-                               next_return.notification =
+                               notifs[0] =
                                        bt_notification_stream_begin_create(
                                                graph, stream);
                                bt_put(stream);
-                               BT_ASSERT(next_return.notification);
+                               BT_ASSERT(notifs[0]);
                        } else if (user_data->at == 1) {
-                               next_return.notification =
+                               notifs[0] =
                                        bt_notification_packet_begin_create(
                                                graph, user_data->packet);
-                               BT_ASSERT(next_return.notification);
+                               BT_ASSERT(notifs[0]);
                        } else if (user_data->at < 7) {
-                               next_return.notification =
+                               notifs[0] =
                                        src_create_event_notif(
                                                user_data->packet,
                                                src_empty_cc_prio_map, 0);
-                               BT_ASSERT(next_return.notification);
+                               BT_ASSERT(notifs[0]);
                        } else if (user_data->at == 7) {
-                               next_return.notification =
+                               notifs[0] =
                                        bt_notification_packet_end_create(
                                                graph, user_data->packet);
-                               BT_ASSERT(next_return.notification);
+                               BT_ASSERT(notifs[0]);
                        } else if (user_data->at == 8) {
                                stream = bt_packet_get_stream(user_data->packet);
-                               next_return.notification =
+                               notifs[0] =
                                        bt_notification_stream_end_create(
                                                graph, stream);
                                bt_put(stream);
-                               BT_ASSERT(next_return.notification);
+                               BT_ASSERT(notifs[0]);
                        } else {
-                               next_return.status =
+                               status =
                                        BT_NOTIFICATION_ITERATOR_STATUS_END;
                        }
 
                        user_data->at++;
                } else {
-                       next_return = src_iter_next_seq(user_data);
+                       status = src_iter_next_seq(user_data, notifs);
                }
                break;
        case TEST_SIMPLE_4_PORTS:
        case TEST_4_PORTS_WITH_RETRIES:
-               next_return = src_iter_next_seq(user_data);
+               status = src_iter_next_seq(user_data, notifs);
                break;
        case TEST_SINGLE_END_THEN_MULTIPLE_FULL:
                if (user_data->iter_index == 0) {
@@ -670,15 +671,15 @@ struct bt_notification_iterator_next_method_return src_iter_next(
                        ret = bt_private_component_source_add_output_private_port(
                                private_component, "out2", NULL, NULL);
                        BT_ASSERT(ret == 0);
-                       next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
+                       status = BT_NOTIFICATION_ITERATOR_STATUS_END;
                } else {
-                       next_return = src_iter_next_seq(user_data);
+                       status = src_iter_next_seq(user_data, notifs);
                }
                break;
        case TEST_SINGLE_AGAIN_END_THEN_MULTIPLE_FULL:
                if (user_data->iter_index == 0) {
                        if (user_data->at == 0) {
-                               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
+                               status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
                                user_data->at++;
                        } else {
                                ret = bt_private_component_source_add_output_private_port(
@@ -687,10 +688,10 @@ struct bt_notification_iterator_next_method_return src_iter_next(
                                ret = bt_private_component_source_add_output_private_port(
                                        private_component, "out2", NULL, NULL);
                                BT_ASSERT(ret == 0);
-                               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
+                               status = BT_NOTIFICATION_ITERATOR_STATUS_END;
                        }
                } else {
-                       next_return = src_iter_next_seq(user_data);
+                       status = src_iter_next_seq(user_data, notifs);
                }
                break;
        default:
@@ -698,7 +699,7 @@ struct bt_notification_iterator_next_method_return src_iter_next(
        }
 
        bt_put(private_component);
-       return next_return;
+       return status;
 }
 
 static
@@ -755,42 +756,10 @@ void src_finalize(struct bt_private_component *private_component)
 }
 
 static
-enum bt_component_status sink_consume(
-               struct bt_private_component *priv_component)
+void append_test_event_from_notification(struct bt_notification *notification)
 {
-       enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
-       struct bt_notification *notification = NULL;
-       struct sink_user_data *user_data =
-               bt_private_component_get_user_data(priv_component);
-       enum bt_notification_iterator_status it_ret;
+       int ret;
        struct test_event test_event;
-       bool do_append_test_event = true;
-
-       BT_ASSERT(user_data && user_data->notif_iter);
-       it_ret = bt_private_connection_notification_iterator_next(
-               user_data->notif_iter, &notification);
-
-       if (it_ret < 0) {
-               ret = BT_COMPONENT_STATUS_ERROR;
-               do_append_test_event = false;
-               goto end;
-       }
-
-       switch (it_ret) {
-       case BT_NOTIFICATION_ITERATOR_STATUS_END:
-               test_event.type = TEST_EV_TYPE_END;
-               ret = BT_COMPONENT_STATUS_END;
-               BT_PUT(user_data->notif_iter);
-               goto end;
-       case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
-               test_event.type = TEST_EV_TYPE_AGAIN;
-               ret = BT_COMPONENT_STATUS_AGAIN;
-               goto end;
-       default:
-               break;
-       }
-
-       BT_ASSERT(notification);
 
        switch (bt_notification_get_type(notification)) {
        case BT_NOTIFICATION_TYPE_EVENT:
@@ -871,12 +840,60 @@ enum bt_component_status sink_consume(
                break;
        }
 
+       append_test_event(&test_event);
+}
+
+static
+enum bt_component_status sink_consume(
+               struct bt_private_component *priv_component)
+{
+       enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
+       bt_notification_array notifications = NULL;
+       uint64_t count;
+       struct sink_user_data *user_data =
+               bt_private_component_get_user_data(priv_component);
+       enum bt_notification_iterator_status it_ret;
+       struct test_event test_event;
+       bool do_append_test_event = true;
+       uint64_t i;
+
+       BT_ASSERT(user_data && user_data->notif_iter);
+       it_ret = bt_private_connection_notification_iterator_next(
+               user_data->notif_iter, &notifications, &count);
+       if (it_ret < 0) {
+               ret = BT_COMPONENT_STATUS_ERROR;
+               do_append_test_event = false;
+               goto end;
+       }
+
+       switch (it_ret) {
+       case BT_NOTIFICATION_ITERATOR_STATUS_END:
+               test_event.type = TEST_EV_TYPE_END;
+               ret = BT_COMPONENT_STATUS_END;
+               BT_PUT(user_data->notif_iter);
+               goto end;
+       case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
+               test_event.type = TEST_EV_TYPE_AGAIN;
+               ret = BT_COMPONENT_STATUS_AGAIN;
+               goto end;
+       default:
+               break;
+       }
+
+       BT_ASSERT(notifications);
+
+       for (i = 0; i < count; i++) {
+               append_test_event_from_notification(notifications[i]);
+               bt_put(notifications[i]);
+       }
+
+       do_append_test_event = false;
+
 end:
        if (do_append_test_event) {
                append_test_event(&test_event);
        }
 
-       bt_put(notification);
        return ret;
 }
 
This page took 0.059108 seconds and 4 git commands to generate.