lib: notification iterator: transfer a batch of notifications
[babeltrace.git] / lib / graph / iterator.c
index 5135bceedc66b3bb7357518cc2afbba0b8333403..0deb40c7721467bd3f40b3b5ca5d93cd1bbc2306 100644 (file)
 #include <inttypes.h>
 #include <stdlib.h>
 
+/*
+ * TODO: Use graph's state (number of active iterators, etc.) and
+ * possibly system specifications to make a better guess than this.
+ */
+#define NOTIF_BATCH_SIZE       15
+
 struct discarded_elements_state {
        struct bt_clock_value *cur_begin;
        uint64_t cur_count;
@@ -130,8 +136,15 @@ end:
 static
 void destroy_base_notification_iterator(struct bt_object *obj)
 {
-       BT_ASSERT(obj);
-       g_free(obj);
+       struct bt_notification_iterator *iterator = (void *) obj;
+
+       BT_ASSERT(iterator);
+
+       if (iterator->notifs) {
+               g_ptr_array_free(iterator->notifs, TRUE);
+       }
+
+       g_free(iterator);
 }
 
 static
@@ -267,12 +280,25 @@ void bt_private_connection_notification_iterator_set_connection(
 }
 
 static
-void init_notification_iterator(struct bt_notification_iterator *iterator,
+int init_notification_iterator(struct bt_notification_iterator *iterator,
                enum bt_notification_iterator_type type,
                bt_object_release_func destroy)
 {
+       int ret = 0;
+
        bt_object_init_shared(&iterator->base, destroy);
        iterator->type = type;
+       iterator->notifs = g_ptr_array_new();
+       if (!iterator->notifs) {
+               BT_LOGE_STR("Failed to allocate a GPtrArray.");
+               ret = -1;
+               goto end;
+       }
+
+       g_ptr_array_set_size(iterator->notifs, NOTIF_BATCH_SIZE);
+
+end:
+       return ret;
 }
 
 BT_HIDDEN
@@ -285,6 +311,7 @@ enum bt_connection_status bt_private_connection_notification_iterator_create(
        enum bt_connection_status status = BT_CONNECTION_STATUS_OK;
        enum bt_component_class_type type;
        struct bt_notification_iterator_private_connection *iterator = NULL;
+       int ret;
 
        BT_ASSERT(upstream_comp);
        BT_ASSERT(upstream_port);
@@ -307,9 +334,14 @@ enum bt_connection_status bt_private_connection_notification_iterator_create(
                goto end;
        }
 
-       init_notification_iterator((void *) iterator,
+       ret = init_notification_iterator((void *) iterator,
                BT_NOTIFICATION_ITERATOR_TYPE_PRIVATE_CONNECTION,
                bt_private_connection_notification_iterator_destroy);
+       if (ret) {
+               /* init_notification_iterator() logs errors */
+               status = BT_CONNECTION_STATUS_NOMEM;
+               goto end;
+       }
 
        iterator->stream_states = g_hash_table_new_full(g_direct_hash,
                g_direct_equal, NULL, (GDestroyNotify) destroy_stream_state);
@@ -571,6 +603,26 @@ end:
        return is_valid;
 }
 
+BT_ASSERT_PRE_FUNC
+static inline
+bool validate_notifications(
+               struct bt_notification_iterator_private_connection *iterator,
+               uint64_t count)
+{
+       bool ret = true;
+       bt_notification_array notifs = (void *) iterator->base.notifs->pdata;
+       uint64_t i;
+
+       for (i = 0; i < count; i++) {
+               ret = validate_notification(iterator, notifs[i]);
+               if (!ret) {
+                       break;
+               }
+       }
+
+       return ret;
+}
+
 BT_ASSERT_PRE_FUNC
 static inline bool priv_conn_notif_iter_can_end(
                struct bt_notification_iterator_private_connection *iterator)
@@ -609,22 +661,19 @@ end:
 enum bt_notification_iterator_status
 bt_private_connection_notification_iterator_next(
                struct bt_notification_iterator *user_iterator,
-               struct bt_notification **user_notif)
+               struct bt_notification ***user_notifs, uint64_t *user_count)
 {
        struct bt_notification_iterator_private_connection *iterator =
                (void *) user_iterator;
        struct bt_private_connection_private_notification_iterator *priv_iterator =
                bt_private_connection_private_notification_iterator_from_notification_iterator(iterator);
        bt_component_class_notification_iterator_next_method next_method = NULL;
-       struct bt_notification_iterator_next_method_return next_return = {
-               .status = BT_NOTIFICATION_ITERATOR_STATUS_OK,
-               .notification = NULL,
-       };
        enum bt_notification_iterator_status status =
                BT_NOTIFICATION_ITERATOR_STATUS_OK;
 
        BT_ASSERT_PRE_NON_NULL(user_iterator, "Notification iterator");
-       BT_ASSERT_PRE_NON_NULL(user_notif, "Notification");
+       BT_ASSERT_PRE_NON_NULL(user_notifs, "Notification array");
+       BT_ASSERT_PRE_NON_NULL(user_count, "Notification count");
        BT_ASSERT_PRE(user_iterator->type ==
                BT_NOTIFICATION_ITERATOR_TYPE_PRIVATE_CONNECTION,
                "Notification iterator was not created from a private connection: "
@@ -670,12 +719,13 @@ bt_private_connection_notification_iterator_next(
         */
        BT_ASSERT(next_method);
        BT_LOGD_STR("Calling user's \"next\" method.");
-       next_return = next_method(priv_iterator);
+       status = next_method(priv_iterator,
+               (void *) user_iterator->notifs->pdata,
+               NOTIF_BATCH_SIZE, user_count);
        BT_LOGD("User method returned: status=%s",
-               bt_notification_iterator_status_string(next_return.status));
-       if (next_return.status < 0) {
+               bt_notification_iterator_status_string(status));
+       if (status < 0) {
                BT_LOGW_STR("User method failed.");
-               status = next_return.status;
                goto end;
        }
 
@@ -694,16 +744,31 @@ bt_private_connection_notification_iterator_next(
                 * BT_NOTIFICATION_ITERATOR_STATUS_OK because
                 * otherwise this field could be garbage.
                 */
-               if (next_return.status ==
-                               BT_NOTIFICATION_ITERATOR_STATUS_OK) {
-                       bt_put(next_return.notification);
+               if (status == BT_NOTIFICATION_ITERATOR_STATUS_OK) {
+                       uint64_t i;
+                       bt_notification_array notifs =
+                               (void *) user_iterator->notifs->pdata;
+
+                       for (i = 0; i < *user_count; i++) {
+                               bt_put(notifs[i]);
+                       }
                }
 
                status = BT_NOTIFICATION_ITERATOR_STATUS_CANCELED;
                goto end;
        }
 
-       switch (next_return.status) {
+       switch (status) {
+       case BT_NOTIFICATION_ITERATOR_STATUS_OK:
+               BT_ASSERT_PRE(validate_notifications(iterator, *user_count),
+                       "Notifications are invalid at this point: "
+                       "%![notif-iter-]+i, count=%" PRIu64,
+                       iterator, *user_count);
+               *user_notifs = (void *) user_iterator->notifs->pdata;
+               break;
+       case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
+               status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
+               goto end;
        case BT_NOTIFICATION_ITERATOR_STATUS_END:
                BT_ASSERT_PRE(priv_conn_notif_iter_can_end(iterator),
                        "Notification iterator cannot end at this point: "
@@ -715,20 +780,6 @@ bt_private_connection_notification_iterator_next(
                BT_LOGD("Set new status: status=%s",
                        bt_notification_iterator_status_string(status));
                goto end;
-       case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
-               status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
-               goto end;
-       case BT_NOTIFICATION_ITERATOR_STATUS_OK:
-               BT_ASSERT_PRE(next_return.notification,
-                       "User method returned BT_NOTIFICATION_ITERATOR_STATUS_OK, but notification is NULL: "
-                       "%!+i", iterator);
-               BT_ASSERT_PRE(validate_notification(iterator,
-                       next_return.notification),
-                       "Notification is invalid at this point: "
-                       "%![notif-iter-]+i, %![notif-]+n",
-                       iterator, next_return.notification);
-               *user_notif = next_return.notification;
-               break;
        default:
                /* Unknown non-error status */
                abort();
@@ -741,7 +792,8 @@ end:
 enum bt_notification_iterator_status
 bt_output_port_notification_iterator_next(
                struct bt_notification_iterator *iterator,
-               struct bt_notification **user_notif)
+               bt_notification_array *notifs_to_user,
+               uint64_t *count_to_user)
 {
        enum bt_notification_iterator_status status;
        struct bt_notification_iterator_output_port *out_port_iter =
@@ -749,37 +801,40 @@ bt_output_port_notification_iterator_next(
        enum bt_graph_status graph_status;
 
        BT_ASSERT_PRE_NON_NULL(iterator, "Notification iterator");
-       BT_ASSERT_PRE_NON_NULL(user_notif, "Notification");
+       BT_ASSERT_PRE_NON_NULL(notifs_to_user, "Notification array");
+       BT_ASSERT_PRE_NON_NULL(count_to_user, "Notification count");
        BT_ASSERT_PRE(iterator->type ==
                BT_NOTIFICATION_ITERATOR_TYPE_OUTPUT_PORT,
                "Notification iterator was not created from an output port: "
                "%!+i", iterator);
        BT_LIB_LOGD("Getting next output port notification iterator's notification: %!+i",
                iterator);
+
        graph_status = bt_graph_consume_sink_no_check(
                out_port_iter->graph, out_port_iter->colander);
        switch (graph_status) {
        case BT_GRAPH_STATUS_CANCELED:
-               BT_ASSERT(!out_port_iter->notif);
                status = BT_NOTIFICATION_ITERATOR_STATUS_CANCELED;
                break;
        case BT_GRAPH_STATUS_AGAIN:
-               BT_ASSERT(!out_port_iter->notif);
                status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
                break;
        case BT_GRAPH_STATUS_END:
-               BT_ASSERT(!out_port_iter->notif);
                status = BT_NOTIFICATION_ITERATOR_STATUS_END;
                break;
        case BT_GRAPH_STATUS_NOMEM:
-               BT_ASSERT(!out_port_iter->notif);
                status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
                break;
        case BT_GRAPH_STATUS_OK:
-               BT_ASSERT(out_port_iter->notif);
                status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
-               *user_notif = out_port_iter->notif;
-               out_port_iter->notif = NULL;
+
+               /*
+                * On success, the colander sink moves the notifications
+                * to this iterator's array and sets this iterator's
+                * notification count: move them to the user.
+                */
+               *notifs_to_user = (void *) iterator->notifs->pdata;
+               *count_to_user = out_port_iter->count;
                break;
        default:
                /* Other errors */
@@ -820,7 +875,6 @@ void bt_output_port_notification_iterator_destroy(struct bt_object *obj)
 
        BT_LOGD("Destroying output port notification iterator object: addr=%p",
                iterator);
-       BT_ASSERT(!iterator->notif);
        BT_LOGD_STR("Putting graph.");
        bt_put(iterator->graph);
        BT_LOGD_STR("Putting colander sink component.");
@@ -841,6 +895,7 @@ struct bt_notification_iterator *bt_output_port_notification_iterator_create(
        const char *colander_comp_name;
        struct bt_port *colander_in_port = NULL;
        struct bt_component_class_sink_colander_data colander_data;
+       int ret;
 
        BT_ASSERT_PRE_NON_NULL(output_port, "Output port");
        BT_ASSERT_PRE(bt_port_get_type(output_port) == BT_PORT_TYPE_OUTPUT,
@@ -862,9 +917,14 @@ struct bt_notification_iterator *bt_output_port_notification_iterator_create(
                goto error;
        }
 
-       init_notification_iterator((void *) iterator,
+       ret = init_notification_iterator((void *) iterator,
                BT_NOTIFICATION_ITERATOR_TYPE_OUTPUT_PORT,
                bt_output_port_notification_iterator_destroy);
+       if (ret) {
+               /* init_notification_iterator() logs errors */
+               BT_PUT(iterator);
+               goto end;
+       }
 
        /* Create colander component */
        colander_comp_cls = bt_component_class_sink_colander_get();
@@ -876,7 +936,9 @@ struct bt_notification_iterator *bt_output_port_notification_iterator_create(
        BT_MOVE(iterator->graph, graph);
        colander_comp_name =
                colander_component_name ? colander_component_name : "colander";
-       colander_data.notification = &iterator->notif;
+       colander_data.notifs = (void *) iterator->base.notifs->pdata;
+       colander_data.count_addr = &iterator->count;
+
        graph_status = bt_graph_add_component_with_init_method_data(
                iterator->graph, colander_comp_cls, colander_comp_name,
                NULL, &colander_data, &iterator->colander);
This page took 0.027404 seconds and 4 git commands to generate.