X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=lib%2Fgraph%2Fiterator.c;h=a2111746793fa71a8def54e06bbc022a37cc66d7;hb=56e18c4ce186892c36d7f2cb5078087425e60134;hp=b98121a1d596a65f06854b10b2318bde390d563d;hpb=f42867e2d049c1e7cad50cd097290a3adef8d54c;p=babeltrace.git diff --git a/lib/graph/iterator.c b/lib/graph/iterator.c index b98121a1..a2111746 100644 --- a/lib/graph/iterator.c +++ b/lib/graph/iterator.c @@ -30,12 +30,12 @@ #include #include -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include +#include #include #include #include @@ -53,7 +53,6 @@ #include #include #include -#include #include #include #include @@ -63,29 +62,20 @@ #include #include -struct discarded_elements_state { - struct bt_clock_value *cur_begin; - uint64_t cur_count; -}; +/* + * TODO: Use graph's state (number of active iterators, etc.) and + * possibly system specifications to make a better guess than this. + */ +#define NOTIF_BATCH_SIZE 15 struct stream_state { struct bt_stream *stream; /* owned by this */ struct bt_packet *cur_packet; /* owned by this */ - struct discarded_elements_state discarded_packets_state; - struct discarded_elements_state discarded_events_state; uint64_t expected_notif_seq_num; bt_bool is_ended; }; -static -void stream_destroy_listener(struct bt_stream *stream, void *data) -{ - struct bt_notification_iterator_private_connection *iterator = data; - - /* Remove associated stream state */ - g_hash_table_remove(iterator->stream_states, stream); -} - +BT_ASSERT_PRE_FUNC static void destroy_stream_state(struct stream_state *stream_state) { @@ -98,11 +88,10 @@ void destroy_stream_state(struct stream_state *stream_state) bt_put(stream_state->cur_packet); BT_LOGV_STR("Putting stream state's stream."); bt_put(stream_state->stream); - bt_put(stream_state->discarded_packets_state.cur_begin); - bt_put(stream_state->discarded_events_state.cur_begin); g_free(stream_state); } +BT_ASSERT_PRE_FUNC static struct stream_state *create_stream_state(struct bt_stream *stream) { @@ -113,15 +102,6 @@ struct stream_state *create_stream_state(struct bt_stream *stream) goto end; } - /* - * The packet index is a monotonic counter which may not start - * at 0 at the beginning of the stream. We therefore need to - * have an internal object initial state of -1ULL to distinguish - * between initial state and having seen a packet with - * the sequence number 0. - */ - stream_state->discarded_packets_state.cur_count = -1ULL; - /* * We keep a reference to the stream until we know it's ended. */ @@ -137,11 +117,14 @@ end: static void destroy_base_notification_iterator(struct bt_object *obj) { - struct bt_notification_iterator *iterator = - container_of(obj, struct bt_notification_iterator, base); + struct bt_notification_iterator *iterator = (void *) obj; + + BT_ASSERT(iterator); + + if (iterator->notifs) { + g_ptr_array_free(iterator->notifs, TRUE); + } - BT_LOGD_STR("Putting current notification."); - bt_put(iterator->current_notification); g_free(iterator); } @@ -162,8 +145,8 @@ void bt_private_connection_notification_iterator_destroy(struct bt_object *obj) * reference count would go from 1 to 0 again and this function * would be called again. */ - obj->ref_count.count++; - iterator = (void *) container_of(obj, struct bt_notification_iterator, base); + obj->ref_count++; + iterator = (void *) obj; BT_LOGD("Destroying private connection notification iterator object: addr=%p", iterator); bt_private_connection_notification_iterator_finalize(iterator); @@ -175,20 +158,6 @@ void bt_private_connection_notification_iterator_destroy(struct bt_object *obj) * listener would be called with an invalid/other * notification iterator object. */ - GHashTableIter ht_iter; - gpointer stream_gptr, stream_state_gptr; - - g_hash_table_iter_init(&ht_iter, iterator->stream_states); - - while (g_hash_table_iter_next(&ht_iter, &stream_gptr, &stream_state_gptr)) { - BT_ASSERT(stream_gptr); - - BT_LOGD_STR("Removing stream's destroy listener for notification iterator."); - bt_stream_remove_destroy_listener( - (void *) stream_gptr, stream_destroy_listener, - iterator); - } - g_hash_table_destroy(iterator->stream_states); } @@ -292,12 +261,25 @@ void bt_private_connection_notification_iterator_set_connection( } static -void init_notification_iterator(struct bt_notification_iterator *iterator, +int init_notification_iterator(struct bt_notification_iterator *iterator, enum bt_notification_iterator_type type, bt_object_release_func destroy) { - bt_object_init(iterator, destroy); + int ret = 0; + + bt_object_init_shared(&iterator->base, destroy); iterator->type = type; + iterator->notifs = g_ptr_array_new(); + if (!iterator->notifs) { + BT_LOGE_STR("Failed to allocate a GPtrArray."); + ret = -1; + goto end; + } + + g_ptr_array_set_size(iterator->notifs, NOTIF_BATCH_SIZE); + +end: + return ret; } BT_HIDDEN @@ -310,6 +292,7 @@ enum bt_connection_status bt_private_connection_notification_iterator_create( enum bt_connection_status status = BT_CONNECTION_STATUS_OK; enum bt_component_class_type type; struct bt_notification_iterator_private_connection *iterator = NULL; + int ret; BT_ASSERT(upstream_comp); BT_ASSERT(upstream_port); @@ -332,9 +315,14 @@ enum bt_connection_status bt_private_connection_notification_iterator_create( goto end; } - init_notification_iterator((void *) iterator, + ret = init_notification_iterator((void *) iterator, BT_NOTIFICATION_ITERATOR_TYPE_PRIVATE_CONNECTION, bt_private_connection_notification_iterator_destroy); + if (ret) { + /* init_notification_iterator() logs errors */ + status = BT_CONNECTION_STATUS_NOMEM; + goto end; + } iterator->stream_states = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) destroy_stream_state); @@ -347,6 +335,7 @@ enum bt_connection_status bt_private_connection_notification_iterator_create( iterator->upstream_component = upstream_comp; iterator->upstream_port = upstream_port; iterator->connection = connection; + iterator->graph = bt_component_borrow_graph(upstream_comp); iterator->state = BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_NON_INITIALIZED; BT_LOGD("Created notification iterator: " "upstream-comp-addr=%p, upstream-comp-name=\"%s\", " @@ -368,7 +357,7 @@ end: void *bt_private_connection_private_notification_iterator_get_user_data( struct bt_private_connection_private_notification_iterator *private_iterator) { - struct bt_notification_iterator_private_connection *iterator = + struct bt_notification_iterator_private_connection *iterator = (void *) bt_private_connection_notification_iterator_borrow_from_private(private_iterator); BT_ASSERT_PRE_NON_NULL(private_iterator, "Notification iterator"); @@ -380,7 +369,7 @@ bt_private_connection_private_notification_iterator_set_user_data( struct bt_private_connection_private_notification_iterator *private_iterator, void *data) { - struct bt_notification_iterator_private_connection *iterator = + struct bt_notification_iterator_private_connection *iterator = (void *) bt_private_connection_notification_iterator_borrow_from_private(private_iterator); BT_ASSERT_PRE_NON_NULL(iterator, "Notification iterator"); @@ -390,12 +379,15 @@ bt_private_connection_private_notification_iterator_set_user_data( return BT_NOTIFICATION_ITERATOR_STATUS_OK; } -struct bt_notification *bt_notification_iterator_get_notification( - struct bt_notification_iterator *iterator) +struct bt_graph *bt_private_connection_private_notification_iterator_borrow_graph( + struct bt_private_connection_private_notification_iterator *private_iterator) { + struct bt_notification_iterator_private_connection *iterator = (void *) + bt_private_connection_notification_iterator_borrow_from_private( + private_iterator); + BT_ASSERT_PRE_NON_NULL(iterator, "Notification iterator"); - return bt_get( - bt_notification_iterator_borrow_current_notification(iterator)); + return iterator->graph; } BT_ASSERT_PRE_FUNC @@ -592,6 +584,26 @@ end: return is_valid; } +BT_ASSERT_PRE_FUNC +static inline +bool validate_notifications( + struct bt_notification_iterator_private_connection *iterator, + uint64_t count) +{ + bool ret = true; + bt_notification_array notifs = (void *) iterator->base.notifs->pdata; + uint64_t i; + + for (i = 0; i < count; i++) { + ret = validate_notification(iterator, notifs[i]); + if (!ret) { + break; + } + } + + return ret; +} + BT_ASSERT_PRE_FUNC static inline bool priv_conn_notif_iter_can_end( struct bt_notification_iterator_private_connection *iterator) @@ -627,23 +639,27 @@ end: return ret; } -static enum bt_notification_iterator_status -bt_priv_conn_private_notification_iterator_next( - struct bt_notification_iterator_private_connection *iterator) +bt_private_connection_notification_iterator_next( + struct bt_notification_iterator *user_iterator, + struct bt_notification ***user_notifs, uint64_t *user_count) { + struct bt_notification_iterator_private_connection *iterator = + (void *) user_iterator; struct bt_private_connection_private_notification_iterator *priv_iterator = bt_private_connection_private_notification_iterator_from_notification_iterator(iterator); bt_component_class_notification_iterator_next_method next_method = NULL; - struct bt_notification_iterator_next_method_return next_return = { - .status = BT_NOTIFICATION_ITERATOR_STATUS_OK, - .notification = NULL, - }; enum bt_notification_iterator_status status = BT_NOTIFICATION_ITERATOR_STATUS_OK; - BT_ASSERT(iterator); - BT_LIB_LOGD("Getting next notification iterator's notification: %!+i", + BT_ASSERT_PRE_NON_NULL(user_iterator, "Notification iterator"); + BT_ASSERT_PRE_NON_NULL(user_notifs, "Notification array"); + BT_ASSERT_PRE_NON_NULL(user_count, "Notification count"); + BT_ASSERT_PRE(user_iterator->type == + BT_NOTIFICATION_ITERATOR_TYPE_PRIVATE_CONNECTION, + "Notification iterator was not created from a private connection: " + "%!+i", iterator); + BT_LIB_LOGD("Getting next private connection notification iterator's notification: %!+i", iterator); BT_ASSERT_PRE(iterator->state == BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_ACTIVE, @@ -684,12 +700,13 @@ bt_priv_conn_private_notification_iterator_next( */ BT_ASSERT(next_method); BT_LOGD_STR("Calling user's \"next\" method."); - next_return = next_method(priv_iterator); + status = next_method(priv_iterator, + (void *) user_iterator->notifs->pdata, + NOTIF_BATCH_SIZE, user_count); BT_LOGD("User method returned: status=%s", - bt_notification_iterator_status_string(next_return.status)); - if (next_return.status < 0) { + bt_notification_iterator_status_string(status)); + if (status < 0) { BT_LOGW_STR("User method failed."); - status = next_return.status; goto end; } @@ -708,16 +725,31 @@ bt_priv_conn_private_notification_iterator_next( * BT_NOTIFICATION_ITERATOR_STATUS_OK because * otherwise this field could be garbage. */ - if (next_return.status == - BT_NOTIFICATION_ITERATOR_STATUS_OK) { - bt_put(next_return.notification); + if (status == BT_NOTIFICATION_ITERATOR_STATUS_OK) { + uint64_t i; + bt_notification_array notifs = + (void *) user_iterator->notifs->pdata; + + for (i = 0; i < *user_count; i++) { + bt_put(notifs[i]); + } } status = BT_NOTIFICATION_ITERATOR_STATUS_CANCELED; goto end; } - switch (next_return.status) { + switch (status) { + case BT_NOTIFICATION_ITERATOR_STATUS_OK: + BT_ASSERT_PRE(validate_notifications(iterator, *user_count), + "Notifications are invalid at this point: " + "%![notif-iter-]+i, count=%" PRIu64, + iterator, *user_count); + *user_notifs = (void *) user_iterator->notifs->pdata; + break; + case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN: + status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN; + goto end; case BT_NOTIFICATION_ITERATOR_STATUS_END: BT_ASSERT_PRE(priv_conn_notif_iter_can_end(iterator), "Notification iterator cannot end at this point: " @@ -729,22 +761,6 @@ bt_priv_conn_private_notification_iterator_next( BT_LOGD("Set new status: status=%s", bt_notification_iterator_status_string(status)); goto end; - case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN: - status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN; - goto end; - case BT_NOTIFICATION_ITERATOR_STATUS_OK: - BT_ASSERT_PRE(next_return.notification, - "User method returned BT_NOTIFICATION_ITERATOR_STATUS_OK, but notification is NULL: " - "%!+i", iterator); - BT_ASSERT_PRE(validate_notification(iterator, - next_return.notification), - "Notification is invalid at this point: " - "%![notif-iter-]+i, %![notif-]+n", - iterator, next_return.notification); - bt_notification_iterator_replace_current_notification( - (void *) iterator, next_return.notification); - bt_put(next_return.notification); - break; default: /* Unknown non-error status */ abort(); @@ -755,86 +771,55 @@ end: } enum bt_notification_iterator_status -bt_notification_iterator_next(struct bt_notification_iterator *iterator) +bt_output_port_notification_iterator_next( + struct bt_notification_iterator *iterator, + bt_notification_array *notifs_to_user, + uint64_t *count_to_user) { enum bt_notification_iterator_status status; + struct bt_notification_iterator_output_port *out_port_iter = + (void *) iterator; + enum bt_graph_status graph_status; BT_ASSERT_PRE_NON_NULL(iterator, "Notification iterator"); - BT_LOGD("Notification iterator's \"next\": iter-addr=%p", iterator); - BT_ASSERT(iterator->type == BT_NOTIFICATION_ITERATOR_TYPE_PRIVATE_CONNECTION || - iterator->type == BT_NOTIFICATION_ITERATOR_TYPE_OUTPUT_PORT); - - switch (iterator->type) { - case BT_NOTIFICATION_ITERATOR_TYPE_PRIVATE_CONNECTION: - { - struct bt_notification_iterator_private_connection *priv_conn_iter = - (void *) iterator; + BT_ASSERT_PRE_NON_NULL(notifs_to_user, "Notification array"); + BT_ASSERT_PRE_NON_NULL(count_to_user, "Notification count"); + BT_ASSERT_PRE(iterator->type == + BT_NOTIFICATION_ITERATOR_TYPE_OUTPUT_PORT, + "Notification iterator was not created from an output port: " + "%!+i", iterator); + BT_LIB_LOGD("Getting next output port notification iterator's notification: %!+i", + iterator); - /* - * Make sure that the iterator's queue contains at least - * one notification. - */ - status = bt_priv_conn_private_notification_iterator_next( - priv_conn_iter); + graph_status = bt_graph_consume_sink_no_check( + out_port_iter->graph, out_port_iter->colander); + switch (graph_status) { + case BT_GRAPH_STATUS_CANCELED: + status = BT_NOTIFICATION_ITERATOR_STATUS_CANCELED; break; - } - case BT_NOTIFICATION_ITERATOR_TYPE_OUTPUT_PORT: - { - struct bt_notification_iterator_output_port *out_port_iter = - (void *) iterator; - - /* - * Keep current notification in case there's an error: - * restore this notification so that the current - * notification is not changed from the user's point of - * view. - */ - struct bt_notification *old_notif = - bt_get(bt_notification_iterator_borrow_current_notification(iterator)); - enum bt_graph_status graph_status; + case BT_GRAPH_STATUS_AGAIN: + status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN; + break; + case BT_GRAPH_STATUS_END: + status = BT_NOTIFICATION_ITERATOR_STATUS_END; + break; + case BT_GRAPH_STATUS_NOMEM: + status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM; + break; + case BT_GRAPH_STATUS_OK: + status = BT_NOTIFICATION_ITERATOR_STATUS_OK; /* - * Put current notification since it's possibly - * about to be replaced by a new one by the - * colander sink. + * On success, the colander sink moves the notifications + * to this iterator's array and sets this iterator's + * notification count: move them to the user. */ - bt_notification_iterator_replace_current_notification( - iterator, NULL); - graph_status = bt_graph_consume_sink_no_check( - out_port_iter->graph, out_port_iter->colander); - switch (graph_status) { - case BT_GRAPH_STATUS_CANCELED: - status = BT_NOTIFICATION_ITERATOR_STATUS_CANCELED; - break; - case BT_GRAPH_STATUS_AGAIN: - status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN; - break; - case BT_GRAPH_STATUS_END: - status = BT_NOTIFICATION_ITERATOR_STATUS_END; - break; - case BT_GRAPH_STATUS_NOMEM: - status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM; - break; - case BT_GRAPH_STATUS_OK: - status = BT_NOTIFICATION_ITERATOR_STATUS_OK; - BT_ASSERT(bt_notification_iterator_borrow_current_notification(iterator)); - break; - default: - /* Other errors */ - status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; - } - - if (status != BT_NOTIFICATION_ITERATOR_STATUS_OK) { - /* Error/exception: restore old notification */ - bt_notification_iterator_replace_current_notification( - iterator, old_notif); - } - - bt_put(old_notif); + *notifs_to_user = (void *) iterator->notifs->pdata; + *count_to_user = out_port_iter->count; break; - } default: - abort(); + /* Other errors */ + status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; } return status; @@ -873,8 +858,6 @@ void bt_output_port_notification_iterator_destroy(struct bt_object *obj) iterator); BT_LOGD_STR("Putting graph."); bt_put(iterator->graph); - BT_LOGD_STR("Putting output port."); - bt_put(iterator->output_port); BT_LOGD_STR("Putting colander sink component."); bt_put(iterator->colander); destroy_base_notification_iterator(obj); @@ -884,7 +867,6 @@ struct bt_notification_iterator *bt_output_port_notification_iterator_create( struct bt_port *output_port, const char *colander_component_name) { - struct bt_notification_iterator *iterator_base = NULL; struct bt_notification_iterator_output_port *iterator = NULL; struct bt_component_class *colander_comp_cls = NULL; struct bt_component *output_port_comp = NULL; @@ -894,6 +876,7 @@ struct bt_notification_iterator *bt_output_port_notification_iterator_create( const char *colander_comp_name; struct bt_port *colander_in_port = NULL; struct bt_component_class_sink_colander_data colander_data; + int ret; BT_ASSERT_PRE_NON_NULL(output_port, "Output port"); BT_ASSERT_PRE(bt_port_get_type(output_port) == BT_PORT_TYPE_OUTPUT, @@ -915,9 +898,14 @@ struct bt_notification_iterator *bt_output_port_notification_iterator_create( goto error; } - init_notification_iterator((void *) iterator, + ret = init_notification_iterator((void *) iterator, BT_NOTIFICATION_ITERATOR_TYPE_OUTPUT_PORT, bt_output_port_notification_iterator_destroy); + if (ret) { + /* init_notification_iterator() logs errors */ + BT_PUT(iterator); + goto end; + } /* Create colander component */ colander_comp_cls = bt_component_class_sink_colander_get(); @@ -927,10 +915,11 @@ struct bt_notification_iterator *bt_output_port_notification_iterator_create( } BT_MOVE(iterator->graph, graph); - iterator_base = (void *) iterator; colander_comp_name = colander_component_name ? colander_component_name : "colander"; - colander_data.notification = &iterator_base->current_notification; + colander_data.notifs = (void *) iterator->base.notifs->pdata; + colander_data.count_addr = &iterator->count; + graph_status = bt_graph_add_component_with_init_method_data( iterator->graph, colander_comp_cls, colander_comp_name, NULL, &colander_data, &iterator->colander); @@ -964,8 +953,8 @@ struct bt_notification_iterator *bt_output_port_notification_iterator_create( * nonconsumable forever so that only this notification iterator * can consume (thanks to bt_graph_consume_sink_no_check()). * This avoids leaking the notification created by the colander - * sink and moved to the base notification iterator's current - * notification member. + * sink and moved to the notification iterator's notification + * member. */ bt_graph_set_can_consume(iterator->graph, BT_FALSE); goto end; @@ -1005,10 +994,8 @@ end: } struct bt_notification_iterator * -bt_private_connection_notification_iterator_from_private( +bt_private_connection_notification_iterator_borrow_from_private( struct bt_private_connection_private_notification_iterator *private_notification_iterator) { - return bt_get( - bt_private_connection_notification_iterator_borrow_from_private( - private_notification_iterator)); + return (void *) private_notification_iterator; }