lib: add logic in auto-seek to recreate stream state
authorSimon Marchi <simon.marchi@efficios.com>
Mon, 17 Jun 2019 16:58:31 +0000 (12:58 -0400)
committerPhilippe Proulx <eeppeliteloop@gmail.com>
Thu, 27 Jun 2019 19:06:29 +0000 (15:06 -0400)
The trimmer component uses the seek_ns_from_origin iterator method to
seek its input iterator to the beginning of its trim window.  After,
that, it's possible for the first message after the seek time to be, for
example, an event message.  If so, it needs to re-create some messages
(possibly stream begin, stream activity begin and packet begin) to put
the stream in the right state.  Otherwise, it would emit messages in an
invalid sequence.

A problem arises however with the following sequence:

1. The trim window starts at timestamp 50
2. Trimmer gets event message with timestamp 100 on stream 0. The
   trimmer will send the following messages: stream 0 beginning, stream
   0 activity beginning at timestamp 50, packet beginning message on
   stream 0 at timestamp 50, event message on stream 0 at timestamp 100.
3. Trimmer gets event message with timestamp 120 on stream 1. The
   trimmer will send the following messages: stream 1 beginning, stream
   1 activity beginning at timestamp 50, packet beginning message on
   stream 0 at timestamp 50, event message on stream 1 at timestamp 120.

The "stream 1 activity beginning at timestamp 50" message is invalid,
because it is earlier than the previously sent message "event message on
stream 0 at timestamp 100".

To avoid this going back in time, we need to emit the made up messages
first thing after seeking, instead of waiting for an event message (or
some other message requiring a stream and/or packet to be open) to emit
them.  The trimmer can't do that itself, because it doesn't know the
state of the streams right after seeking (it doesn't know what messages
would have come before the seek point).

Therefore, the patch moves the functionality of emitting the missing
messages to the iterator code.  When seeking (and using the go to
beginning then fast forward strategy, AKA auto-seek), the iterator code
goes through (and drops) all messages before the seek point.  It can
therefore maintain a table of the state of all the streams.  Once it has
reached the seek time, it can generate the messages required to bring
the streams in the state they are supposed to be, before transmitting
the messages coming from upstream.

Moving this functionality to the iterator also has the advantage that if
other components use the seek_ns_from_origin method of the iterator,
they'll receive a valid flow of messages.  They won't need to do the
same complex work that the trimmer was doing to try re-create messages.

Since the trimmer can now assume a valid message flow from its input
iterator, it doesn't need anymore to generate its own stream beginning,
stream activity beginning and packet beginning messages before
transmitting a message that is supposed to come after one of these.
This means that the ensure_stream_state_is_inited and
ensure_cur_packet_exists functions can be removed.

Change-Id: I548f6ceafa5ad315c65be48077898595882e4511
Signed-off-by: Simon Marchi <simon.marchi@efficios.com>
Reviewed-on: https://review.lttng.org/c/babeltrace/+/1450
CI-Build: Philippe Proulx <eeppeliteloop@gmail.com>
Reviewed-by: Philippe Proulx <eeppeliteloop@gmail.com>
src/lib/graph/iterator.c
src/plugins/utils/trimmer/trimmer.c

index aac7bfc197872f737b7e3b32af5f94a8e615a451..a53a89d8f31e4c0628255a7450462f23c9b5c2c1 100644 (file)
 #include <babeltrace2/graph/self-component-port-input-message-iterator.h>
 #include <babeltrace2/graph/port-output-message-iterator.h>
 #include <babeltrace2/graph/message-event-const.h>
+#include <babeltrace2/graph/message-message-iterator-inactivity-const.h>
+#include <babeltrace2/graph/message-packet-beginning.h>
 #include <babeltrace2/graph/message-packet-beginning-const.h>
 #include <babeltrace2/graph/message-packet-end-const.h>
+#include <babeltrace2/graph/message-stream-activity-beginning.h>
+#include <babeltrace2/graph/message-stream-activity-beginning-const.h>
+#include <babeltrace2/graph/message-stream-activity-end-const.h>
+#include <babeltrace2/graph/message-stream-beginning.h>
 #include <babeltrace2/graph/message-stream-beginning-const.h>
 #include <babeltrace2/graph/message-stream-end-const.h>
 #include <babeltrace2/graph/port-const.h>
@@ -948,11 +954,83 @@ bt_self_component_port_input_message_iterator_seek_beginning(
        return status;
 }
 
+
+/*
+ * Structure used to record the state of a given stream during the fast-forward
+ * phase of an auto-seek.
+ */
+struct auto_seek_stream_state {
+       /*
+        * Value representing which step of this timeline we are at.
+        *
+        *      time --->
+        *   [SB]  1  [SAB]  2  [PB]  3  [PE]  2  [SAE]  1  [SE]
+        *
+        * At each point in the timeline, the messages we need to replicate are:
+        *
+        *   1: Stream beginning
+        *   2: Stream beginning, stream activity beginning
+        *   3: Stream beginning, stream activity beginning, packet beginning
+        *
+        * Before "Stream beginning" and after "Stream end", we don't need to
+        * replicate anything as the stream doesn't exist.
+        */
+       enum {
+               AUTO_SEEK_STREAM_STATE_STREAM_BEGAN,
+               AUTO_SEEK_STREAM_STATE_STREAM_ACTIVITY_BEGAN,
+               AUTO_SEEK_STREAM_STATE_PACKET_BEGAN,
+       } state;
+
+       /*
+        * If `state` is AUTO_SEEK_STREAM_STATE_PACKET_BEGAN, the packet we are
+        * in.  This is a weak reference, since the packet will always be
+        * alive by the time we use it.
+        */
+       struct bt_packet *packet;
+};
+
+static
+struct auto_seek_stream_state *create_auto_seek_stream_state(void)
+{
+       return g_new0(struct auto_seek_stream_state, 1);
+}
+
+static
+void destroy_auto_seek_stream_state(void *ptr)
+{
+       g_free(ptr);
+}
+
+static
+GHashTable *create_auto_seek_stream_states(void)
+{
+       return g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL,
+               destroy_auto_seek_stream_state);
+}
+
+static
+void destroy_auto_seek_stream_states(GHashTable *stream_states)
+{
+       g_hash_table_destroy(stream_states);
+}
+
+/*
+ * Handle one message while we are in the fast-forward phase of an auto-seek.
+ *
+ * Sets `*got_first` to true if the message's timestamp is greater or equal to
+ * `ns_from_origin`.  In other words, if this is the first message after our
+ * seek point.
+ *
+ * `stream_states` is an hash table of `bt_stream *` (weak reference) to
+ * `struct auto_seek_stream_state` used to keep the state of each stream
+ * during the fast-forward.
+ */
+
 static inline
 enum bt_message_iterator_status auto_seek_handle_message(
                struct bt_self_component_port_input_message_iterator *iterator,
                int64_t ns_from_origin, const struct bt_message *msg,
-               bool *got_first)
+               bool *got_first, GHashTable *stream_states)
 {
        enum bt_message_iterator_status status = BT_MESSAGE_ITERATOR_STATUS_OK;
        int64_t msg_ns_from_origin;
@@ -1136,6 +1214,106 @@ enum bt_message_iterator_status auto_seek_handle_message(
        }
 
 skip_msg:
+       /* This message won't be sent downstream. */
+       switch (msg->type) {
+       case BT_MESSAGE_TYPE_STREAM_BEGINNING:
+       {
+               const struct bt_message_stream *stream_msg = (const void *) msg;
+               struct auto_seek_stream_state *stream_state;
+               gboolean did_not_exist;
+
+               /* Update stream's state: stream began. */
+               stream_state = create_auto_seek_stream_state();
+               if (!stream_state) {
+                       status = BT_MESSAGE_ITERATOR_STATUS_NOMEM;
+                       goto end;
+               }
+
+               stream_state->state = AUTO_SEEK_STREAM_STATE_STREAM_BEGAN;
+               did_not_exist = g_hash_table_insert(stream_states, stream_msg->stream, stream_state);
+               BT_ASSERT(did_not_exist);
+               break;
+       }
+       case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
+       {
+               const struct bt_message_stream_activity *stream_act_msg =
+                       (const void *) msg;
+               struct auto_seek_stream_state *stream_state;
+
+               /* Update stream's state: stream activity began. */
+               stream_state = g_hash_table_lookup(stream_states, stream_act_msg->stream);
+               BT_ASSERT(stream_state);
+
+               BT_ASSERT(stream_state->state == AUTO_SEEK_STREAM_STATE_STREAM_BEGAN);
+               stream_state->state = AUTO_SEEK_STREAM_STATE_STREAM_ACTIVITY_BEGAN;
+               BT_ASSERT(!stream_state->packet);
+               break;
+       }
+       case BT_MESSAGE_TYPE_PACKET_BEGINNING:
+       {
+               const struct bt_message_packet *packet_msg =
+                       (const void *) msg;
+               struct auto_seek_stream_state *stream_state;
+
+               /* Update stream's state: packet began. */
+               stream_state = g_hash_table_lookup(stream_states, packet_msg->packet->stream);
+               BT_ASSERT(stream_state);
+
+               BT_ASSERT(stream_state->state == AUTO_SEEK_STREAM_STATE_STREAM_ACTIVITY_BEGAN);
+               stream_state->state = AUTO_SEEK_STREAM_STATE_PACKET_BEGAN;
+               BT_ASSERT(!stream_state->packet);
+               stream_state->packet = packet_msg->packet;
+               break;
+       }
+       case BT_MESSAGE_TYPE_PACKET_END:
+       {
+               const struct bt_message_packet *packet_msg =
+                       (const void *) msg;
+               struct auto_seek_stream_state *stream_state;
+
+               /* Update stream's state: packet ended. */
+               stream_state = g_hash_table_lookup(stream_states, packet_msg->packet->stream);
+               BT_ASSERT(stream_state);
+
+               BT_ASSERT(stream_state->state == AUTO_SEEK_STREAM_STATE_PACKET_BEGAN);
+               stream_state->state = AUTO_SEEK_STREAM_STATE_STREAM_ACTIVITY_BEGAN;
+               BT_ASSERT(stream_state->packet);
+               stream_state->packet = NULL;
+               break;
+       }
+       case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
+       {
+               const struct bt_message_stream_activity *stream_act_msg =
+                       (const void *) msg;
+               struct auto_seek_stream_state *stream_state;
+
+               /* Update stream's state: stream activity ended. */
+               stream_state = g_hash_table_lookup(stream_states, stream_act_msg->stream);
+               BT_ASSERT(stream_state);
+
+               BT_ASSERT(stream_state->state == AUTO_SEEK_STREAM_STATE_STREAM_ACTIVITY_BEGAN);
+               stream_state->state = AUTO_SEEK_STREAM_STATE_STREAM_BEGAN;
+               BT_ASSERT(!stream_state->packet);
+               break;
+       }
+       case BT_MESSAGE_TYPE_STREAM_END:
+       {
+               const struct bt_message_stream *stream_msg = (const void *) msg;
+               struct auto_seek_stream_state *stream_state;
+
+               stream_state = g_hash_table_lookup(stream_states, stream_msg->stream);
+               BT_ASSERT(stream_state);
+               BT_ASSERT(stream_state->state == AUTO_SEEK_STREAM_STATE_STREAM_BEGAN);
+               BT_ASSERT(!stream_state->packet);
+
+               /* Update stream's state: this stream doesn't exist anymore. */
+               g_hash_table_remove(stream_states, stream_msg->stream);
+               break;
+       }
+       default:
+               break;
+       }
+
        bt_object_put_no_null_check(msg);
        msg = NULL;
        goto end;
@@ -1152,7 +1330,7 @@ end:
 static
 enum bt_message_iterator_status find_message_ge_ns_from_origin(
                struct bt_self_component_port_input_message_iterator *iterator,
-               int64_t ns_from_origin)
+               int64_t ns_from_origin, GHashTable *stream_states)
 {
        int status;
        enum bt_self_component_port_input_message_iterator_state init_state =
@@ -1217,7 +1395,8 @@ enum bt_message_iterator_status find_message_ge_ns_from_origin(
                        }
 
                        status = auto_seek_handle_message(iterator,
-                               ns_from_origin, messages[i], &got_first);
+                               ns_from_origin, messages[i], &got_first,
+                               stream_states);
                        if (status == BT_MESSAGE_ITERATOR_STATUS_OK) {
                                /* Message was either pushed or moved */
                                messages[i] = NULL;
@@ -1238,6 +1417,13 @@ end:
        return status;
 }
 
+/*
+ * This function is installed as the iterator's next callback after we have
+ * auto-seeked (seeked to the beginning and fast-forwarded) to send the
+ * messages saved in iterator->auto_seek.msgs.  Once this is done, the original
+ * next callback is put back.
+ */
+
 static
 enum bt_self_message_iterator_status post_auto_seek_next(
                struct bt_self_component_port_input_message_iterator *iterator,
@@ -1269,12 +1455,27 @@ enum bt_self_message_iterator_status post_auto_seek_next(
        return BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
 }
 
+static inline
+int clock_raw_value_from_ns_from_origin(const bt_clock_class *clock_class,
+               int64_t ns_from_origin, uint64_t *raw_value)
+{
+
+       int64_t cc_offset_s = clock_class->offset_seconds;
+       uint64_t cc_offset_cycles = clock_class->offset_cycles;
+       uint64_t cc_freq = clock_class->frequency;
+
+       return bt_common_clock_value_from_ns_from_origin(cc_offset_s,
+               cc_offset_cycles, cc_freq, ns_from_origin, raw_value);
+}
+
+
 enum bt_message_iterator_status
 bt_self_component_port_input_message_iterator_seek_ns_from_origin(
                struct bt_self_component_port_input_message_iterator *iterator,
                int64_t ns_from_origin)
 {
        int status;
+       GHashTable *stream_states = NULL;
 
        BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
        BT_ASSERT_PRE_ITER_HAS_STATE_TO_SEEK(iterator);
@@ -1292,6 +1493,7 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin(
                BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_SEEKING);
 
        if (iterator->methods.seek_ns_from_origin) {
+               /* The iterator knows how to seek to a particular time, let it handle this. */
                BT_LIB_LOGD("Calling user's \"seek nanoseconds from origin\" method: "
                        "%![iter-]+i, ns=%" PRId64, iterator, ns_from_origin);
                status = iterator->methods.seek_ns_from_origin(iterator,
@@ -1306,7 +1508,10 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin(
                        iterator,
                        bt_common_self_message_iterator_status_string(status));
        } else {
-               /* Start automatic seeking: seek beginning first */
+               /*
+                * The iterator doesn't know how to seek to a particular time.  We will
+                * seek to the beginning and fast forward to the right place.
+                */
                BT_ASSERT(iterator->methods.can_seek_beginning(iterator));
                BT_ASSERT(iterator->methods.seek_beginning);
                BT_LIB_LOGD("Calling user's \"seek beginning\" method: %!+i",
@@ -1344,11 +1549,85 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin(
                                g_queue_pop_tail(iterator->auto_seek.msgs));
                }
 
+               stream_states = create_auto_seek_stream_states();
+               if (!stream_states) {
+                       BT_LOGE_STR("Failed to allocate one GHashTable.");
+                       status = BT_MESSAGE_ITERATOR_STATUS_NOMEM;
+                       goto end;
+               }
+
                status = find_message_ge_ns_from_origin(iterator,
-                       ns_from_origin);
+                       ns_from_origin, stream_states);
                switch (status) {
                case BT_MESSAGE_ITERATOR_STATUS_OK:
                case BT_MESSAGE_ITERATOR_STATUS_END:
+               {
+                       GHashTableIter iter;
+                       gpointer key, value;
+
+                       /*
+                        * If some streams exist at the seek time, prepend the
+                        * required messages to put those streams in the right
+                        * state.
+                        */
+                       g_hash_table_iter_init(&iter, stream_states);
+                       while (g_hash_table_iter_next (&iter, &key, &value)) {
+                               const bt_stream *stream = key;
+                               struct auto_seek_stream_state *stream_state =
+                                       (struct auto_seek_stream_state *) value;
+                               bt_message *msg;
+                               const bt_clock_class *clock_class = bt_stream_class_borrow_default_clock_class_const(
+                                       bt_stream_borrow_class_const(stream));
+                               uint64_t raw_value;
+
+                               if (clock_raw_value_from_ns_from_origin(clock_class, ns_from_origin, &raw_value) != 0) {
+                                       BT_LIB_LOGW("Could not convert nanoseconds from origin to clock value: ns-from-origin=%" PRId64 ", %![cc-]+K",
+                                               ns_from_origin, clock_class);
+                                       status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
+                                       goto end;
+                               }
+
+                               switch (stream_state->state) {
+                               case AUTO_SEEK_STREAM_STATE_PACKET_BEGAN:
+                                       BT_ASSERT(stream_state->packet);
+                                       BT_LIB_LOGD("Creating packet message: %![packet-]+a", stream_state->packet);
+                                       msg = bt_message_packet_beginning_create_with_default_clock_snapshot(
+                                               (bt_self_message_iterator *) iterator, stream_state->packet, raw_value);
+                                       if (!msg) {
+                                               status = BT_MESSAGE_ITERATOR_STATUS_NOMEM;
+                                               goto end;
+                                       }
+
+                                       g_queue_push_head(iterator->auto_seek.msgs, msg);
+                                       msg = NULL;
+                                       /* fall-thru */
+                               case AUTO_SEEK_STREAM_STATE_STREAM_ACTIVITY_BEGAN:
+                                       msg = bt_message_stream_activity_beginning_create(
+                                               (bt_self_message_iterator *) iterator, stream);
+                                       if (!msg) {
+                                               status = BT_MESSAGE_ITERATOR_STATUS_NOMEM;
+                                               goto end;
+                                       }
+
+                                       bt_message_stream_activity_beginning_set_default_clock_snapshot(msg, raw_value);
+
+                                       g_queue_push_head(iterator->auto_seek.msgs, msg);
+                                       msg = NULL;
+                                       /* fall-thru */
+                               case AUTO_SEEK_STREAM_STATE_STREAM_BEGAN:
+                                       msg = bt_message_stream_beginning_create(
+                                               (bt_self_message_iterator *) iterator, stream);
+                                       if (!msg) {
+                                               status = BT_MESSAGE_ITERATOR_STATUS_NOMEM;
+                                               goto end;
+                                       }
+
+                                       g_queue_push_head(iterator->auto_seek.msgs, msg);
+                                       msg = NULL;
+                                       break;
+                               }
+                       }
+
                        /*
                         * If there are messages in the auto-seek
                         * message queue, replace the user's "next"
@@ -1373,6 +1652,7 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin(
                         */
                        status = BT_MESSAGE_ITERATOR_STATUS_OK;
                        break;
+               }
                case BT_MESSAGE_ITERATOR_STATUS_ERROR:
                case BT_MESSAGE_ITERATOR_STATUS_NOMEM:
                case BT_MESSAGE_ITERATOR_STATUS_AGAIN:
@@ -1383,6 +1663,10 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin(
        }
 
 end:
+       if (stream_states) {
+               destroy_auto_seek_stream_states(stream_states);
+               stream_states = NULL;
+       }
        set_iterator_state_after_seeking(iterator, status);
        return status;
 }
index 87cdb3e104f220e120fae0189ca9af47766a7c87..218f7788ee8159f10e050f0bd4459208573ec35f 100644 (file)
@@ -130,12 +130,6 @@ struct trimmer_iterator {
 };
 
 struct trimmer_iterator_stream_state {
-       /*
-        * True if both stream beginning and initial stream activity
-        * beginning messages were pushed for this stream.
-        */
-       bool inited;
-
        /*
         * True if the last pushed message for this stream was a stream
         * activity end message.
@@ -153,9 +147,6 @@ struct trimmer_iterator_stream_state {
 
        /* Owned by this (`NULL` initially and between packets) */
        const bt_packet *cur_packet;
-
-       /* Owned by this */
-       const bt_message *stream_beginning_msg;
 };
 
 static
@@ -549,7 +540,6 @@ void destroy_trimmer_iterator_stream_state(
 {
        BT_ASSERT(sstate);
        BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
-       BT_MESSAGE_PUT_REF_AND_RESET(sstate->stream_beginning_msg);
        g_free(sstate);
 }
 
@@ -1154,150 +1144,6 @@ end:
        return status;
 }
 
-/*
- * Makes sure to initialize a stream state, pushing the appropriate
- * initial messages.
- *
- * `stream_act_beginning_msg` is an initial stream activity beginning
- * message to potentially use, depending on its clock snapshot state.
- * This function consumes `stream_act_beginning_msg` unconditionally.
- */
-static inline
-bt_self_message_iterator_status ensure_stream_state_is_inited(
-               struct trimmer_iterator *trimmer_it,
-               struct trimmer_iterator_stream_state *sstate,
-               const bt_message *stream_act_beginning_msg)
-{
-       bt_self_message_iterator_status status =
-               BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
-       bt_message *new_msg = NULL;
-       const bt_clock_class *clock_class =
-               bt_stream_class_borrow_default_clock_class_const(
-                       bt_stream_borrow_class_const(sstate->stream));
-
-       BT_ASSERT(!sstate->inited);
-
-       if (!sstate->stream_beginning_msg) {
-               /* No initial stream beginning message: create one */
-               sstate->stream_beginning_msg =
-                       bt_message_stream_beginning_create(
-                               trimmer_it->self_msg_iter, sstate->stream);
-               if (!sstate->stream_beginning_msg) {
-                       status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
-                       goto end;
-               }
-       }
-
-       /* Push initial stream beginning message */
-       BT_ASSERT(sstate->stream_beginning_msg);
-       push_message(trimmer_it, sstate->stream_beginning_msg);
-       sstate->stream_beginning_msg = NULL;
-
-       if (stream_act_beginning_msg) {
-               /*
-                * Initial stream activity beginning message exists: if
-                * its time is -inf, then create and push a new one
-                * having the trimming range's beginning time. Otherwise
-                * push it as is (known and unknown).
-                */
-               const bt_clock_snapshot *cs;
-               bt_message_stream_activity_clock_snapshot_state sa_cs_state;
-
-               sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const(
-                       stream_act_beginning_msg, &cs);
-               if (sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE &&
-                               !trimmer_it->begin.is_infinite) {
-                       /*
-                        * -inf time: use trimming range's beginning
-                        * time (which is not -inf).
-                        */
-                       status = create_stream_beginning_activity_message(
-                               trimmer_it, sstate->stream, clock_class,
-                               &new_msg);
-                       if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
-                               goto end;
-                       }
-
-                       push_message(trimmer_it, new_msg);
-                       new_msg = NULL;
-               } else {
-                       /* Known/unknown: push as is */
-                       push_message(trimmer_it, stream_act_beginning_msg);
-                       stream_act_beginning_msg = NULL;
-               }
-       } else {
-               BT_ASSERT(!trimmer_it->begin.is_infinite);
-
-               /*
-                * No stream beginning activity message: create and push
-                * a new message.
-                */
-               status = create_stream_beginning_activity_message(
-                       trimmer_it, sstate->stream, clock_class, &new_msg);
-               if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
-                       goto end;
-               }
-
-               push_message(trimmer_it, new_msg);
-               new_msg = NULL;
-       }
-
-       sstate->inited = true;
-
-end:
-       bt_message_put_ref(new_msg);
-       bt_message_put_ref(stream_act_beginning_msg);
-       return status;
-}
-
-static inline
-bt_self_message_iterator_status ensure_cur_packet_exists(
-       struct trimmer_iterator *trimmer_it,
-       struct trimmer_iterator_stream_state *sstate,
-       const bt_packet *packet)
-{
-       bt_self_message_iterator_status status =
-               BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
-       int ret;
-       const bt_clock_class *clock_class =
-               bt_stream_class_borrow_default_clock_class_const(
-                       bt_stream_borrow_class_const(sstate->stream));
-       bt_message *msg = NULL;
-       uint64_t raw_value;
-
-       BT_ASSERT(!trimmer_it->begin.is_infinite);
-       BT_ASSERT(!sstate->cur_packet);
-
-       /*
-        * Create and push an initial packet beginning message,
-        * making its time the trimming range's beginning time.
-        */
-       ret = clock_raw_value_from_ns_from_origin(clock_class,
-               trimmer_it->begin.ns_from_origin, &raw_value);
-       if (ret) {
-               status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
-               goto end;
-       }
-
-       msg = bt_message_packet_beginning_create_with_default_clock_snapshot(
-               trimmer_it->self_msg_iter, packet, raw_value);
-       if (!msg) {
-               status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
-               goto end;
-       }
-
-       push_message(trimmer_it, msg);
-       msg = NULL;
-
-       /* Set packet as this stream's current packet */
-       sstate->cur_packet = packet;
-       bt_packet_get_ref(sstate->cur_packet);
-
-end:
-       bt_message_put_ref(msg);
-       return status;
-}
-
 /*
  * Handles a message which is associated to a given stream state. This
  * _could_ make the iterator's output message queue grow; this could
@@ -1334,27 +1180,6 @@ bt_self_message_iterator_status handle_message_with_stream_state(
                        break;
                }
 
-               if (G_UNLIKELY(!sstate->inited)) {
-                       status = ensure_stream_state_is_inited(trimmer_it,
-                               sstate, NULL);
-                       if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
-                               goto end;
-                       }
-               }
-
-               if (G_UNLIKELY(!sstate->cur_packet)) {
-                       const bt_event *event =
-                               bt_message_event_borrow_event_const(msg);
-                       const bt_packet *packet = bt_event_borrow_packet_const(
-                               event);
-
-                       status = ensure_cur_packet_exists(trimmer_it, sstate,
-                               packet);
-                       if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
-                               goto end;
-                       }
-               }
-
                BT_ASSERT(sstate->cur_packet);
                push_message(trimmer_it, msg);
                msg = NULL;
@@ -1367,14 +1192,6 @@ bt_self_message_iterator_status handle_message_with_stream_state(
                        break;
                }
 
-               if (G_UNLIKELY(!sstate->inited)) {
-                       status = ensure_stream_state_is_inited(trimmer_it,
-                               sstate, NULL);
-                       if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
-                               goto end;
-                       }
-               }
-
                BT_ASSERT(!sstate->cur_packet);
                sstate->cur_packet =
                        bt_message_packet_beginning_borrow_packet_const(msg);
@@ -1392,25 +1209,6 @@ bt_self_message_iterator_status handle_message_with_stream_state(
                        break;
                }
 
-               if (G_UNLIKELY(!sstate->inited)) {
-                       status = ensure_stream_state_is_inited(trimmer_it,
-                               sstate, NULL);
-                       if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
-                               goto end;
-                       }
-               }
-
-               if (G_UNLIKELY(!sstate->cur_packet)) {
-                       const bt_packet *packet =
-                               bt_message_packet_end_borrow_packet_const(msg);
-
-                       status = ensure_cur_packet_exists(trimmer_it, sstate,
-                               packet);
-                       if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
-                               goto end;
-                       }
-               }
-
                BT_ASSERT(sstate->cur_packet);
                BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
                push_message(trimmer_it, msg);
@@ -1510,14 +1308,6 @@ bt_self_message_iterator_status handle_message_with_stream_state(
                        BT_MESSAGE_MOVE_REF(msg, new_msg);
                }
 
-               if (G_UNLIKELY(!sstate->inited)) {
-                       status = ensure_stream_state_is_inited(trimmer_it,
-                               sstate, NULL);
-                       if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
-                               goto end;
-                       }
-               }
-
                push_message(trimmer_it, msg);
                msg = NULL;
                break;
@@ -1537,18 +1327,8 @@ bt_self_message_iterator_status handle_message_with_stream_state(
                        break;
                }
 
-               if (!sstate->inited) {
-                       status = ensure_stream_state_is_inited(trimmer_it,
-                               sstate, msg);
-                       msg = NULL;
-                       if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
-                               goto end;
-                       }
-               } else {
-                       push_message(trimmer_it, msg);
-                       msg = NULL;
-               }
-
+               push_message(trimmer_it, msg);
+               msg = NULL;
                break;
        case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
                if (trimmer_it->end.is_infinite) {
@@ -1558,12 +1338,10 @@ bt_self_message_iterator_status handle_message_with_stream_state(
                }
 
                if (ns_from_origin == INT64_MIN) {
-                       /* Unknown: push as is if stream state is inited */
-                       if (sstate->inited) {
-                               push_message(trimmer_it, msg);
-                               msg = NULL;
-                               sstate->last_msg_is_stream_activity_end = true;
-                       }
+                       /* Unknown: consider it to be in the trimmer window. */
+                       push_message(trimmer_it, msg);
+                       msg = NULL;
+                       sstate->last_msg_is_stream_activity_end = true;
                } else if (ns_from_origin == INT64_MAX) {
                        /* Infinite: use trimming range's end time */
                        sstate->stream_act_end_ns_from_origin =
@@ -1578,21 +1356,6 @@ bt_self_message_iterator_status handle_message_with_stream_state(
                                break;
                        }
 
-                       if (!sstate->inited) {
-                               /*
-                                * First message for this stream is a
-                                * stream activity end: we can't deduce
-                                * anything about the stream activity
-                                * beginning's time, and using this
-                                * message's time would make a useless
-                                * pair of stream activity beginning/end
-                                * with the same time. Just skip this
-                                * message and wait for something
-                                * useful.
-                                */
-                               break;
-                       }
-
                        push_message(trimmer_it, msg);
                        msg = NULL;
                        sstate->last_msg_is_stream_activity_end = true;
@@ -1601,45 +1364,34 @@ bt_self_message_iterator_status handle_message_with_stream_state(
 
                break;
        case BT_MESSAGE_TYPE_STREAM_BEGINNING:
-               /*
-                * We don't know what follows at this point, so just
-                * keep this message until we know what to do with it
-                * (it will be used in ensure_stream_state_is_inited()).
-                */
-               BT_ASSERT(!sstate->inited);
-               BT_MESSAGE_MOVE_REF(sstate->stream_beginning_msg, msg);
+               push_message(trimmer_it,  msg);
+               msg = NULL;
                break;
        case BT_MESSAGE_TYPE_STREAM_END:
-               if (sstate->inited) {
-                       /*
-                        * This is the end of an inited stream: end this
-                        * stream if its stream activity end message
-                        * time is not the trimming range's end time
-                        * (which means the final stream activity end
-                        * message had an infinite time). end_stream()
-                        * will generate its own stream end message.
-                        */
-                       if (trimmer_it->end.is_infinite) {
-                               push_message(trimmer_it, msg);
-                               msg = NULL;
-                               g_hash_table_remove(trimmer_it->stream_states,
-                                       sstate->stream);
-                       } else if (sstate->stream_act_end_ns_from_origin <
-                                       trimmer_it->end.ns_from_origin) {
-                               status = end_stream(trimmer_it, sstate);
-                               if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
-                                       goto end;
-                               }
+               /*
+                * This is the end of a stream: end this
+                * stream if its stream activity end message
+                * time is not the trimming range's end time
+                * (which means the final stream activity end
+                * message had an infinite time). end_stream()
+                * will generate its own stream end message.
+                */
+               if (trimmer_it->end.is_infinite) {
+                       push_message(trimmer_it, msg);
+                       msg = NULL;
 
-                               /* We won't need this stream state again */
-                               g_hash_table_remove(trimmer_it->stream_states,
-                                       sstate->stream);
+                       /* We won't need this stream state again */
+                       g_hash_table_remove(trimmer_it->stream_states, sstate->stream);
+               } else if (sstate->stream_act_end_ns_from_origin <
+                               trimmer_it->end.ns_from_origin) {
+                       status = end_stream(trimmer_it, sstate);
+                       if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
+                               goto end;
                        }
-               } else {
-                       /* We dont't need this stream state anymore */
+
+                       /* We won't need this stream state again */
                        g_hash_table_remove(trimmer_it->stream_states, sstate->stream);
                }
-
                break;
        default:
                break;
This page took 0.03676 seconds and 4 git commands to generate.