X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Flib%2Fgraph%2Fiterator.c;h=a53a89d8f31e4c0628255a7450462f23c9b5c2c1;hb=5b7b55be46763e1786bb0be0e188c21e669f81b7;hp=aac7bfc197872f737b7e3b32af5f94a8e615a451;hpb=572075a867d130d6f997188a8c01aff6337f0bd7;p=babeltrace.git diff --git a/src/lib/graph/iterator.c b/src/lib/graph/iterator.c index aac7bfc1..a53a89d8 100644 --- a/src/lib/graph/iterator.c +++ b/src/lib/graph/iterator.c @@ -41,8 +41,14 @@ #include #include #include +#include +#include #include #include +#include +#include +#include +#include #include #include #include @@ -948,11 +954,83 @@ bt_self_component_port_input_message_iterator_seek_beginning( return status; } + +/* + * Structure used to record the state of a given stream during the fast-forward + * phase of an auto-seek. + */ +struct auto_seek_stream_state { + /* + * Value representing which step of this timeline we are at. + * + * time ---> + * [SB] 1 [SAB] 2 [PB] 3 [PE] 2 [SAE] 1 [SE] + * + * At each point in the timeline, the messages we need to replicate are: + * + * 1: Stream beginning + * 2: Stream beginning, stream activity beginning + * 3: Stream beginning, stream activity beginning, packet beginning + * + * Before "Stream beginning" and after "Stream end", we don't need to + * replicate anything as the stream doesn't exist. + */ + enum { + AUTO_SEEK_STREAM_STATE_STREAM_BEGAN, + AUTO_SEEK_STREAM_STATE_STREAM_ACTIVITY_BEGAN, + AUTO_SEEK_STREAM_STATE_PACKET_BEGAN, + } state; + + /* + * If `state` is AUTO_SEEK_STREAM_STATE_PACKET_BEGAN, the packet we are + * in. This is a weak reference, since the packet will always be + * alive by the time we use it. + */ + struct bt_packet *packet; +}; + +static +struct auto_seek_stream_state *create_auto_seek_stream_state(void) +{ + return g_new0(struct auto_seek_stream_state, 1); +} + +static +void destroy_auto_seek_stream_state(void *ptr) +{ + g_free(ptr); +} + +static +GHashTable *create_auto_seek_stream_states(void) +{ + return g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, + destroy_auto_seek_stream_state); +} + +static +void destroy_auto_seek_stream_states(GHashTable *stream_states) +{ + g_hash_table_destroy(stream_states); +} + +/* + * Handle one message while we are in the fast-forward phase of an auto-seek. + * + * Sets `*got_first` to true if the message's timestamp is greater or equal to + * `ns_from_origin`. In other words, if this is the first message after our + * seek point. + * + * `stream_states` is an hash table of `bt_stream *` (weak reference) to + * `struct auto_seek_stream_state` used to keep the state of each stream + * during the fast-forward. + */ + static inline enum bt_message_iterator_status auto_seek_handle_message( struct bt_self_component_port_input_message_iterator *iterator, int64_t ns_from_origin, const struct bt_message *msg, - bool *got_first) + bool *got_first, GHashTable *stream_states) { enum bt_message_iterator_status status = BT_MESSAGE_ITERATOR_STATUS_OK; int64_t msg_ns_from_origin; @@ -1136,6 +1214,106 @@ enum bt_message_iterator_status auto_seek_handle_message( } skip_msg: + /* This message won't be sent downstream. */ + switch (msg->type) { + case BT_MESSAGE_TYPE_STREAM_BEGINNING: + { + const struct bt_message_stream *stream_msg = (const void *) msg; + struct auto_seek_stream_state *stream_state; + gboolean did_not_exist; + + /* Update stream's state: stream began. */ + stream_state = create_auto_seek_stream_state(); + if (!stream_state) { + status = BT_MESSAGE_ITERATOR_STATUS_NOMEM; + goto end; + } + + stream_state->state = AUTO_SEEK_STREAM_STATE_STREAM_BEGAN; + did_not_exist = g_hash_table_insert(stream_states, stream_msg->stream, stream_state); + BT_ASSERT(did_not_exist); + break; + } + case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING: + { + const struct bt_message_stream_activity *stream_act_msg = + (const void *) msg; + struct auto_seek_stream_state *stream_state; + + /* Update stream's state: stream activity began. */ + stream_state = g_hash_table_lookup(stream_states, stream_act_msg->stream); + BT_ASSERT(stream_state); + + BT_ASSERT(stream_state->state == AUTO_SEEK_STREAM_STATE_STREAM_BEGAN); + stream_state->state = AUTO_SEEK_STREAM_STATE_STREAM_ACTIVITY_BEGAN; + BT_ASSERT(!stream_state->packet); + break; + } + case BT_MESSAGE_TYPE_PACKET_BEGINNING: + { + const struct bt_message_packet *packet_msg = + (const void *) msg; + struct auto_seek_stream_state *stream_state; + + /* Update stream's state: packet began. */ + stream_state = g_hash_table_lookup(stream_states, packet_msg->packet->stream); + BT_ASSERT(stream_state); + + BT_ASSERT(stream_state->state == AUTO_SEEK_STREAM_STATE_STREAM_ACTIVITY_BEGAN); + stream_state->state = AUTO_SEEK_STREAM_STATE_PACKET_BEGAN; + BT_ASSERT(!stream_state->packet); + stream_state->packet = packet_msg->packet; + break; + } + case BT_MESSAGE_TYPE_PACKET_END: + { + const struct bt_message_packet *packet_msg = + (const void *) msg; + struct auto_seek_stream_state *stream_state; + + /* Update stream's state: packet ended. */ + stream_state = g_hash_table_lookup(stream_states, packet_msg->packet->stream); + BT_ASSERT(stream_state); + + BT_ASSERT(stream_state->state == AUTO_SEEK_STREAM_STATE_PACKET_BEGAN); + stream_state->state = AUTO_SEEK_STREAM_STATE_STREAM_ACTIVITY_BEGAN; + BT_ASSERT(stream_state->packet); + stream_state->packet = NULL; + break; + } + case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END: + { + const struct bt_message_stream_activity *stream_act_msg = + (const void *) msg; + struct auto_seek_stream_state *stream_state; + + /* Update stream's state: stream activity ended. */ + stream_state = g_hash_table_lookup(stream_states, stream_act_msg->stream); + BT_ASSERT(stream_state); + + BT_ASSERT(stream_state->state == AUTO_SEEK_STREAM_STATE_STREAM_ACTIVITY_BEGAN); + stream_state->state = AUTO_SEEK_STREAM_STATE_STREAM_BEGAN; + BT_ASSERT(!stream_state->packet); + break; + } + case BT_MESSAGE_TYPE_STREAM_END: + { + const struct bt_message_stream *stream_msg = (const void *) msg; + struct auto_seek_stream_state *stream_state; + + stream_state = g_hash_table_lookup(stream_states, stream_msg->stream); + BT_ASSERT(stream_state); + BT_ASSERT(stream_state->state == AUTO_SEEK_STREAM_STATE_STREAM_BEGAN); + BT_ASSERT(!stream_state->packet); + + /* Update stream's state: this stream doesn't exist anymore. */ + g_hash_table_remove(stream_states, stream_msg->stream); + break; + } + default: + break; + } + bt_object_put_no_null_check(msg); msg = NULL; goto end; @@ -1152,7 +1330,7 @@ end: static enum bt_message_iterator_status find_message_ge_ns_from_origin( struct bt_self_component_port_input_message_iterator *iterator, - int64_t ns_from_origin) + int64_t ns_from_origin, GHashTable *stream_states) { int status; enum bt_self_component_port_input_message_iterator_state init_state = @@ -1217,7 +1395,8 @@ enum bt_message_iterator_status find_message_ge_ns_from_origin( } status = auto_seek_handle_message(iterator, - ns_from_origin, messages[i], &got_first); + ns_from_origin, messages[i], &got_first, + stream_states); if (status == BT_MESSAGE_ITERATOR_STATUS_OK) { /* Message was either pushed or moved */ messages[i] = NULL; @@ -1238,6 +1417,13 @@ end: return status; } +/* + * This function is installed as the iterator's next callback after we have + * auto-seeked (seeked to the beginning and fast-forwarded) to send the + * messages saved in iterator->auto_seek.msgs. Once this is done, the original + * next callback is put back. + */ + static enum bt_self_message_iterator_status post_auto_seek_next( struct bt_self_component_port_input_message_iterator *iterator, @@ -1269,12 +1455,27 @@ enum bt_self_message_iterator_status post_auto_seek_next( return BT_SELF_MESSAGE_ITERATOR_STATUS_OK; } +static inline +int clock_raw_value_from_ns_from_origin(const bt_clock_class *clock_class, + int64_t ns_from_origin, uint64_t *raw_value) +{ + + int64_t cc_offset_s = clock_class->offset_seconds; + uint64_t cc_offset_cycles = clock_class->offset_cycles; + uint64_t cc_freq = clock_class->frequency; + + return bt_common_clock_value_from_ns_from_origin(cc_offset_s, + cc_offset_cycles, cc_freq, ns_from_origin, raw_value); +} + + enum bt_message_iterator_status bt_self_component_port_input_message_iterator_seek_ns_from_origin( struct bt_self_component_port_input_message_iterator *iterator, int64_t ns_from_origin) { int status; + GHashTable *stream_states = NULL; BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); BT_ASSERT_PRE_ITER_HAS_STATE_TO_SEEK(iterator); @@ -1292,6 +1493,7 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin( BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_SEEKING); if (iterator->methods.seek_ns_from_origin) { + /* The iterator knows how to seek to a particular time, let it handle this. */ BT_LIB_LOGD("Calling user's \"seek nanoseconds from origin\" method: " "%![iter-]+i, ns=%" PRId64, iterator, ns_from_origin); status = iterator->methods.seek_ns_from_origin(iterator, @@ -1306,7 +1508,10 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin( iterator, bt_common_self_message_iterator_status_string(status)); } else { - /* Start automatic seeking: seek beginning first */ + /* + * The iterator doesn't know how to seek to a particular time. We will + * seek to the beginning and fast forward to the right place. + */ BT_ASSERT(iterator->methods.can_seek_beginning(iterator)); BT_ASSERT(iterator->methods.seek_beginning); BT_LIB_LOGD("Calling user's \"seek beginning\" method: %!+i", @@ -1344,11 +1549,85 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin( g_queue_pop_tail(iterator->auto_seek.msgs)); } + stream_states = create_auto_seek_stream_states(); + if (!stream_states) { + BT_LOGE_STR("Failed to allocate one GHashTable."); + status = BT_MESSAGE_ITERATOR_STATUS_NOMEM; + goto end; + } + status = find_message_ge_ns_from_origin(iterator, - ns_from_origin); + ns_from_origin, stream_states); switch (status) { case BT_MESSAGE_ITERATOR_STATUS_OK: case BT_MESSAGE_ITERATOR_STATUS_END: + { + GHashTableIter iter; + gpointer key, value; + + /* + * If some streams exist at the seek time, prepend the + * required messages to put those streams in the right + * state. + */ + g_hash_table_iter_init(&iter, stream_states); + while (g_hash_table_iter_next (&iter, &key, &value)) { + const bt_stream *stream = key; + struct auto_seek_stream_state *stream_state = + (struct auto_seek_stream_state *) value; + bt_message *msg; + const bt_clock_class *clock_class = bt_stream_class_borrow_default_clock_class_const( + bt_stream_borrow_class_const(stream)); + uint64_t raw_value; + + if (clock_raw_value_from_ns_from_origin(clock_class, ns_from_origin, &raw_value) != 0) { + BT_LIB_LOGW("Could not convert nanoseconds from origin to clock value: ns-from-origin=%" PRId64 ", %![cc-]+K", + ns_from_origin, clock_class); + status = BT_MESSAGE_ITERATOR_STATUS_ERROR; + goto end; + } + + switch (stream_state->state) { + case AUTO_SEEK_STREAM_STATE_PACKET_BEGAN: + BT_ASSERT(stream_state->packet); + BT_LIB_LOGD("Creating packet message: %![packet-]+a", stream_state->packet); + msg = bt_message_packet_beginning_create_with_default_clock_snapshot( + (bt_self_message_iterator *) iterator, stream_state->packet, raw_value); + if (!msg) { + status = BT_MESSAGE_ITERATOR_STATUS_NOMEM; + goto end; + } + + g_queue_push_head(iterator->auto_seek.msgs, msg); + msg = NULL; + /* fall-thru */ + case AUTO_SEEK_STREAM_STATE_STREAM_ACTIVITY_BEGAN: + msg = bt_message_stream_activity_beginning_create( + (bt_self_message_iterator *) iterator, stream); + if (!msg) { + status = BT_MESSAGE_ITERATOR_STATUS_NOMEM; + goto end; + } + + bt_message_stream_activity_beginning_set_default_clock_snapshot(msg, raw_value); + + g_queue_push_head(iterator->auto_seek.msgs, msg); + msg = NULL; + /* fall-thru */ + case AUTO_SEEK_STREAM_STATE_STREAM_BEGAN: + msg = bt_message_stream_beginning_create( + (bt_self_message_iterator *) iterator, stream); + if (!msg) { + status = BT_MESSAGE_ITERATOR_STATUS_NOMEM; + goto end; + } + + g_queue_push_head(iterator->auto_seek.msgs, msg); + msg = NULL; + break; + } + } + /* * If there are messages in the auto-seek * message queue, replace the user's "next" @@ -1373,6 +1652,7 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin( */ status = BT_MESSAGE_ITERATOR_STATUS_OK; break; + } case BT_MESSAGE_ITERATOR_STATUS_ERROR: case BT_MESSAGE_ITERATOR_STATUS_NOMEM: case BT_MESSAGE_ITERATOR_STATUS_AGAIN: @@ -1383,6 +1663,10 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin( } end: + if (stream_states) { + destroy_auto_seek_stream_states(stream_states); + stream_states = NULL; + } set_iterator_state_after_seeking(iterator, status); return status; }