X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Flib%2Fgraph%2Fiterator.c;h=41805663d40dfa1d8a0b85c4d3125ab42c5a25e8;hb=6162e6b7a6d907974d954419c2375f654f39eb72;hp=b6117e125c05b36110ca7db4921fffefd1d64e6c;hpb=19e3b820610759ae071b2207dc1dda2c98fd53f8;p=babeltrace.git diff --git a/src/lib/graph/iterator.c b/src/lib/graph/iterator.c index b6117e12..41805663 100644 --- a/src/lib/graph/iterator.c +++ b/src/lib/graph/iterator.c @@ -25,6 +25,7 @@ #include "lib/logging.h" #include "compat/compiler.h" +#include "compat/glib.h" #include "lib/trace-ir/clock-class.h" #include "lib/trace-ir/clock-snapshot.h" #include @@ -33,16 +34,22 @@ #include #include "lib/trace-ir/packet.h" #include "lib/trace-ir/stream.h" +#include +#include +#include #include #include #include #include -#include +#include #include #include #include +#include +#include #include #include +#include #include #include #include @@ -51,6 +58,7 @@ #include #include "common/assert.h" #include "lib/assert-pre.h" +#include "lib/assert-post.h" #include #include #include @@ -69,7 +77,7 @@ #include "message/message-iterator-inactivity.h" #include "message/stream.h" #include "message/packet.h" -#include "message/stream-activity.h" +#include "lib/func-status.h" /* * TODO: Use graph's state (number of active iterators, etc.) and @@ -144,14 +152,14 @@ void bt_self_component_port_input_message_iterator_destroy(struct bt_object *obj iterator->connection = NULL; } - if (iterator->auto_seek_msgs) { - while (!g_queue_is_empty(iterator->auto_seek_msgs)) { + if (iterator->auto_seek.msgs) { + while (!g_queue_is_empty(iterator->auto_seek.msgs)) { bt_object_put_no_null_check( - g_queue_pop_tail(iterator->auto_seek_msgs)); + g_queue_pop_tail(iterator->auto_seek.msgs)); } - g_queue_free(iterator->auto_seek_msgs); - iterator->auto_seek_msgs = NULL; + g_queue_free(iterator->auto_seek.msgs); + iterator->auto_seek.msgs = NULL; } destroy_base_message_iterator(obj); @@ -180,7 +188,7 @@ void bt_self_component_port_input_message_iterator_try_finalize( "%!+i", iterator); goto end; case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZING: - /* Already finalized */ + /* Finalizing */ BT_LIB_LOGF("Message iterator is already being finalized: " "%!+i", iterator); abort(); @@ -255,7 +263,7 @@ int init_message_iterator(struct bt_message_iterator *iterator, iterator->type = type; iterator->msgs = g_ptr_array_new(); if (!iterator->msgs) { - BT_LOGE_STR("Failed to allocate a GPtrArray."); + BT_LIB_LOGE_APPEND_CAUSE("Failed to allocate a GPtrArray."); ret = -1; goto end; } @@ -302,9 +310,10 @@ bt_self_component_port_input_message_iterator_create_initial( iterator = g_new0( struct bt_self_component_port_input_message_iterator, 1); if (!iterator) { - BT_LOGE_STR("Failed to allocate one self component input port " + BT_LIB_LOGE_APPEND_CAUSE( + "Failed to allocate one self component input port " "message iterator."); - goto end; + goto error; } ret = init_message_iterator((void *) iterator, @@ -312,13 +321,14 @@ bt_self_component_port_input_message_iterator_create_initial( bt_self_component_port_input_message_iterator_destroy); if (ret) { /* init_message_iterator() logs errors */ - BT_OBJECT_PUT_REF_AND_RESET(iterator); - goto end; + goto error; } - iterator->auto_seek_msgs = g_queue_new(); - if (!iterator->auto_seek_msgs) { - BT_LOGE_STR("Failed to allocate a GQueue."); + iterator->last_ns_from_origin = INT64_MIN; + + iterator->auto_seek.msgs = g_queue_new(); + if (!iterator->auto_seek.msgs) { + BT_LIB_LOGE_APPEND_CAUSE("Failed to allocate a GQueue."); ret = -1; goto end; } @@ -396,6 +406,10 @@ bt_self_component_port_input_message_iterator_create_initial( BT_LIB_LOGI("Created initial message iterator on self component input port: " "%![up-port-]+p, %![up-comp-]+c, %![iter-]+i", upstream_port, upstream_comp, iterator); + goto end; + +error: + BT_OBJECT_PUT_REF_AND_RESET(iterator); end: return iterator; @@ -405,7 +419,7 @@ struct bt_self_component_port_input_message_iterator * bt_self_component_port_input_message_iterator_create( struct bt_self_component_port_input *self_port) { - typedef enum bt_self_message_iterator_status (*init_method_t)( + typedef enum bt_component_class_message_iterator_init_method_status (*init_method_t)( void *, void *, void *); init_method_t init_method = NULL; @@ -444,9 +458,9 @@ bt_self_component_port_input_message_iterator_create( iterator = bt_self_component_port_input_message_iterator_create_initial( upstream_comp, upstream_port); if (!iterator) { - BT_LOGW_STR("Cannot create self component input port " - "message iterator."); - goto end; + BT_LIB_LOGE_APPEND_CAUSE( + "Cannot create self component input port message iterator."); + goto error; } switch (upstream_comp_cls->type) { @@ -474,17 +488,20 @@ bt_self_component_port_input_message_iterator_create( } if (init_method) { - int iter_status; + enum bt_component_class_message_iterator_init_method_status iter_status; BT_LIB_LOGD("Calling user's initialization method: %!+i", iterator); iter_status = init_method(iterator, upstream_comp, upstream_port); BT_LOGD("User method returned: status=%s", - bt_message_iterator_status_string(iter_status)); - if (iter_status != BT_MESSAGE_ITERATOR_STATUS_OK) { - BT_LOGW_STR("Initialization method failed."); - BT_OBJECT_PUT_REF_AND_RESET(iterator); - goto end; + bt_common_func_status_string(iter_status)); + if (iter_status != BT_FUNC_STATUS_OK) { + BT_LIB_LOGW_APPEND_CAUSE( + "Component input port message iterator initialization method failed: " + "%![iter-]+i, status=%s", + iterator, + bt_common_func_status_string(iter_status)); + goto error; } } @@ -494,6 +511,10 @@ bt_self_component_port_input_message_iterator_create( BT_LIB_LOGI("Created message iterator on self component input port: " "%![up-port-]+p, %![up-comp-]+c, %![iter-]+i", upstream_port, upstream_comp, iterator); + goto end; + +error: + BT_OBJECT_PUT_REF_AND_RESET(iterator); end: return iterator; @@ -521,12 +542,293 @@ void bt_self_message_iterator_set_data( "%!+i, user-data-addr=%p", iterator, data); } -enum bt_message_iterator_status +/* + * Validate that the default clock snapshot in `msg` doesn't make us go back in + * time. + */ + +BT_ASSERT_POST_FUNC +static +bool clock_snapshots_are_monotonic_one( + struct bt_self_component_port_input_message_iterator *iterator, + const bt_message *msg) +{ + const struct bt_clock_snapshot *clock_snapshot = NULL; + bt_message_type message_type = bt_message_get_type(msg); + int64_t ns_from_origin; + enum bt_clock_snapshot_get_ns_from_origin_status clock_snapshot_status; + + /* + * The default is true: if we can't figure out the clock snapshot + * (or there is none), assume it is fine. + */ + bool result = true; + + switch (message_type) { + case BT_MESSAGE_TYPE_EVENT: + { + struct bt_message_event *event_msg = (struct bt_message_event *) msg; + clock_snapshot = event_msg->default_cs; + break; + } + case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY: + { + struct bt_message_message_iterator_inactivity *inactivity_msg = + (struct bt_message_message_iterator_inactivity *) msg; + clock_snapshot = inactivity_msg->default_cs; + break; + } + case BT_MESSAGE_TYPE_PACKET_BEGINNING: + case BT_MESSAGE_TYPE_PACKET_END: + { + struct bt_message_packet *packet_msg = (struct bt_message_packet *) msg; + clock_snapshot = packet_msg->default_cs; + break; + } + case BT_MESSAGE_TYPE_STREAM_BEGINNING: + case BT_MESSAGE_TYPE_STREAM_END: + { + struct bt_message_stream *stream_msg = (struct bt_message_stream *) msg; + if (stream_msg->default_cs_state != BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_KNOWN) { + goto end; + } + + clock_snapshot = stream_msg->default_cs; + break; + } + case BT_MESSAGE_TYPE_DISCARDED_EVENTS: + case BT_MESSAGE_TYPE_DISCARDED_PACKETS: + { + struct bt_message_discarded_items *discarded_msg = + (struct bt_message_discarded_items *) msg; + + clock_snapshot = discarded_msg->default_begin_cs; + break; + } + } + + if (!clock_snapshot) { + goto end; + } + + clock_snapshot_status = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, &ns_from_origin); + if (clock_snapshot_status != BT_FUNC_STATUS_OK) { + goto end; + } + + result = ns_from_origin >= iterator->last_ns_from_origin; + iterator->last_ns_from_origin = ns_from_origin; +end: + return result; +} + +BT_ASSERT_POST_FUNC +static +bool clock_snapshots_are_monotonic( + struct bt_self_component_port_input_message_iterator *iterator, + bt_message_array_const msgs, uint64_t msg_count) +{ + uint64_t i; + bool result; + + for (i = 0; i < msg_count; i++) { + if (!clock_snapshots_are_monotonic_one(iterator, msgs[i])) { + result = false; + goto end; + } + } + + result = true; + +end: + return result; +} + +/* + * When a new stream begins, verify that the clock class tied to this + * stream is compatible with what we've seen before. + */ + +BT_ASSERT_POST_FUNC +static +bool clock_classes_are_compatible_one(struct bt_self_component_port_input_message_iterator *iterator, + const struct bt_message *msg) +{ + enum bt_message_type message_type = bt_message_get_type(msg); + bool result; + + if (message_type == BT_MESSAGE_TYPE_STREAM_BEGINNING) { + const struct bt_message_stream *stream_msg = (struct bt_message_stream *) msg; + const struct bt_clock_class *clock_class = stream_msg->stream->class->default_clock_class; + bt_uuid clock_class_uuid = NULL; + + if (clock_class) { + clock_class_uuid = bt_clock_class_get_uuid(clock_class); + } + + switch (iterator->clock_expectation.type) { + case CLOCK_EXPECTATION_UNSET: + /* + * This is the first time we see a message with a clock + * snapshot: record the properties of that clock, against + * which we'll compare the clock properties of the following + * messages. + */ + + if (!clock_class) { + iterator->clock_expectation.type = CLOCK_EXPECTATION_NONE; + } else if (bt_clock_class_origin_is_unix_epoch(clock_class)) { + iterator->clock_expectation.type = CLOCK_EXPECTATION_ORIGIN_UNIX; + } else if (clock_class_uuid) { + iterator->clock_expectation.type = CLOCK_EXPECTATION_ORIGIN_OTHER_UUID; + bt_uuid_copy(iterator->clock_expectation.uuid, clock_class_uuid); + } else { + iterator->clock_expectation.type = CLOCK_EXPECTATION_ORIGIN_OTHER_NO_UUID; + } + break; + + case CLOCK_EXPECTATION_NONE: + if (clock_class) { + BT_ASSERT_POST_MSG("Expecting no clock class, got one: %![cc-]+K", + clock_class); + result = false; + goto end; + } + + break; + + case CLOCK_EXPECTATION_ORIGIN_UNIX: + if (!clock_class) { + BT_ASSERT_POST_MSG("Expecting a clock class, got none."); + result = false; + goto end; + } + + if (!bt_clock_class_origin_is_unix_epoch(clock_class)) { + BT_ASSERT_POST_MSG("Expecting a clock class with Unix epoch origin: %![cc-]+K", + clock_class); + result = false; + goto end; + } + break; + + case CLOCK_EXPECTATION_ORIGIN_OTHER_UUID: + if (!clock_class) { + BT_ASSERT_POST_MSG("Expecting a clock class, got none."); + result = false; + goto end; + } + + if (bt_clock_class_origin_is_unix_epoch(clock_class)) { + BT_ASSERT_POST_MSG("Expecting a clock class without Unix epoch origin: %![cc-]+K", + clock_class); + result = false; + goto end; + } + + if (!clock_class_uuid) { + BT_ASSERT_POST_MSG("Expecting a clock class with UUID: %![cc-]+K", + clock_class); + result = false; + goto end; + } + + if (bt_uuid_compare(iterator->clock_expectation.uuid, clock_class_uuid)) { + BT_ASSERT_POST_MSG("Expecting a clock class with UUID, got one " + "with a different UUID: %![cc-]+K, expected-uuid=%!u", + clock_class, iterator->clock_expectation.uuid); + result = false; + goto end; + } + break; + + case CLOCK_EXPECTATION_ORIGIN_OTHER_NO_UUID: + if (!clock_class) { + BT_ASSERT_POST_MSG("Expecting a clock class, got none."); + result = false; + goto end; + } + + if (bt_clock_class_origin_is_unix_epoch(clock_class)) { + BT_ASSERT_POST_MSG("Expecting a clock class without Unix epoch origin: %![cc-]+K", + clock_class); + result = false; + goto end; + } + + if (clock_class_uuid) { + BT_ASSERT_POST_MSG("Expecting a clock class without UUID: %![cc-]+K", + clock_class); + result = false; + goto end; + } + break; + } + } + + result = true; + +end: + return result; +} + +BT_ASSERT_POST_FUNC +static +bool clock_classes_are_compatible( + struct bt_self_component_port_input_message_iterator *iterator, + bt_message_array_const msgs, uint64_t msg_count) +{ + uint64_t i; + bool result; + + for (i = 0; i < msg_count; i++) { + if (!clock_classes_are_compatible_one(iterator, msgs[i])) { + result = false; + goto end; + } + } + + result = true; + +end: + return result; +} + +/* + * Call the `next` method of the iterator. Do some validation on the returned + * messages. + */ + +static +enum bt_component_class_message_iterator_next_method_status +call_iterator_next_method( + struct bt_self_component_port_input_message_iterator *iterator, + bt_message_array_const msgs, uint64_t capacity, uint64_t *user_count) +{ + enum bt_component_class_message_iterator_next_method_status status; + + BT_ASSERT(iterator->methods.next); + BT_LOGD_STR("Calling user's \"next\" method."); + status = iterator->methods.next(iterator, msgs, capacity, user_count); + BT_LOGD("User method returned: status=%s, msg-count=%" PRIu64, + bt_common_func_status_string(status), *user_count); + + if (status == BT_FUNC_STATUS_OK) { + BT_ASSERT_POST(clock_classes_are_compatible(iterator, msgs, *user_count), + "Clocks are not compatible"); + BT_ASSERT_POST(clock_snapshots_are_monotonic(iterator, msgs, *user_count), + "Clock snapshots are not monotonic"); + } + + return status; +} + +enum bt_message_iterator_next_status bt_self_component_port_input_message_iterator_next( struct bt_self_component_port_input_message_iterator *iterator, bt_message_array_const *msgs, uint64_t *user_count) { - int status = BT_MESSAGE_ITERATOR_STATUS_OK; + enum bt_message_iterator_next_status status = BT_FUNC_STATUS_OK; BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); BT_ASSERT_PRE_NON_NULL(msgs, "Message array (output)"); @@ -550,16 +852,17 @@ bt_self_component_port_input_message_iterator_next( * Call the user's "next" method to get the next messages * and status. */ - BT_ASSERT(iterator->methods.next); - BT_LOGD_STR("Calling user's \"next\" method."); *user_count = 0; - status = iterator->methods.next(iterator, + status = (int) call_iterator_next_method(iterator, (void *) iterator->base.msgs->pdata, MSG_BATCH_SIZE, user_count); BT_LOGD("User method returned: status=%s, msg-count=%" PRIu64, - bt_message_iterator_status_string(status), *user_count); + bt_common_func_status_string(status), *user_count); if (status < 0) { - BT_LOGW_STR("User method failed."); + BT_LIB_LOGW_APPEND_CAUSE( + "Component input port message iterator's \"next\" method failed: " + "%![iter-]+i, status=%s", + iterator, bt_common_func_status_string(status)); goto end; } @@ -576,16 +879,16 @@ bt_self_component_port_input_message_iterator_next( BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE); switch (status) { - case BT_MESSAGE_ITERATOR_STATUS_OK: - BT_ASSERT_PRE(*user_count <= MSG_BATCH_SIZE, + case BT_FUNC_STATUS_OK: + BT_ASSERT_POST(*user_count <= MSG_BATCH_SIZE, "Invalid returned message count: greater than " "batch size: count=%" PRIu64 ", batch-size=%u", *user_count, MSG_BATCH_SIZE); *msgs = (void *) iterator->base.msgs->pdata; break; - case BT_MESSAGE_ITERATOR_STATUS_AGAIN: + case BT_FUNC_STATUS_AGAIN: goto end; - case BT_MESSAGE_ITERATOR_STATUS_END: + case BT_FUNC_STATUS_END: set_self_comp_port_input_msg_iterator_state(iterator, BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ENDED); goto end; @@ -598,13 +901,13 @@ end: return status; } -enum bt_message_iterator_status bt_port_output_message_iterator_next( +enum bt_message_iterator_next_status bt_port_output_message_iterator_next( struct bt_port_output_message_iterator *iterator, bt_message_array_const *msgs_to_user, uint64_t *count_to_user) { - enum bt_message_iterator_status status; - enum bt_graph_status graph_status; + enum bt_message_iterator_next_status status; + int graph_status; BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); BT_ASSERT_PRE_NON_NULL(msgs_to_user, "Message array (output)"); @@ -614,14 +917,14 @@ enum bt_message_iterator_status bt_port_output_message_iterator_next( graph_status = bt_graph_consume_sink_no_check(iterator->graph, iterator->colander); switch (graph_status) { - case BT_GRAPH_STATUS_CANCELED: - case BT_GRAPH_STATUS_AGAIN: - case BT_GRAPH_STATUS_END: - case BT_GRAPH_STATUS_NOMEM: + case BT_FUNC_STATUS_CANCELED: + case BT_FUNC_STATUS_AGAIN: + case BT_FUNC_STATUS_END: + case BT_FUNC_STATUS_MEMORY_ERROR: status = (int) graph_status; break; - case BT_GRAPH_STATUS_OK: - status = BT_MESSAGE_ITERATOR_STATUS_OK; + case BT_FUNC_STATUS_OK: + status = BT_FUNC_STATUS_OK; /* * On success, the colander sink moves the messages @@ -633,7 +936,7 @@ enum bt_message_iterator_status bt_port_output_message_iterator_next( break; default: /* Other errors */ - status = BT_MESSAGE_ITERATOR_STATUS_ERROR; + status = BT_FUNC_STATUS_ERROR; } return status; @@ -697,7 +1000,7 @@ bt_port_output_message_iterator_create(struct bt_graph *graph, struct bt_component_class_sink *colander_comp_cls = NULL; struct bt_component *output_port_comp = NULL; struct bt_component_sink *colander_comp; - enum bt_graph_status graph_status; + int graph_status; struct bt_port_input *colander_in_port = NULL; struct bt_component_class_sink_colander_data colander_data; int ret; @@ -720,7 +1023,8 @@ bt_port_output_message_iterator_create(struct bt_graph *graph, "%![port-]+p, %![comp-]+c", output_port, output_port_comp); iterator = g_new0(struct bt_port_output_message_iterator, 1); if (!iterator) { - BT_LOGE_STR("Failed to allocate one output port message iterator."); + BT_LIB_LOGE_APPEND_CAUSE( + "Failed to allocate one output port message iterator."); goto error; } @@ -736,7 +1040,9 @@ bt_port_output_message_iterator_create(struct bt_graph *graph, /* Create colander component */ colander_comp_cls = bt_component_class_sink_colander_get(); if (!colander_comp_cls) { - BT_LOGW("Cannot get colander sink component class."); + /* bt_component_class_sink_colander_get() logs errors */ + BT_LIB_LOGE_APPEND_CAUSE( + "Cannot get colander sink component class."); goto error; } @@ -752,16 +1058,16 @@ bt_port_output_message_iterator_create(struct bt_graph *graph, * class module does not use this level anyway since it belongs * to the library. */ - graph_status = - bt_graph_add_sink_component_with_init_method_data( + graph_status = bt_graph_add_sink_component_with_init_method_data( (void *) graph, colander_comp_cls, "colander-36ac3409-b1a8-4d60-ab1f-4fdf341a8fb1", NULL, &colander_data, BT_LOGGING_LEVEL_NONE, (void *) &iterator->colander); - if (graph_status != BT_GRAPH_STATUS_OK) { - BT_LIB_LOGW("Cannot add colander sink component to graph: " - "%1[graph-]+g, status=%s", graph, - bt_graph_status_string(graph_status)); + if (graph_status != BT_FUNC_STATUS_OK) { + BT_LIB_LOGE_APPEND_CAUSE( + "Cannot add colander sink component to graph: " + "%![graph-]+g, status=%s", graph, + bt_common_func_status_string(graph_status)); goto error; } @@ -775,11 +1081,12 @@ bt_port_output_message_iterator_create(struct bt_graph *graph, BT_ASSERT(colander_in_port); graph_status = bt_graph_connect_ports(graph, output_port, colander_in_port, NULL); - if (graph_status != BT_GRAPH_STATUS_OK) { - BT_LIB_LOGW("Cannot add colander sink component to graph: " + if (graph_status != BT_FUNC_STATUS_OK) { + BT_LIB_LOGW_APPEND_CAUSE( + "Cannot connect colander sink's port: " "%![graph-]+g, %![comp-]+c, status=%s", graph, iterator->colander, - bt_graph_status_string(graph_status)); + bt_common_func_status_string(graph_status)); goto error; } @@ -795,10 +1102,13 @@ bt_port_output_message_iterator_create(struct bt_graph *graph, /* Also set the graph as being configured. */ graph_status = bt_graph_configure(graph); - if (graph_status != BT_GRAPH_STATUS_OK) { - BT_LIB_LOGW("Cannot configure graph after having added colander: " - "%![graph-]+g, status=%s", graph, - bt_graph_status_string(graph_status)); + if (graph_status != BT_FUNC_STATUS_OK) { + BT_LIB_LOGW_APPEND_CAUSE( + "Cannot configure graph after having " + "added and connected colander sink: " + "%![graph-]+g, %![comp-]+c, status=%s", graph, + iterator->colander, + bt_common_func_status_string(graph_status)); goto error; } goto end; @@ -889,23 +1199,23 @@ bt_bool bt_self_component_port_input_message_iterator_can_seek_beginning( static inline void set_iterator_state_after_seeking( struct bt_self_component_port_input_message_iterator *iterator, - enum bt_message_iterator_status status) + int status) { enum bt_self_component_port_input_message_iterator_state new_state = 0; /* Set iterator's state depending on seeking status */ switch (status) { - case BT_MESSAGE_ITERATOR_STATUS_OK: + case BT_FUNC_STATUS_OK: new_state = BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE; break; - case BT_MESSAGE_ITERATOR_STATUS_AGAIN: + case BT_FUNC_STATUS_AGAIN: new_state = BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_LAST_SEEKING_RETURNED_AGAIN; break; - case BT_MESSAGE_ITERATOR_STATUS_ERROR: - case BT_MESSAGE_ITERATOR_STATUS_NOMEM: + case BT_FUNC_STATUS_ERROR: + case BT_FUNC_STATUS_MEMORY_ERROR: new_state = BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_LAST_SEEKING_RETURNED_ERROR; break; - case BT_MESSAGE_ITERATOR_STATUS_END: + case BT_FUNC_STATUS_END: new_state = BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ENDED; break; default: @@ -915,7 +1225,15 @@ void set_iterator_state_after_seeking( set_self_comp_port_input_msg_iterator_state(iterator, new_state); } -enum bt_message_iterator_status +static +void reset_iterator_expectations( + struct bt_self_component_port_input_message_iterator *iterator) +{ + iterator->last_ns_from_origin = INT64_MIN; + iterator->clock_expectation.type = CLOCK_EXPECTATION_UNSET; +} + +enum bt_message_iterator_seek_beginning_status bt_self_component_port_input_message_iterator_seek_beginning( struct bt_self_component_port_input_message_iterator *iterator) { @@ -932,29 +1250,115 @@ bt_self_component_port_input_message_iterator_seek_beginning( bt_self_component_port_input_message_iterator_can_seek_beginning( iterator), "Message iterator cannot seek beginning: %!+i", iterator); + + /* + * We are seeking, reset our expectations about how the following + * messages should look like. + */ + reset_iterator_expectations(iterator); + BT_LIB_LOGD("Calling user's \"seek beginning\" method: %!+i", iterator); set_self_comp_port_input_msg_iterator_state(iterator, BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_SEEKING); status = iterator->methods.seek_beginning(iterator); BT_LOGD("User method returned: status=%s", - bt_message_iterator_status_string(status)); - BT_ASSERT_PRE(status == BT_MESSAGE_ITERATOR_STATUS_OK || - status == BT_MESSAGE_ITERATOR_STATUS_ERROR || - status == BT_MESSAGE_ITERATOR_STATUS_NOMEM || - status == BT_MESSAGE_ITERATOR_STATUS_AGAIN, + bt_common_func_status_string(status)); + BT_ASSERT_POST(status == BT_FUNC_STATUS_OK || + status == BT_FUNC_STATUS_ERROR || + status == BT_FUNC_STATUS_MEMORY_ERROR || + status == BT_FUNC_STATUS_AGAIN, "Unexpected status: %![iter-]+i, status=%s", - iterator, bt_common_self_message_iterator_status_string(status)); + iterator, bt_common_func_status_string(status)); + if (status < 0) { + BT_LIB_LOGW_APPEND_CAUSE( + "Component input port message iterator's \"seek beginning\" method failed: " + "%![iter-]+i, status=%s", + iterator, bt_common_func_status_string(status)); + } + set_iterator_state_after_seeking(iterator, status); return status; } +/* + * Structure used to record the state of a given stream during the fast-forward + * phase of an auto-seek. + */ +struct auto_seek_stream_state { + /* + * Value representing which step of this timeline we are at. + * + * time ---> + * [SB] 1 [PB] 2 [PE] 1 [SE] + * + * At each point in the timeline, the messages we need to replicate are: + * + * 1: Stream beginning + * 2: Stream beginning, packet beginning + * + * Before "Stream beginning" and after "Stream end", we don't need to + * replicate anything as the stream doesn't exist. + */ + enum { + AUTO_SEEK_STREAM_STATE_STREAM_BEGAN, + AUTO_SEEK_STREAM_STATE_PACKET_BEGAN, + } state; + + /* + * If `state` is AUTO_SEEK_STREAM_STATE_PACKET_BEGAN, the packet we are + * in. This is a weak reference, since the packet will always be + * alive by the time we use it. + */ + struct bt_packet *packet; + + /* Have we see a message with a clock snapshot yet? */ + bool seen_clock_snapshot; +}; + +static +struct auto_seek_stream_state *create_auto_seek_stream_state(void) +{ + return g_new0(struct auto_seek_stream_state, 1); +} + +static +void destroy_auto_seek_stream_state(void *ptr) +{ + g_free(ptr); +} + +static +GHashTable *create_auto_seek_stream_states(void) +{ + return g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, + destroy_auto_seek_stream_state); +} + +static +void destroy_auto_seek_stream_states(GHashTable *stream_states) +{ + g_hash_table_destroy(stream_states); +} + +/* + * Handle one message while we are in the fast-forward phase of an auto-seek. + * + * Sets `*got_first` to true if the message's timestamp is greater or equal to + * `ns_from_origin`. In other words, if this is the first message after our + * seek point. + * + * `stream_states` is an hash table of `bt_stream *` (weak reference) to + * `struct auto_seek_stream_state` used to keep the state of each stream + * during the fast-forward. + */ + static inline -enum bt_message_iterator_status auto_seek_handle_message( +int auto_seek_handle_message( struct bt_self_component_port_input_message_iterator *iterator, int64_t ns_from_origin, const struct bt_message *msg, - bool *got_first) + bool *got_first, GHashTable *stream_states) { - enum bt_message_iterator_status status = BT_MESSAGE_ITERATOR_STATUS_OK; + int status = BT_FUNC_STATUS_OK; int64_t msg_ns_from_origin; const struct bt_clock_snapshot *clk_snapshot = NULL; int ret; @@ -969,7 +1373,7 @@ enum bt_message_iterator_status auto_seek_handle_message( (const void *) msg; clk_snapshot = event_msg->default_cs; - BT_ASSERT_PRE(clk_snapshot, + BT_ASSERT_POST(clk_snapshot, "Event message has no default clock snapshot: %!+n", event_msg); break; @@ -990,7 +1394,7 @@ enum bt_message_iterator_status auto_seek_handle_message( (const void *) msg; clk_snapshot = packet_msg->default_cs; - BT_ASSERT_PRE(clk_snapshot, + BT_ASSERT_POST(clk_snapshot, "Packet message has no default clock snapshot: %!+n", packet_msg); break; @@ -1001,7 +1405,7 @@ enum bt_message_iterator_status auto_seek_handle_message( struct bt_message_discarded_items *msg_disc_items = (void *) msg; - BT_ASSERT_PRE(msg_disc_items->default_begin_cs && + BT_ASSERT_POST(msg_disc_items->default_begin_cs && msg_disc_items->default_end_cs, "Discarded events/packets message has no default clock snapshots: %!+n", msg_disc_items); @@ -1009,7 +1413,7 @@ enum bt_message_iterator_status auto_seek_handle_message( msg_disc_items->default_begin_cs, &msg_ns_from_origin); if (ret) { - status = BT_MESSAGE_ITERATOR_STATUS_ERROR; + status = BT_FUNC_STATUS_ERROR; goto end; } @@ -1022,7 +1426,7 @@ enum bt_message_iterator_status auto_seek_handle_message( msg_disc_items->default_end_cs, &msg_ns_from_origin); if (ret) { - status = BT_MESSAGE_ITERATOR_STATUS_ERROR; + status = BT_FUNC_STATUS_ERROR; goto end; } @@ -1042,7 +1446,7 @@ enum bt_message_iterator_status auto_seek_handle_message( msg_disc_items->default_end_cs->clock_class, ns_from_origin, &new_begin_raw_value); if (ret) { - status = BT_MESSAGE_ITERATOR_STATUS_ERROR; + status = BT_FUNC_STATUS_ERROR; goto end; } @@ -1061,63 +1465,20 @@ enum bt_message_iterator_status auto_seek_handle_message( goto skip_msg; } } - case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING: - { - const struct bt_message_stream_activity *stream_act_msg = - (const void *) msg; - - switch (stream_act_msg->default_cs_state) { - case BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_UNKNOWN: - case BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE: - /* - * -inf is always less than any requested time, - * and we can't assume any specific time for an - * unknown clock snapshot, so skip this. - */ - goto skip_msg; - case BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN: - clk_snapshot = stream_act_msg->default_cs; - BT_ASSERT(clk_snapshot); - break; - default: - abort(); - } - - break; - } - case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END: + case BT_MESSAGE_TYPE_STREAM_BEGINNING: + case BT_MESSAGE_TYPE_STREAM_END: { - const struct bt_message_stream_activity *stream_act_msg = - (const void *) msg; + struct bt_message_stream *stream_msg = + (struct bt_message_stream *) msg; - switch (stream_act_msg->default_cs_state) { - case BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_UNKNOWN: - /* - * We can't assume any specific time for an - * unknown clock snapshot, so skip this. - */ + if (stream_msg->default_cs_state != BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_KNOWN) { + /* Ignore */ goto skip_msg; - case BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE: - /* - * +inf is always greater than any requested - * time. - */ - *got_first = true; - goto push_msg; - case BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN: - clk_snapshot = stream_act_msg->default_cs; - BT_ASSERT(clk_snapshot); - break; - default: - abort(); } + clk_snapshot = stream_msg->default_cs; break; } - case BT_MESSAGE_TYPE_STREAM_BEGINNING: - case BT_MESSAGE_TYPE_STREAM_END: - /* Ignore */ - goto skip_msg; default: abort(); } @@ -1126,7 +1487,7 @@ enum bt_message_iterator_status auto_seek_handle_message( ret = bt_clock_snapshot_get_ns_from_origin(clk_snapshot, &msg_ns_from_origin); if (ret) { - status = BT_MESSAGE_ITERATOR_STATUS_ERROR; + status = BT_FUNC_STATUS_ERROR; goto end; } @@ -1136,23 +1497,138 @@ enum bt_message_iterator_status auto_seek_handle_message( } skip_msg: + /* This message won't be sent downstream. */ + switch (msg->type) { + case BT_MESSAGE_TYPE_STREAM_BEGINNING: + { + const struct bt_message_stream *stream_msg = (const void *) msg; + struct auto_seek_stream_state *stream_state; + + /* Update stream's state: stream began. */ + stream_state = create_auto_seek_stream_state(); + if (!stream_state) { + status = BT_FUNC_STATUS_MEMORY_ERROR; + goto end; + } + + stream_state->state = AUTO_SEEK_STREAM_STATE_STREAM_BEGAN; + + if (stream_msg->default_cs_state == BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_KNOWN) { + stream_state->seen_clock_snapshot = true; + } + + BT_ASSERT(!bt_g_hash_table_contains(stream_states, stream_msg->stream)); + g_hash_table_insert(stream_states, stream_msg->stream, stream_state); + break; + } + case BT_MESSAGE_TYPE_PACKET_BEGINNING: + { + const struct bt_message_packet *packet_msg = + (const void *) msg; + struct auto_seek_stream_state *stream_state; + + /* Update stream's state: packet began. */ + stream_state = g_hash_table_lookup(stream_states, packet_msg->packet->stream); + BT_ASSERT(stream_state); + + BT_ASSERT(stream_state->state == AUTO_SEEK_STREAM_STATE_STREAM_BEGAN); + stream_state->state = AUTO_SEEK_STREAM_STATE_PACKET_BEGAN; + BT_ASSERT(!stream_state->packet); + stream_state->packet = packet_msg->packet; + + if (packet_msg->packet->stream->class->packets_have_beginning_default_clock_snapshot) { + stream_state->seen_clock_snapshot = true; + } + + break; + } + case BT_MESSAGE_TYPE_EVENT: + { + const struct bt_message_event *event_msg = (const void *) msg; + struct auto_seek_stream_state *stream_state; + + stream_state = g_hash_table_lookup(stream_states, + event_msg->event->packet->stream); + BT_ASSERT(stream_state); + + // HELPME: are we sure that event messages have clock snapshots at this point? + stream_state->seen_clock_snapshot = true; + + break; + } + case BT_MESSAGE_TYPE_PACKET_END: + { + const struct bt_message_packet *packet_msg = + (const void *) msg; + struct auto_seek_stream_state *stream_state; + + /* Update stream's state: packet ended. */ + stream_state = g_hash_table_lookup(stream_states, packet_msg->packet->stream); + BT_ASSERT(stream_state); + + BT_ASSERT(stream_state->state == AUTO_SEEK_STREAM_STATE_PACKET_BEGAN); + stream_state->state = AUTO_SEEK_STREAM_STATE_STREAM_BEGAN; + BT_ASSERT(stream_state->packet); + stream_state->packet = NULL; + + if (packet_msg->packet->stream->class->packets_have_end_default_clock_snapshot) { + stream_state->seen_clock_snapshot = true; + } + + break; + } + case BT_MESSAGE_TYPE_STREAM_END: + { + const struct bt_message_stream *stream_msg = (const void *) msg; + struct auto_seek_stream_state *stream_state; + + stream_state = g_hash_table_lookup(stream_states, stream_msg->stream); + BT_ASSERT(stream_state); + BT_ASSERT(stream_state->state == AUTO_SEEK_STREAM_STATE_STREAM_BEGAN); + BT_ASSERT(!stream_state->packet); + + /* Update stream's state: this stream doesn't exist anymore. */ + g_hash_table_remove(stream_states, stream_msg->stream); + break; + } + case BT_MESSAGE_TYPE_DISCARDED_EVENTS: + case BT_MESSAGE_TYPE_DISCARDED_PACKETS: + { + const struct bt_message_discarded_items *discarded_msg = + (const void *) msg; + struct auto_seek_stream_state *stream_state; + + stream_state = g_hash_table_lookup(stream_states, discarded_msg->stream); + BT_ASSERT(stream_state); + + if ((msg->type == BT_MESSAGE_TYPE_DISCARDED_EVENTS && discarded_msg->stream->class->discarded_events_have_default_clock_snapshots) || + (msg->type == BT_MESSAGE_TYPE_DISCARDED_PACKETS && discarded_msg->stream->class->discarded_packets_have_default_clock_snapshots)) { + stream_state->seen_clock_snapshot = true; + } + + break; + } + default: + break; + } + bt_object_put_no_null_check(msg); msg = NULL; goto end; push_msg: - g_queue_push_tail(iterator->auto_seek_msgs, (void *) msg); + g_queue_push_tail(iterator->auto_seek.msgs, (void *) msg); msg = NULL; end: - BT_ASSERT(!msg || status != BT_MESSAGE_ITERATOR_STATUS_OK); + BT_ASSERT(!msg || status != BT_FUNC_STATUS_OK); return status; } static -enum bt_message_iterator_status find_message_ge_ns_from_origin( +int find_message_ge_ns_from_origin( struct bt_self_component_port_input_message_iterator *iterator, - int64_t ns_from_origin) + int64_t ns_from_origin, GHashTable *stream_states) { int status; enum bt_self_component_port_input_message_iterator_state init_state = @@ -1179,11 +1655,16 @@ enum bt_message_iterator_status find_message_ge_ns_from_origin( * Call the user's "next" method to get the next * messages and status. */ - BT_LOGD_STR("Calling user's \"next\" method."); - status = iterator->methods.next(iterator, + status = call_iterator_next_method(iterator, &messages[0], MSG_BATCH_SIZE, &user_count); BT_LOGD("User method returned: status=%s", - bt_message_iterator_status_string(status)); + bt_common_func_status_string(status)); + if (status < 0) { + BT_LIB_LOGW_APPEND_CAUSE( + "Component input port message iterator's \"next\" method failed: " + "%![iter-]+i, status=%s", + iterator, bt_common_func_status_string(status)); + } /* * The user's "next" method must not do any action which @@ -1193,16 +1674,16 @@ enum bt_message_iterator_status find_message_ge_ns_from_origin( BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE); switch (status) { - case BT_MESSAGE_ITERATOR_STATUS_OK: - BT_ASSERT_PRE(user_count <= MSG_BATCH_SIZE, + case BT_FUNC_STATUS_OK: + BT_ASSERT_POST(user_count <= MSG_BATCH_SIZE, "Invalid returned message count: greater than " "batch size: count=%" PRIu64 ", batch-size=%u", user_count, MSG_BATCH_SIZE); break; - case BT_MESSAGE_ITERATOR_STATUS_AGAIN: - case BT_MESSAGE_ITERATOR_STATUS_ERROR: - case BT_MESSAGE_ITERATOR_STATUS_NOMEM: - case BT_MESSAGE_ITERATOR_STATUS_END: + case BT_FUNC_STATUS_AGAIN: + case BT_FUNC_STATUS_ERROR: + case BT_FUNC_STATUS_MEMORY_ERROR: + case BT_FUNC_STATUS_END: goto end; default: abort(); @@ -1210,15 +1691,16 @@ enum bt_message_iterator_status find_message_ge_ns_from_origin( for (i = 0; i < user_count; i++) { if (got_first) { - g_queue_push_tail(iterator->auto_seek_msgs, + g_queue_push_tail(iterator->auto_seek.msgs, (void *) messages[i]); messages[i] = NULL; continue; } status = auto_seek_handle_message(iterator, - ns_from_origin, messages[i], &got_first); - if (status == BT_MESSAGE_ITERATOR_STATUS_OK) { + ns_from_origin, messages[i], &got_first, + stream_states); + if (status == BT_FUNC_STATUS_OK) { /* Message was either pushed or moved */ messages[i] = NULL; } else { @@ -1238,64 +1720,65 @@ end: return status; } +/* + * This function is installed as the iterator's next callback after we have + * auto-seeked (seeked to the beginning and fast-forwarded) to send the + * messages saved in iterator->auto_seek.msgs. Once this is done, the original + * next callback is put back. + */ + static -enum bt_self_message_iterator_status post_auto_seek_next( +enum bt_component_class_message_iterator_next_method_status post_auto_seek_next( struct bt_self_component_port_input_message_iterator *iterator, bt_message_array_const msgs, uint64_t capacity, uint64_t *count) { - BT_ASSERT(!g_queue_is_empty(iterator->auto_seek_msgs)); + BT_ASSERT(!g_queue_is_empty(iterator->auto_seek.msgs)); *count = 0; /* * Move auto-seek messages to the output array (which is this * iterator's base message array). */ - while (capacity > 0 && !g_queue_is_empty(iterator->auto_seek_msgs)) { - msgs[*count] = g_queue_pop_head(iterator->auto_seek_msgs); + while (capacity > 0 && !g_queue_is_empty(iterator->auto_seek.msgs)) { + msgs[*count] = g_queue_pop_head(iterator->auto_seek.msgs); capacity--; (*count)++; } BT_ASSERT(*count > 0); - if (g_queue_is_empty(iterator->auto_seek_msgs)) { - /* No more auto-seek messages */ - switch (iterator->upstream_component->class->type) { - case BT_COMPONENT_CLASS_TYPE_SOURCE: - { - struct bt_component_class_source *src_comp_cls = - (void *) iterator->upstream_component->class; + if (g_queue_is_empty(iterator->auto_seek.msgs)) { + /* No more auto-seek messages, restore user's next callback. */ + BT_ASSERT(iterator->auto_seek.original_next_callback); + iterator->methods.next = iterator->auto_seek.original_next_callback; + iterator->auto_seek.original_next_callback = NULL; + } - iterator->methods.next = - (bt_self_component_port_input_message_iterator_next_method) - src_comp_cls->methods.msg_iter_next; - break; - } - case BT_COMPONENT_CLASS_TYPE_FILTER: - { - struct bt_component_class_filter *flt_comp_cls = - (void *) iterator->upstream_component->class; + return BT_FUNC_STATUS_OK; +} - iterator->methods.next = - (bt_self_component_port_input_message_iterator_next_method) - flt_comp_cls->methods.msg_iter_next; - break; - } - default: - abort(); - } - } +static inline +int clock_raw_value_from_ns_from_origin(const bt_clock_class *clock_class, + int64_t ns_from_origin, uint64_t *raw_value) +{ - return BT_SELF_MESSAGE_ITERATOR_STATUS_OK; + int64_t cc_offset_s = clock_class->offset_seconds; + uint64_t cc_offset_cycles = clock_class->offset_cycles; + uint64_t cc_freq = clock_class->frequency; + + return bt_common_clock_value_from_ns_from_origin(cc_offset_s, + cc_offset_cycles, cc_freq, ns_from_origin, raw_value); } -enum bt_message_iterator_status + +enum bt_message_iterator_seek_ns_from_origin_status bt_self_component_port_input_message_iterator_seek_ns_from_origin( struct bt_self_component_port_input_message_iterator *iterator, int64_t ns_from_origin) { int status; + GHashTable *stream_states = NULL; BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); BT_ASSERT_PRE_ITER_HAS_STATE_TO_SEEK(iterator); @@ -1312,42 +1795,63 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin( set_self_comp_port_input_msg_iterator_state(iterator, BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_SEEKING); + /* + * We are seeking, reset our expectations about how the following + * messages should look like. + */ + reset_iterator_expectations(iterator); + if (iterator->methods.seek_ns_from_origin) { + /* The iterator knows how to seek to a particular time, let it handle this. */ BT_LIB_LOGD("Calling user's \"seek nanoseconds from origin\" method: " "%![iter-]+i, ns=%" PRId64, iterator, ns_from_origin); status = iterator->methods.seek_ns_from_origin(iterator, ns_from_origin); BT_LOGD("User method returned: status=%s", - bt_message_iterator_status_string(status)); - BT_ASSERT_PRE(status == BT_MESSAGE_ITERATOR_STATUS_OK || - status == BT_MESSAGE_ITERATOR_STATUS_ERROR || - status == BT_MESSAGE_ITERATOR_STATUS_NOMEM || - status == BT_MESSAGE_ITERATOR_STATUS_AGAIN, + bt_common_func_status_string(status)); + BT_ASSERT_POST(status == BT_FUNC_STATUS_OK || + status == BT_FUNC_STATUS_ERROR || + status == BT_FUNC_STATUS_MEMORY_ERROR || + status == BT_FUNC_STATUS_AGAIN, "Unexpected status: %![iter-]+i, status=%s", - iterator, - bt_common_self_message_iterator_status_string(status)); + iterator, bt_common_func_status_string(status)); + if (status < 0) { + BT_LIB_LOGW_APPEND_CAUSE( + "Component input port message iterator's \"seek nanoseconds from origin\" method failed: " + "%![iter-]+i, status=%s", + iterator, bt_common_func_status_string(status)); + } } else { - /* Start automatic seeking: seek beginning first */ + /* + * The iterator doesn't know how to seek to a particular time. We will + * seek to the beginning and fast forward to the right place. + */ BT_ASSERT(iterator->methods.can_seek_beginning(iterator)); BT_ASSERT(iterator->methods.seek_beginning); BT_LIB_LOGD("Calling user's \"seek beginning\" method: %!+i", iterator); status = iterator->methods.seek_beginning(iterator); BT_LOGD("User method returned: status=%s", - bt_message_iterator_status_string(status)); - BT_ASSERT_PRE(status == BT_MESSAGE_ITERATOR_STATUS_OK || - status == BT_MESSAGE_ITERATOR_STATUS_ERROR || - status == BT_MESSAGE_ITERATOR_STATUS_NOMEM || - status == BT_MESSAGE_ITERATOR_STATUS_AGAIN, + bt_common_func_status_string(status)); + BT_ASSERT_POST(status == BT_FUNC_STATUS_OK || + status == BT_FUNC_STATUS_ERROR || + status == BT_FUNC_STATUS_MEMORY_ERROR || + status == BT_FUNC_STATUS_AGAIN, "Unexpected status: %![iter-]+i, status=%s", - iterator, - bt_common_self_message_iterator_status_string(status)); + iterator, bt_common_func_status_string(status)); + if (status < 0) { + BT_LIB_LOGW_APPEND_CAUSE( + "Component input port message iterator's \"seek beginning\" method failed: " + "%![iter-]+i, status=%s", + iterator, bt_common_func_status_string(status)); + } + switch (status) { - case BT_MESSAGE_ITERATOR_STATUS_OK: + case BT_FUNC_STATUS_OK: break; - case BT_MESSAGE_ITERATOR_STATUS_ERROR: - case BT_MESSAGE_ITERATOR_STATUS_NOMEM: - case BT_MESSAGE_ITERATOR_STATUS_AGAIN: + case BT_FUNC_STATUS_ERROR: + case BT_FUNC_STATUS_MEMORY_ERROR: + case BT_FUNC_STATUS_AGAIN: goto end; default: abort(); @@ -1360,47 +1864,156 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin( * this point in the batch to this iterator's auto-seek * message queue. */ - while (!g_queue_is_empty(iterator->auto_seek_msgs)) { + while (!g_queue_is_empty(iterator->auto_seek.msgs)) { bt_object_put_no_null_check( - g_queue_pop_tail(iterator->auto_seek_msgs)); + g_queue_pop_tail(iterator->auto_seek.msgs)); + } + + stream_states = create_auto_seek_stream_states(); + if (!stream_states) { + BT_LIB_LOGE_APPEND_CAUSE( + "Failed to allocate one GHashTable."); + status = BT_FUNC_STATUS_MEMORY_ERROR; + goto end; } status = find_message_ge_ns_from_origin(iterator, - ns_from_origin); + ns_from_origin, stream_states); switch (status) { - case BT_MESSAGE_ITERATOR_STATUS_OK: - case BT_MESSAGE_ITERATOR_STATUS_END: + case BT_FUNC_STATUS_OK: + case BT_FUNC_STATUS_END: + { + GHashTableIter iter; + gpointer key, value; + + /* + * If some streams exist at the seek time, prepend the + * required messages to put those streams in the right + * state. + */ + g_hash_table_iter_init(&iter, stream_states); + while (g_hash_table_iter_next (&iter, &key, &value)) { + const bt_stream *stream = key; + struct auto_seek_stream_state *stream_state = + (struct auto_seek_stream_state *) value; + bt_message *msg; + const bt_clock_class *clock_class = bt_stream_class_borrow_default_clock_class_const( + bt_stream_borrow_class_const(stream)); + /* Initialize to silence maybe-uninitialized warning. */ + uint64_t raw_value = 0; + + /* + * If we haven't seen a message with a clock snapshot, we don't know if our seek time is within + * the clock's range, so it wouldn't be safe to try to convert ns_from_origin to a clock value. + * + * Also, it would be a bit of a lie to generate a stream begin message with the seek time as its + * clock snapshot, because we don't really know if the stream existed at that time. If we have + * seen a message with a clock snapshot in our seeking, then we are sure that the + * seek time is not below the clock range, and we know the stream was active at that + * time (and that we cut it short). + */ + if (stream_state->seen_clock_snapshot) { + if (clock_raw_value_from_ns_from_origin(clock_class, ns_from_origin, &raw_value) != 0) { + BT_LIB_LOGW("Could not convert nanoseconds from origin to clock value: ns-from-origin=%" PRId64 ", %![cc-]+K", + ns_from_origin, clock_class); + status = BT_FUNC_STATUS_ERROR; + goto end; + } + } + + switch (stream_state->state) { + case AUTO_SEEK_STREAM_STATE_PACKET_BEGAN: + BT_ASSERT(stream_state->packet); + BT_LIB_LOGD("Creating packet message: %![packet-]+a", stream_state->packet); + + if (stream->class->packets_have_beginning_default_clock_snapshot) { + /* + * If we are in the PACKET_BEGAN state, it means we have seen a "packet beginning" + * message. If "packet beginning" packets have clock snapshots, then we must have + * seen a clock snapshot. + */ + BT_ASSERT(stream_state->seen_clock_snapshot); + + msg = bt_message_packet_beginning_create_with_default_clock_snapshot( + (bt_self_message_iterator *) iterator, stream_state->packet, raw_value); + } else { + msg = bt_message_packet_beginning_create((bt_self_message_iterator *) iterator, + stream_state->packet); + } + + if (!msg) { + status = BT_FUNC_STATUS_MEMORY_ERROR; + goto end; + } + + g_queue_push_head(iterator->auto_seek.msgs, msg); + msg = NULL; + /* fall-thru */ + + case AUTO_SEEK_STREAM_STATE_STREAM_BEGAN: + msg = bt_message_stream_beginning_create( + (bt_self_message_iterator *) iterator, stream); + if (!msg) { + status = BT_FUNC_STATUS_MEMORY_ERROR; + goto end; + } + + if (stream_state->seen_clock_snapshot) { + bt_message_stream_beginning_set_default_clock_snapshot(msg, raw_value); + } + + g_queue_push_head(iterator->auto_seek.msgs, msg); + msg = NULL; + break; + } + } + /* * If there are messages in the auto-seek * message queue, replace the user's "next" * method with a custom, temporary "next" method * which returns them. */ - if (!g_queue_is_empty(iterator->auto_seek_msgs)) { + if (!g_queue_is_empty(iterator->auto_seek.msgs)) { + BT_ASSERT(!iterator->auto_seek.original_next_callback); + iterator->auto_seek.original_next_callback = iterator->methods.next; + iterator->methods.next = (bt_self_component_port_input_message_iterator_next_method) post_auto_seek_next; } /* - * `BT_MESSAGE_ITERATOR_STATUS_END` becomes - * `BT_MESSAGE_ITERATOR_STATUS_OK`: the next + * `BT_FUNC_STATUS_END` becomes + * `BT_FUNC_STATUS_OK`: the next * time this iterator's "next" method is called, * it will return - * `BT_MESSAGE_ITERATOR_STATUS_END`. + * `BT_FUNC_STATUS_END`. */ - status = BT_MESSAGE_ITERATOR_STATUS_OK; + status = BT_FUNC_STATUS_OK; break; - case BT_MESSAGE_ITERATOR_STATUS_ERROR: - case BT_MESSAGE_ITERATOR_STATUS_NOMEM: - case BT_MESSAGE_ITERATOR_STATUS_AGAIN: + } + case BT_FUNC_STATUS_ERROR: + case BT_FUNC_STATUS_MEMORY_ERROR: + case BT_FUNC_STATUS_AGAIN: goto end; default: abort(); } } + /* + * The following messages returned by the next method (including + * post_auto_seek_next) must be after (or at) `ns_from_origin`. + */ + iterator->last_ns_from_origin = ns_from_origin; + end: + if (stream_states) { + destroy_auto_seek_stream_states(stream_states); + stream_states = NULL; + } + set_iterator_state_after_seeking(iterator, status); return status; } @@ -1438,7 +2051,8 @@ bt_bool bt_port_output_message_iterator_can_seek_beginning( iterator)); } -enum bt_message_iterator_status bt_port_output_message_iterator_seek_ns_from_origin( +enum bt_message_iterator_seek_ns_from_origin_status +bt_port_output_message_iterator_seek_ns_from_origin( struct bt_port_output_message_iterator *iterator, int64_t ns_from_origin) { @@ -1448,7 +2062,8 @@ enum bt_message_iterator_status bt_port_output_message_iterator_seek_ns_from_ori ns_from_origin); } -enum bt_message_iterator_status bt_port_output_message_iterator_seek_beginning( +enum bt_message_iterator_seek_beginning_status +bt_port_output_message_iterator_seek_beginning( struct bt_port_output_message_iterator *iterator) { BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");