X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=lib%2Fgraph%2Fiterator.c;h=a2111746793fa71a8def54e06bbc022a37cc66d7;hb=56e18c4ce186892c36d7f2cb5078087425e60134;hp=b5eb5649118abb7cbb01cff18ff3e70f29685566;hpb=07245ac23157616e3f4ff611341f18193de8e37d;p=babeltrace.git diff --git a/lib/graph/iterator.c b/lib/graph/iterator.c index b5eb5649..a2111746 100644 --- a/lib/graph/iterator.c +++ b/lib/graph/iterator.c @@ -30,12 +30,12 @@ #include #include -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include +#include #include #include #include @@ -53,7 +53,6 @@ #include #include #include -#include #include #include #include @@ -63,29 +62,20 @@ #include #include -struct discarded_elements_state { - struct bt_clock_value *cur_begin; - uint64_t cur_count; -}; +/* + * TODO: Use graph's state (number of active iterators, etc.) and + * possibly system specifications to make a better guess than this. + */ +#define NOTIF_BATCH_SIZE 15 struct stream_state { struct bt_stream *stream; /* owned by this */ struct bt_packet *cur_packet; /* owned by this */ - struct discarded_elements_state discarded_packets_state; - struct discarded_elements_state discarded_events_state; uint64_t expected_notif_seq_num; bt_bool is_ended; }; -static -void stream_destroy_listener(struct bt_stream_common *stream, void *data) -{ - struct bt_notification_iterator_private_connection *iterator = data; - - /* Remove associated stream state */ - g_hash_table_remove(iterator->stream_states, stream); -} - +BT_ASSERT_PRE_FUNC static void destroy_stream_state(struct stream_state *stream_state) { @@ -98,11 +88,10 @@ void destroy_stream_state(struct stream_state *stream_state) bt_put(stream_state->cur_packet); BT_LOGV_STR("Putting stream state's stream."); bt_put(stream_state->stream); - bt_put(stream_state->discarded_packets_state.cur_begin); - bt_put(stream_state->discarded_events_state.cur_begin); g_free(stream_state); } +BT_ASSERT_PRE_FUNC static struct stream_state *create_stream_state(struct bt_stream *stream) { @@ -113,15 +102,6 @@ struct stream_state *create_stream_state(struct bt_stream *stream) goto end; } - /* - * The packet index is a monotonic counter which may not start - * at 0 at the beginning of the stream. We therefore need to - * have an internal object initial state of -1ULL to distinguish - * between initial state and having seen a packet with - * the sequence number 0. - */ - stream_state->discarded_packets_state.cur_count = -1ULL; - /* * We keep a reference to the stream until we know it's ended. */ @@ -137,8 +117,15 @@ end: static void destroy_base_notification_iterator(struct bt_object *obj) { - BT_ASSERT(obj); - g_free(obj); + struct bt_notification_iterator *iterator = (void *) obj; + + BT_ASSERT(iterator); + + if (iterator->notifs) { + g_ptr_array_free(iterator->notifs, TRUE); + } + + g_free(iterator); } static @@ -158,7 +145,7 @@ void bt_private_connection_notification_iterator_destroy(struct bt_object *obj) * reference count would go from 1 to 0 again and this function * would be called again. */ - obj->ref_count.count++; + obj->ref_count++; iterator = (void *) obj; BT_LOGD("Destroying private connection notification iterator object: addr=%p", iterator); @@ -171,20 +158,6 @@ void bt_private_connection_notification_iterator_destroy(struct bt_object *obj) * listener would be called with an invalid/other * notification iterator object. */ - GHashTableIter ht_iter; - gpointer stream_gptr, stream_state_gptr; - - g_hash_table_iter_init(&ht_iter, iterator->stream_states); - - while (g_hash_table_iter_next(&ht_iter, &stream_gptr, &stream_state_gptr)) { - BT_ASSERT(stream_gptr); - - BT_LOGD_STR("Removing stream's destroy listener for notification iterator."); - bt_stream_common_remove_destroy_listener( - (void *) stream_gptr, stream_destroy_listener, - iterator); - } - g_hash_table_destroy(iterator->stream_states); } @@ -288,12 +261,25 @@ void bt_private_connection_notification_iterator_set_connection( } static -void init_notification_iterator(struct bt_notification_iterator *iterator, +int init_notification_iterator(struct bt_notification_iterator *iterator, enum bt_notification_iterator_type type, bt_object_release_func destroy) { - bt_object_init(iterator, destroy); + int ret = 0; + + bt_object_init_shared(&iterator->base, destroy); iterator->type = type; + iterator->notifs = g_ptr_array_new(); + if (!iterator->notifs) { + BT_LOGE_STR("Failed to allocate a GPtrArray."); + ret = -1; + goto end; + } + + g_ptr_array_set_size(iterator->notifs, NOTIF_BATCH_SIZE); + +end: + return ret; } BT_HIDDEN @@ -306,6 +292,7 @@ enum bt_connection_status bt_private_connection_notification_iterator_create( enum bt_connection_status status = BT_CONNECTION_STATUS_OK; enum bt_component_class_type type; struct bt_notification_iterator_private_connection *iterator = NULL; + int ret; BT_ASSERT(upstream_comp); BT_ASSERT(upstream_port); @@ -328,9 +315,14 @@ enum bt_connection_status bt_private_connection_notification_iterator_create( goto end; } - init_notification_iterator((void *) iterator, + ret = init_notification_iterator((void *) iterator, BT_NOTIFICATION_ITERATOR_TYPE_PRIVATE_CONNECTION, bt_private_connection_notification_iterator_destroy); + if (ret) { + /* init_notification_iterator() logs errors */ + status = BT_CONNECTION_STATUS_NOMEM; + goto end; + } iterator->stream_states = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) destroy_stream_state); @@ -592,6 +584,26 @@ end: return is_valid; } +BT_ASSERT_PRE_FUNC +static inline +bool validate_notifications( + struct bt_notification_iterator_private_connection *iterator, + uint64_t count) +{ + bool ret = true; + bt_notification_array notifs = (void *) iterator->base.notifs->pdata; + uint64_t i; + + for (i = 0; i < count; i++) { + ret = validate_notification(iterator, notifs[i]); + if (!ret) { + break; + } + } + + return ret; +} + BT_ASSERT_PRE_FUNC static inline bool priv_conn_notif_iter_can_end( struct bt_notification_iterator_private_connection *iterator) @@ -630,22 +642,19 @@ end: enum bt_notification_iterator_status bt_private_connection_notification_iterator_next( struct bt_notification_iterator *user_iterator, - struct bt_notification **user_notif) + struct bt_notification ***user_notifs, uint64_t *user_count) { struct bt_notification_iterator_private_connection *iterator = (void *) user_iterator; struct bt_private_connection_private_notification_iterator *priv_iterator = bt_private_connection_private_notification_iterator_from_notification_iterator(iterator); bt_component_class_notification_iterator_next_method next_method = NULL; - struct bt_notification_iterator_next_method_return next_return = { - .status = BT_NOTIFICATION_ITERATOR_STATUS_OK, - .notification = NULL, - }; enum bt_notification_iterator_status status = BT_NOTIFICATION_ITERATOR_STATUS_OK; BT_ASSERT_PRE_NON_NULL(user_iterator, "Notification iterator"); - BT_ASSERT_PRE_NON_NULL(user_notif, "Notification"); + BT_ASSERT_PRE_NON_NULL(user_notifs, "Notification array"); + BT_ASSERT_PRE_NON_NULL(user_count, "Notification count"); BT_ASSERT_PRE(user_iterator->type == BT_NOTIFICATION_ITERATOR_TYPE_PRIVATE_CONNECTION, "Notification iterator was not created from a private connection: " @@ -691,12 +700,13 @@ bt_private_connection_notification_iterator_next( */ BT_ASSERT(next_method); BT_LOGD_STR("Calling user's \"next\" method."); - next_return = next_method(priv_iterator); + status = next_method(priv_iterator, + (void *) user_iterator->notifs->pdata, + NOTIF_BATCH_SIZE, user_count); BT_LOGD("User method returned: status=%s", - bt_notification_iterator_status_string(next_return.status)); - if (next_return.status < 0) { + bt_notification_iterator_status_string(status)); + if (status < 0) { BT_LOGW_STR("User method failed."); - status = next_return.status; goto end; } @@ -715,16 +725,31 @@ bt_private_connection_notification_iterator_next( * BT_NOTIFICATION_ITERATOR_STATUS_OK because * otherwise this field could be garbage. */ - if (next_return.status == - BT_NOTIFICATION_ITERATOR_STATUS_OK) { - bt_put(next_return.notification); + if (status == BT_NOTIFICATION_ITERATOR_STATUS_OK) { + uint64_t i; + bt_notification_array notifs = + (void *) user_iterator->notifs->pdata; + + for (i = 0; i < *user_count; i++) { + bt_put(notifs[i]); + } } status = BT_NOTIFICATION_ITERATOR_STATUS_CANCELED; goto end; } - switch (next_return.status) { + switch (status) { + case BT_NOTIFICATION_ITERATOR_STATUS_OK: + BT_ASSERT_PRE(validate_notifications(iterator, *user_count), + "Notifications are invalid at this point: " + "%![notif-iter-]+i, count=%" PRIu64, + iterator, *user_count); + *user_notifs = (void *) user_iterator->notifs->pdata; + break; + case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN: + status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN; + goto end; case BT_NOTIFICATION_ITERATOR_STATUS_END: BT_ASSERT_PRE(priv_conn_notif_iter_can_end(iterator), "Notification iterator cannot end at this point: " @@ -736,20 +761,6 @@ bt_private_connection_notification_iterator_next( BT_LOGD("Set new status: status=%s", bt_notification_iterator_status_string(status)); goto end; - case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN: - status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN; - goto end; - case BT_NOTIFICATION_ITERATOR_STATUS_OK: - BT_ASSERT_PRE(next_return.notification, - "User method returned BT_NOTIFICATION_ITERATOR_STATUS_OK, but notification is NULL: " - "%!+i", iterator); - BT_ASSERT_PRE(validate_notification(iterator, - next_return.notification), - "Notification is invalid at this point: " - "%![notif-iter-]+i, %![notif-]+n", - iterator, next_return.notification); - *user_notif = next_return.notification; - break; default: /* Unknown non-error status */ abort(); @@ -762,7 +773,8 @@ end: enum bt_notification_iterator_status bt_output_port_notification_iterator_next( struct bt_notification_iterator *iterator, - struct bt_notification **user_notif) + bt_notification_array *notifs_to_user, + uint64_t *count_to_user) { enum bt_notification_iterator_status status; struct bt_notification_iterator_output_port *out_port_iter = @@ -770,37 +782,40 @@ bt_output_port_notification_iterator_next( enum bt_graph_status graph_status; BT_ASSERT_PRE_NON_NULL(iterator, "Notification iterator"); - BT_ASSERT_PRE_NON_NULL(user_notif, "Notification"); + BT_ASSERT_PRE_NON_NULL(notifs_to_user, "Notification array"); + BT_ASSERT_PRE_NON_NULL(count_to_user, "Notification count"); BT_ASSERT_PRE(iterator->type == BT_NOTIFICATION_ITERATOR_TYPE_OUTPUT_PORT, "Notification iterator was not created from an output port: " "%!+i", iterator); BT_LIB_LOGD("Getting next output port notification iterator's notification: %!+i", iterator); + graph_status = bt_graph_consume_sink_no_check( out_port_iter->graph, out_port_iter->colander); switch (graph_status) { case BT_GRAPH_STATUS_CANCELED: - BT_ASSERT(!out_port_iter->notif); status = BT_NOTIFICATION_ITERATOR_STATUS_CANCELED; break; case BT_GRAPH_STATUS_AGAIN: - BT_ASSERT(!out_port_iter->notif); status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN; break; case BT_GRAPH_STATUS_END: - BT_ASSERT(!out_port_iter->notif); status = BT_NOTIFICATION_ITERATOR_STATUS_END; break; case BT_GRAPH_STATUS_NOMEM: - BT_ASSERT(!out_port_iter->notif); status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM; break; case BT_GRAPH_STATUS_OK: - BT_ASSERT(out_port_iter->notif); status = BT_NOTIFICATION_ITERATOR_STATUS_OK; - *user_notif = out_port_iter->notif; - out_port_iter->notif = NULL; + + /* + * On success, the colander sink moves the notifications + * to this iterator's array and sets this iterator's + * notification count: move them to the user. + */ + *notifs_to_user = (void *) iterator->notifs->pdata; + *count_to_user = out_port_iter->count; break; default: /* Other errors */ @@ -841,7 +856,6 @@ void bt_output_port_notification_iterator_destroy(struct bt_object *obj) BT_LOGD("Destroying output port notification iterator object: addr=%p", iterator); - BT_ASSERT(!iterator->notif); BT_LOGD_STR("Putting graph."); bt_put(iterator->graph); BT_LOGD_STR("Putting colander sink component."); @@ -862,6 +876,7 @@ struct bt_notification_iterator *bt_output_port_notification_iterator_create( const char *colander_comp_name; struct bt_port *colander_in_port = NULL; struct bt_component_class_sink_colander_data colander_data; + int ret; BT_ASSERT_PRE_NON_NULL(output_port, "Output port"); BT_ASSERT_PRE(bt_port_get_type(output_port) == BT_PORT_TYPE_OUTPUT, @@ -883,9 +898,14 @@ struct bt_notification_iterator *bt_output_port_notification_iterator_create( goto error; } - init_notification_iterator((void *) iterator, + ret = init_notification_iterator((void *) iterator, BT_NOTIFICATION_ITERATOR_TYPE_OUTPUT_PORT, bt_output_port_notification_iterator_destroy); + if (ret) { + /* init_notification_iterator() logs errors */ + BT_PUT(iterator); + goto end; + } /* Create colander component */ colander_comp_cls = bt_component_class_sink_colander_get(); @@ -897,7 +917,9 @@ struct bt_notification_iterator *bt_output_port_notification_iterator_create( BT_MOVE(iterator->graph, graph); colander_comp_name = colander_component_name ? colander_component_name : "colander"; - colander_data.notification = &iterator->notif; + colander_data.notifs = (void *) iterator->base.notifs->pdata; + colander_data.count_addr = &iterator->count; + graph_status = bt_graph_add_component_with_init_method_data( iterator->graph, colander_comp_cls, colander_comp_name, NULL, &colander_data, &iterator->colander);