#include <inttypes.h>
#include <stdlib.h>
+/*
+ * TODO: Use graph's state (number of active iterators, etc.) and
+ * possibly system specifications to make a better guess than this.
+ */
+#define NOTIF_BATCH_SIZE 15
+
struct discarded_elements_state {
struct bt_clock_value *cur_begin;
uint64_t cur_count;
static
void destroy_base_notification_iterator(struct bt_object *obj)
{
- BT_ASSERT(obj);
- g_free(obj);
+ struct bt_notification_iterator *iterator = (void *) obj;
+
+ BT_ASSERT(iterator);
+
+ if (iterator->notifs) {
+ g_ptr_array_free(iterator->notifs, TRUE);
+ }
+
+ g_free(iterator);
}
static
}
static
-void init_notification_iterator(struct bt_notification_iterator *iterator,
+int init_notification_iterator(struct bt_notification_iterator *iterator,
enum bt_notification_iterator_type type,
bt_object_release_func destroy)
{
+ int ret = 0;
+
bt_object_init_shared(&iterator->base, destroy);
iterator->type = type;
+ iterator->notifs = g_ptr_array_new();
+ if (!iterator->notifs) {
+ BT_LOGE_STR("Failed to allocate a GPtrArray.");
+ ret = -1;
+ goto end;
+ }
+
+ g_ptr_array_set_size(iterator->notifs, NOTIF_BATCH_SIZE);
+
+end:
+ return ret;
}
BT_HIDDEN
enum bt_connection_status status = BT_CONNECTION_STATUS_OK;
enum bt_component_class_type type;
struct bt_notification_iterator_private_connection *iterator = NULL;
+ int ret;
BT_ASSERT(upstream_comp);
BT_ASSERT(upstream_port);
goto end;
}
- init_notification_iterator((void *) iterator,
+ ret = init_notification_iterator((void *) iterator,
BT_NOTIFICATION_ITERATOR_TYPE_PRIVATE_CONNECTION,
bt_private_connection_notification_iterator_destroy);
+ if (ret) {
+ /* init_notification_iterator() logs errors */
+ status = BT_CONNECTION_STATUS_NOMEM;
+ goto end;
+ }
iterator->stream_states = g_hash_table_new_full(g_direct_hash,
g_direct_equal, NULL, (GDestroyNotify) destroy_stream_state);
return is_valid;
}
+BT_ASSERT_PRE_FUNC
+static inline
+bool validate_notifications(
+ struct bt_notification_iterator_private_connection *iterator,
+ uint64_t count)
+{
+ bool ret = true;
+ bt_notification_array notifs = (void *) iterator->base.notifs->pdata;
+ uint64_t i;
+
+ for (i = 0; i < count; i++) {
+ ret = validate_notification(iterator, notifs[i]);
+ if (!ret) {
+ break;
+ }
+ }
+
+ return ret;
+}
+
BT_ASSERT_PRE_FUNC
static inline bool priv_conn_notif_iter_can_end(
struct bt_notification_iterator_private_connection *iterator)
enum bt_notification_iterator_status
bt_private_connection_notification_iterator_next(
struct bt_notification_iterator *user_iterator,
- struct bt_notification **user_notif)
+ struct bt_notification ***user_notifs, uint64_t *user_count)
{
struct bt_notification_iterator_private_connection *iterator =
(void *) user_iterator;
struct bt_private_connection_private_notification_iterator *priv_iterator =
bt_private_connection_private_notification_iterator_from_notification_iterator(iterator);
bt_component_class_notification_iterator_next_method next_method = NULL;
- struct bt_notification_iterator_next_method_return next_return = {
- .status = BT_NOTIFICATION_ITERATOR_STATUS_OK,
- .notification = NULL,
- };
enum bt_notification_iterator_status status =
BT_NOTIFICATION_ITERATOR_STATUS_OK;
BT_ASSERT_PRE_NON_NULL(user_iterator, "Notification iterator");
- BT_ASSERT_PRE_NON_NULL(user_notif, "Notification");
+ BT_ASSERT_PRE_NON_NULL(user_notifs, "Notification array");
+ BT_ASSERT_PRE_NON_NULL(user_count, "Notification count");
BT_ASSERT_PRE(user_iterator->type ==
BT_NOTIFICATION_ITERATOR_TYPE_PRIVATE_CONNECTION,
"Notification iterator was not created from a private connection: "
*/
BT_ASSERT(next_method);
BT_LOGD_STR("Calling user's \"next\" method.");
- next_return = next_method(priv_iterator);
+ status = next_method(priv_iterator,
+ (void *) user_iterator->notifs->pdata,
+ NOTIF_BATCH_SIZE, user_count);
BT_LOGD("User method returned: status=%s",
- bt_notification_iterator_status_string(next_return.status));
- if (next_return.status < 0) {
+ bt_notification_iterator_status_string(status));
+ if (status < 0) {
BT_LOGW_STR("User method failed.");
- status = next_return.status;
goto end;
}
* BT_NOTIFICATION_ITERATOR_STATUS_OK because
* otherwise this field could be garbage.
*/
- if (next_return.status ==
- BT_NOTIFICATION_ITERATOR_STATUS_OK) {
- bt_put(next_return.notification);
+ if (status == BT_NOTIFICATION_ITERATOR_STATUS_OK) {
+ uint64_t i;
+ bt_notification_array notifs =
+ (void *) user_iterator->notifs->pdata;
+
+ for (i = 0; i < *user_count; i++) {
+ bt_put(notifs[i]);
+ }
}
status = BT_NOTIFICATION_ITERATOR_STATUS_CANCELED;
goto end;
}
- switch (next_return.status) {
+ switch (status) {
+ case BT_NOTIFICATION_ITERATOR_STATUS_OK:
+ BT_ASSERT_PRE(validate_notifications(iterator, *user_count),
+ "Notifications are invalid at this point: "
+ "%![notif-iter-]+i, count=%" PRIu64,
+ iterator, *user_count);
+ *user_notifs = (void *) user_iterator->notifs->pdata;
+ break;
+ case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
+ status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
+ goto end;
case BT_NOTIFICATION_ITERATOR_STATUS_END:
BT_ASSERT_PRE(priv_conn_notif_iter_can_end(iterator),
"Notification iterator cannot end at this point: "
BT_LOGD("Set new status: status=%s",
bt_notification_iterator_status_string(status));
goto end;
- case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
- status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
- goto end;
- case BT_NOTIFICATION_ITERATOR_STATUS_OK:
- BT_ASSERT_PRE(next_return.notification,
- "User method returned BT_NOTIFICATION_ITERATOR_STATUS_OK, but notification is NULL: "
- "%!+i", iterator);
- BT_ASSERT_PRE(validate_notification(iterator,
- next_return.notification),
- "Notification is invalid at this point: "
- "%![notif-iter-]+i, %![notif-]+n",
- iterator, next_return.notification);
- *user_notif = next_return.notification;
- break;
default:
/* Unknown non-error status */
abort();
enum bt_notification_iterator_status
bt_output_port_notification_iterator_next(
struct bt_notification_iterator *iterator,
- struct bt_notification **user_notif)
+ bt_notification_array *notifs_to_user,
+ uint64_t *count_to_user)
{
enum bt_notification_iterator_status status;
struct bt_notification_iterator_output_port *out_port_iter =
enum bt_graph_status graph_status;
BT_ASSERT_PRE_NON_NULL(iterator, "Notification iterator");
- BT_ASSERT_PRE_NON_NULL(user_notif, "Notification");
+ BT_ASSERT_PRE_NON_NULL(notifs_to_user, "Notification array");
+ BT_ASSERT_PRE_NON_NULL(count_to_user, "Notification count");
BT_ASSERT_PRE(iterator->type ==
BT_NOTIFICATION_ITERATOR_TYPE_OUTPUT_PORT,
"Notification iterator was not created from an output port: "
"%!+i", iterator);
BT_LIB_LOGD("Getting next output port notification iterator's notification: %!+i",
iterator);
+
graph_status = bt_graph_consume_sink_no_check(
out_port_iter->graph, out_port_iter->colander);
switch (graph_status) {
case BT_GRAPH_STATUS_CANCELED:
- BT_ASSERT(!out_port_iter->notif);
status = BT_NOTIFICATION_ITERATOR_STATUS_CANCELED;
break;
case BT_GRAPH_STATUS_AGAIN:
- BT_ASSERT(!out_port_iter->notif);
status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
break;
case BT_GRAPH_STATUS_END:
- BT_ASSERT(!out_port_iter->notif);
status = BT_NOTIFICATION_ITERATOR_STATUS_END;
break;
case BT_GRAPH_STATUS_NOMEM:
- BT_ASSERT(!out_port_iter->notif);
status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
break;
case BT_GRAPH_STATUS_OK:
- BT_ASSERT(out_port_iter->notif);
status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
- *user_notif = out_port_iter->notif;
- out_port_iter->notif = NULL;
+
+ /*
+ * On success, the colander sink moves the notifications
+ * to this iterator's array and sets this iterator's
+ * notification count: move them to the user.
+ */
+ *notifs_to_user = (void *) iterator->notifs->pdata;
+ *count_to_user = out_port_iter->count;
break;
default:
/* Other errors */
BT_LOGD("Destroying output port notification iterator object: addr=%p",
iterator);
- BT_ASSERT(!iterator->notif);
BT_LOGD_STR("Putting graph.");
bt_put(iterator->graph);
BT_LOGD_STR("Putting colander sink component.");
const char *colander_comp_name;
struct bt_port *colander_in_port = NULL;
struct bt_component_class_sink_colander_data colander_data;
+ int ret;
BT_ASSERT_PRE_NON_NULL(output_port, "Output port");
BT_ASSERT_PRE(bt_port_get_type(output_port) == BT_PORT_TYPE_OUTPUT,
goto error;
}
- init_notification_iterator((void *) iterator,
+ ret = init_notification_iterator((void *) iterator,
BT_NOTIFICATION_ITERATOR_TYPE_OUTPUT_PORT,
bt_output_port_notification_iterator_destroy);
+ if (ret) {
+ /* init_notification_iterator() logs errors */
+ BT_PUT(iterator);
+ goto end;
+ }
/* Create colander component */
colander_comp_cls = bt_component_class_sink_colander_get();
BT_MOVE(iterator->graph, graph);
colander_comp_name =
colander_component_name ? colander_component_name : "colander";
- colander_data.notification = &iterator->notif;
+ colander_data.notifs = (void *) iterator->base.notifs->pdata;
+ colander_data.count_addr = &iterator->count;
+
graph_status = bt_graph_add_component_with_init_method_data(
iterator->graph, colander_comp_cls, colander_comp_name,
NULL, &colander_data, &iterator->colander);