X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Flib%2Fgraph%2Fiterator.c;h=a76e325be16fbf271aa06456bf5bf5b823cc0e51;hb=e74015680521597497c3218160a9e80081932896;hp=796ade072d59c0c13b45758980a7e29b5af1fc47;hpb=4fb18ad2a9fe1fe95931f1aa2960417b9e558ae2;p=babeltrace.git diff --git a/src/lib/graph/iterator.c b/src/lib/graph/iterator.c index 796ade07..a76e325b 100644 --- a/src/lib/graph/iterator.c +++ b/src/lib/graph/iterator.c @@ -66,6 +66,26 @@ (_iter)->state == BT_MESSAGE_ITERATOR_STATE_LAST_SEEKING_RETURNED_ERROR, \ "Message iterator is in the wrong state: %!+i", (_iter)) +#ifdef BT_DEV_MODE +struct per_stream_state +{ + bt_packet *cur_packet; + + /* Bit mask of expected message types. */ + guint expected_msg_types; +}; +#endif + +static void +clear_per_stream_state (struct bt_message_iterator *iterator) +{ +#ifdef BT_DEV_MODE + g_hash_table_remove_all(iterator->per_stream_state); +#else + BT_USE_EXPR(iterator); +#endif +} + static inline void set_msg_iterator_state(struct bt_message_iterator *iterator, enum bt_message_iterator_state state) @@ -135,10 +155,13 @@ void bt_message_iterator_destroy(struct bt_object *obj) iterator->msgs = NULL; } +#ifdef BT_DEV_MODE + g_hash_table_destroy(iterator->per_stream_state); +#endif + g_free(iterator); } -BT_HIDDEN void bt_message_iterator_try_finalize( struct bt_message_iterator *iterator) { @@ -240,7 +263,6 @@ end: return; } -BT_HIDDEN void bt_message_iterator_set_connection( struct bt_message_iterator *iterator, struct bt_connection *connection) @@ -253,8 +275,9 @@ void bt_message_iterator_set_connection( static enum bt_message_iterator_can_seek_beginning_status can_seek_ns_from_origin_true( - struct bt_message_iterator *iterator, - int64_t ns_from_origin, bt_bool *can_seek) + struct bt_message_iterator *iterator __attribute__((unused)), + int64_t ns_from_origin __attribute__((unused)), + bt_bool *can_seek) { *can_seek = BT_TRUE; @@ -263,7 +286,7 @@ enum bt_message_iterator_can_seek_beginning_status can_seek_ns_from_origin_true( static enum bt_message_iterator_can_seek_beginning_status can_seek_beginning_true( - struct bt_message_iterator *iterator, + struct bt_message_iterator *iterator __attribute__((unused)), bt_bool *can_seek) { *can_seek = BT_TRUE; @@ -342,6 +365,16 @@ int create_self_component_input_port_message_iterator( g_ptr_array_set_size(iterator->msgs, MSG_BATCH_SIZE); iterator->last_ns_from_origin = INT64_MIN; + +#ifdef BT_DEV_MODE + /* The per-stream state is only used for dev assertions right now. */ + iterator->per_stream_state = g_hash_table_new_full( + g_direct_hash, + g_direct_equal, + NULL, + g_free); +#endif + iterator->auto_seek.msgs = g_queue_new(); if (!iterator->auto_seek.msgs) { BT_LIB_LOGE_APPEND_CAUSE("Failed to allocate a GQueue."); @@ -457,6 +490,7 @@ end: return status; } +BT_EXPORT bt_message_iterator_create_from_message_iterator_status bt_message_iterator_create_from_message_iterator( struct bt_self_message_iterator *self_msg_iter, @@ -469,6 +503,7 @@ bt_message_iterator_create_from_message_iterator( input_port, message_iterator, __func__); } +BT_EXPORT bt_message_iterator_create_from_sink_component_status bt_message_iterator_create_from_sink_component( struct bt_self_component_sink *self_comp, @@ -481,6 +516,7 @@ bt_message_iterator_create_from_sink_component( input_port, message_iterator, __func__); } +BT_EXPORT void *bt_self_message_iterator_get_data( const struct bt_self_message_iterator *self_iterator) { @@ -491,6 +527,7 @@ void *bt_self_message_iterator_get_data( return iterator->user_data; } +BT_EXPORT void bt_self_message_iterator_set_data( struct bt_self_message_iterator *self_iterator, void *data) { @@ -503,6 +540,7 @@ void bt_self_message_iterator_set_data( "%!+i, user-data-addr=%p", iterator, data); } +BT_EXPORT void bt_self_message_iterator_configuration_set_can_seek_forward( bt_self_message_iterator_configuration *config, bt_bool can_seek_forward) @@ -784,13 +822,350 @@ end: return result; } +#ifdef BT_DEV_MODE +static +const bt_stream *get_stream_from_msg(const struct bt_message *msg) +{ + struct bt_stream *stream; + + switch (msg->type) { + case BT_MESSAGE_TYPE_STREAM_BEGINNING: + case BT_MESSAGE_TYPE_STREAM_END: + { + struct bt_message_stream *msg_stream = + (struct bt_message_stream *) msg; + stream = msg_stream->stream; + break; + } + case BT_MESSAGE_TYPE_EVENT: + { + struct bt_message_event *msg_event = + (struct bt_message_event *) msg; + stream = msg_event->event->stream; + break; + } + case BT_MESSAGE_TYPE_PACKET_BEGINNING: + case BT_MESSAGE_TYPE_PACKET_END: + { + struct bt_message_packet *msg_packet = + (struct bt_message_packet *) msg; + stream = msg_packet->packet->stream; + break; + } + case BT_MESSAGE_TYPE_DISCARDED_EVENTS: + case BT_MESSAGE_TYPE_DISCARDED_PACKETS: + { + struct bt_message_discarded_items *msg_discarded = + (struct bt_message_discarded_items *) msg; + stream = msg_discarded->stream; + break; + } + case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY: + stream = NULL; + break; + default: + bt_common_abort(); + } + + return stream; +} + +static +GString *message_types_to_string(guint msg_types) +{ + GString *str = g_string_new(""); + + for (int msg_type = 1; msg_type <= BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY; + msg_type <<= 1) { + if (msg_type & msg_types) { + if (str->len > 0) { + g_string_append_c(str, '|'); + } + + g_string_append(str, + bt_common_message_type_string(msg_type)); + } + } + + return str; +} + +static +void update_expected_msg_type(const struct bt_stream *stream, + struct per_stream_state *state, + const struct bt_message *msg) +{ + switch (msg->type) { + case BT_MESSAGE_TYPE_STREAM_BEGINNING: + state->expected_msg_types = BT_MESSAGE_TYPE_STREAM_END; + + if (stream->class->supports_packets) { + state->expected_msg_types |= + BT_MESSAGE_TYPE_PACKET_BEGINNING; + + if (stream->class->supports_discarded_packets) { + state->expected_msg_types |= + BT_MESSAGE_TYPE_DISCARDED_PACKETS; + } + } else { + state->expected_msg_types |= BT_MESSAGE_TYPE_EVENT; + } + + if (stream->class->supports_discarded_events) { + state->expected_msg_types |= + BT_MESSAGE_TYPE_DISCARDED_EVENTS; + } + + break; + case BT_MESSAGE_TYPE_STREAM_END: + state->expected_msg_types = 0; + break; + case BT_MESSAGE_TYPE_EVENT: + { + state->expected_msg_types = BT_MESSAGE_TYPE_EVENT; + + if (stream->class->supports_packets) { + state->expected_msg_types |= BT_MESSAGE_TYPE_PACKET_END; + } else { + state->expected_msg_types |= BT_MESSAGE_TYPE_STREAM_END; + } + + if (stream->class->supports_discarded_events) { + state->expected_msg_types |= + BT_MESSAGE_TYPE_DISCARDED_EVENTS; + } + + break; + } + case BT_MESSAGE_TYPE_PACKET_BEGINNING: + { + state->expected_msg_types = BT_MESSAGE_TYPE_EVENT | + BT_MESSAGE_TYPE_PACKET_END; + + if (stream->class->supports_discarded_events) { + state->expected_msg_types |= + BT_MESSAGE_TYPE_DISCARDED_EVENTS; + } + + break; + } + case BT_MESSAGE_TYPE_PACKET_END: + { + state->expected_msg_types = BT_MESSAGE_TYPE_PACKET_BEGINNING | + BT_MESSAGE_TYPE_STREAM_END; + + if (stream->class->supports_discarded_events) { + state->expected_msg_types |= + BT_MESSAGE_TYPE_DISCARDED_EVENTS; + } + + if (stream->class->supports_discarded_packets) { + state->expected_msg_types |= + BT_MESSAGE_TYPE_DISCARDED_PACKETS; + } + + break; + } + case BT_MESSAGE_TYPE_DISCARDED_EVENTS: + state->expected_msg_types = BT_MESSAGE_TYPE_DISCARDED_EVENTS; + + if (state->cur_packet) { + state->expected_msg_types |= BT_MESSAGE_TYPE_EVENT | + BT_MESSAGE_TYPE_PACKET_END; + } else { + state->expected_msg_types |= BT_MESSAGE_TYPE_STREAM_END; + + if (stream->class->supports_packets) { + state->expected_msg_types |= + BT_MESSAGE_TYPE_PACKET_BEGINNING; + + if (stream->class->supports_discarded_packets) { + state->expected_msg_types |= + BT_MESSAGE_TYPE_DISCARDED_PACKETS; + } + } else { + state->expected_msg_types |= + BT_MESSAGE_TYPE_EVENT; + } + } + + break; + case BT_MESSAGE_TYPE_DISCARDED_PACKETS: + state->expected_msg_types = BT_MESSAGE_TYPE_DISCARDED_PACKETS | + BT_MESSAGE_TYPE_PACKET_BEGINNING | + BT_MESSAGE_TYPE_STREAM_END; + + if (stream->class->supports_discarded_events) { + state->expected_msg_types |= + BT_MESSAGE_TYPE_DISCARDED_EVENTS; + } + break; + default: + /* + * Other message types are not associated to a stream, so we + * should not get them here. + */ + bt_common_abort(); + } +} + +static +struct per_stream_state *get_per_stream_state( + struct bt_message_iterator *iterator, + const struct bt_stream *stream) +{ + struct per_stream_state *state = g_hash_table_lookup( + iterator->per_stream_state, stream); + + if (!state) { + state = g_new0(struct per_stream_state, 1); + state->expected_msg_types = BT_MESSAGE_TYPE_STREAM_BEGINNING; + g_hash_table_insert(iterator->per_stream_state, + (gpointer) stream, state); + } + + return state; +} +#endif + +#define NEXT_METHOD_NAME "bt_message_iterator_class_next_method" + +#ifdef BT_DEV_MODE +static +void assert_post_dev_expected_sequence(struct bt_message_iterator *iterator, + const struct bt_message *msg) +{ + const bt_stream *stream = get_stream_from_msg(msg); + struct per_stream_state *state; + + if (!stream) { + goto end; + } + + state = get_per_stream_state(iterator, stream); + + /* + * We don't free the return value of message_types_to_string(), but + * that's because we know the program is going to abort anyway, and + * we don't want to call it if the assertion holds. + */ + BT_ASSERT_POST_DEV(NEXT_METHOD_NAME, + "message-type-is-expected", + msg->type & state->expected_msg_types, + "Unexpected message type: %![stream-]s, %![iterator-]i, " + "%![message-]n, expected-msg-types=%s", + stream, iterator, msg, + message_types_to_string(state->expected_msg_types)->str); + + update_expected_msg_type(stream, state, msg); + +end: + return; +} + +static +void assert_post_dev_expected_packet(struct bt_message_iterator *iterator, + const struct bt_message *msg) +{ + const bt_stream *stream = get_stream_from_msg(msg); + struct per_stream_state *state; + const bt_packet *actual_packet = NULL; + const bt_packet *expected_packet = NULL; + + if (!stream) { + goto end; + } + + state = get_per_stream_state(iterator, stream); + + switch (msg->type) { + case BT_MESSAGE_TYPE_EVENT: + { + const struct bt_message_event *msg_event = + (const struct bt_message_event *) msg; + + actual_packet = msg_event->event->packet; + expected_packet = state->cur_packet; + break; + } + case BT_MESSAGE_TYPE_PACKET_BEGINNING: + { + const struct bt_message_packet *msg_packet = + (const struct bt_message_packet *) msg; + + BT_ASSERT(!state->cur_packet); + state->cur_packet = msg_packet->packet; + break; + } + case BT_MESSAGE_TYPE_PACKET_END: + { + const struct bt_message_packet *msg_packet = + (const struct bt_message_packet *) msg; + + actual_packet = msg_packet->packet; + expected_packet = state->cur_packet; + BT_ASSERT(state->cur_packet); + state->cur_packet = NULL; + break; + } + default: + break; + } + + BT_ASSERT_POST_DEV(NEXT_METHOD_NAME, + "message-packet-is-expected", + actual_packet == expected_packet, + "Message's packet is not expected: %![stream-]s, %![iterator-]i, " + "%![message-]n, %![received-packet-]a, %![expected-packet-]a", + stream, iterator, msg, actual_packet, expected_packet); + +end: + return; +} + +static +void assert_post_dev_next( + struct bt_message_iterator *iterator, + bt_message_iterator_class_next_method_status status, + bt_message_array_const msgs, uint64_t msg_count) +{ + if (status == BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) { + uint64_t i; + + for (i = 0; i < msg_count; i++) { + assert_post_dev_expected_sequence(iterator, msgs[i]); + assert_post_dev_expected_packet(iterator, msgs[i]); + } + } else if (status == BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END) { + GHashTableIter iter; + + gpointer stream_v, stream_state_v; + + g_hash_table_iter_init(&iter, iterator->per_stream_state); + while (g_hash_table_iter_next(&iter, &stream_v, + &stream_state_v)) { + struct bt_stream *stream = stream_v; + struct per_stream_state *stream_state = stream_state_v; + + BT_ASSERT_POST_DEV(NEXT_METHOD_NAME, + "stream-is-ended", + stream_state->expected_msg_types == 0, + "Stream is not ended: %![stream-]s, " + "%![iterator-]i, expected-msg-types=%s", + stream, iterator, + message_types_to_string( + stream_state->expected_msg_types)->str); + } + + } +} +#endif + /* * Call the `next` method of the iterator. Do some validation on the returned * messages. */ -#define NEXT_METHOD_NAME "bt_message_iterator_class_next_method" - static enum bt_message_iterator_class_next_method_status call_iterator_next_method( @@ -818,12 +1193,17 @@ call_iterator_next_method( "Clock snapshots are not monotonic"); } +#ifdef BT_DEV_MODE + assert_post_dev_next(iterator, status, msgs, *user_count); +#endif + BT_ASSERT_POST_DEV_NO_ERROR_IF_NO_ERROR_STATUS(NEXT_METHOD_NAME, status); return status; } +BT_EXPORT enum bt_message_iterator_next_status bt_message_iterator_next( struct bt_message_iterator *iterator, @@ -877,7 +1257,7 @@ bt_message_iterator_next( * by its downstream owner. * * For the same reason, there is no way that this iterator could - * have seeked (cannot seek a self message iterator). + * have sought (cannot seek a self message iterator). */ BT_ASSERT_DBG(iterator->state == BT_MESSAGE_ITERATOR_STATE_ACTIVE); @@ -906,6 +1286,7 @@ end: return status; } +BT_EXPORT struct bt_component * bt_message_iterator_borrow_component( struct bt_message_iterator *iterator) @@ -914,6 +1295,7 @@ bt_message_iterator_borrow_component( return iterator->upstream_component; } +BT_EXPORT struct bt_self_component *bt_self_message_iterator_borrow_component( struct bt_self_message_iterator *self_iterator) { @@ -924,6 +1306,7 @@ struct bt_self_component *bt_self_message_iterator_borrow_component( return (void *) iterator->upstream_component; } +BT_EXPORT struct bt_self_component_port_output *bt_self_message_iterator_borrow_port( struct bt_self_message_iterator *self_iterator) { @@ -937,6 +1320,7 @@ struct bt_self_component_port_output *bt_self_message_iterator_borrow_port( #define CAN_SEEK_NS_FROM_ORIGIN_METHOD_NAME \ "bt_message_iterator_class_can_seek_ns_from_origin_method" +BT_EXPORT enum bt_message_iterator_can_seek_ns_from_origin_status bt_message_iterator_can_seek_ns_from_origin( struct bt_message_iterator *iterator, @@ -1014,6 +1398,7 @@ end: #define CAN_SEEK_BEGINNING_METHOD_NAME \ "bt_message_iterator_class_can_seek_beginning" +BT_EXPORT enum bt_message_iterator_can_seek_beginning_status bt_message_iterator_can_seek_beginning( struct bt_message_iterator *iterator, @@ -1113,6 +1498,7 @@ bool message_iterator_can_seek_beginning( #define SEEK_BEGINNING_METHOD_NAME \ "bt_message_iterator_class_seek_beginning_method" +BT_EXPORT enum bt_message_iterator_seek_beginning_status bt_message_iterator_seek_beginning(struct bt_message_iterator *iterator) { @@ -1158,10 +1544,13 @@ bt_message_iterator_seek_beginning(struct bt_message_iterator *iterator) iterator, bt_common_func_status_string(status)); } + clear_per_stream_state(iterator); + set_iterator_state_after_seeking(iterator, status); return status; } +BT_EXPORT bt_bool bt_message_iterator_can_seek_forward( bt_message_iterator *iterator) @@ -1286,6 +1675,16 @@ int auto_seek_handle_message( const struct bt_message_packet *packet_msg = (const void *) msg; + if (msg->type == BT_MESSAGE_TYPE_PACKET_BEGINNING + && !packet_msg->packet->stream->class->packets_have_beginning_default_clock_snapshot) { + goto skip_msg; + } + + if (msg->type == BT_MESSAGE_TYPE_PACKET_END + && !packet_msg->packet->stream->class->packets_have_end_default_clock_snapshot) { + goto skip_msg; + } + clk_snapshot = packet_msg->default_cs; BT_ASSERT_POST_DEV(NEXT_METHOD_NAME, "packet-message-has-default-clock-snapshot", @@ -1300,6 +1699,16 @@ int auto_seek_handle_message( struct bt_message_discarded_items *msg_disc_items = (void *) msg; + if (msg->type == BT_MESSAGE_TYPE_DISCARDED_EVENTS && + !msg_disc_items->stream->class->discarded_events_have_default_clock_snapshots) { + goto skip_msg; + } + + if (msg->type == BT_MESSAGE_TYPE_DISCARDED_PACKETS && + !msg_disc_items->stream->class->discarded_packets_have_default_clock_snapshots) { + goto skip_msg; + } + BT_ASSERT_POST_DEV(NEXT_METHOD_NAME, "discarded-events-packets-message-has-default-clock-snapshot", msg_disc_items->default_begin_cs && @@ -1619,7 +2028,7 @@ end: /* * This function is installed as the iterator's next callback after we have - * auto-seeked (seeked to the beginning and fast-forwarded) to send the + * auto-sought (sought to the beginning and fast-forwarded) to send the * messages saved in iterator->auto_seek.msgs. Once this is done, the original * next callback is put back. */ @@ -1689,6 +2098,7 @@ bool message_iterator_can_seek_ns_from_origin( "bt_message_iterator_class_seek_ns_from_origin_method" +BT_EXPORT enum bt_message_iterator_seek_ns_from_origin_status bt_message_iterator_seek_ns_from_origin( struct bt_message_iterator *iterator, @@ -1793,6 +2203,8 @@ bt_message_iterator_seek_ns_from_origin( iterator, bt_common_func_status_string(status)); } + clear_per_stream_state(iterator); + switch (status) { case BT_FUNC_STATUS_OK: break; @@ -1949,6 +2361,8 @@ bt_message_iterator_seek_ns_from_origin( } } + clear_per_stream_state(iterator); + /* * The following messages returned by the next method (including * post_auto_seek_next) must be after (or at) `ns_from_origin`. @@ -1965,6 +2379,7 @@ end: return status; } +BT_EXPORT bt_bool bt_self_message_iterator_is_interrupted( const struct bt_self_message_iterator *self_msg_iter) { @@ -1975,12 +2390,14 @@ bt_bool bt_self_message_iterator_is_interrupted( return (bt_bool) bt_graph_is_interrupted(iterator->graph); } +BT_EXPORT void bt_message_iterator_get_ref( const struct bt_message_iterator *iterator) { bt_object_get_ref(iterator); } +BT_EXPORT void bt_message_iterator_put_ref( const struct bt_message_iterator *iterator) {