lib: validate monotonicity of messages from upstream component
authorSimon Marchi <simon.marchi@efficios.com>
Mon, 17 Jun 2019 23:18:13 +0000 (19:18 -0400)
committerPhilippe Proulx <eeppeliteloop@gmail.com>
Fri, 28 Jun 2019 20:08:01 +0000 (16:08 -0400)
This patch adds two checks after having called an iterator's next
method, to verify the validity of what is returned.

Clock class compatibility
-------------------------

Check that the clock classes of new streams created through this
iterator are compatible with the clock classes of streams previously
created through the same iterator.

The first time a stream is created through an iterator, we save some
properties of that clock class (whether its origin is the Unix epoch,
and if it isn't, its uuid).  When new streams appear, we compare those
properties to make sure the clock classes of the new streams are
compatible with those properties.

Clock snapshots increase
------------------------

Check that the clock snapshots of all messages (that have one) don't go
back in time.

For each incoming message, if it has a clock snapshot, we compare it
with the clock snapshot of the previous message and assert that it isn't
lower (the "previous message time" is initialized to INT64_MIN).

Some messages don't have a clock snapshot, for those we do nothing.

I am not considering the "infinity" value that stream activity end
messages can have.  It would add a bit more complexity to validate them
propertly, because it's possible to have something like this:

    Stream 1: ...  EV(1)       SAE(inf)  SE
    Stream 2: ...        EV(2)               EV(10) ...

As you can see, it is possible to have a finite value (10) on stream 2
after having received an SAE message at infinity on stream 1.  We could,
validate that nothing with a finite clock snapshot comes on stream 1
specifically.  It would however require some per-stream data, which adds
a bit of complexity.  And since we plan on removing these messages
anyway, it doesn't seem to be worth the effort.

Change-Id: If18f475ed056fda77a539d700e6ad310111f6d6b
Signed-off-by: Simon Marchi <simon.marchi@efficios.com>
Reviewed-on: https://review.lttng.org/c/babeltrace/+/1486
Reviewed-by: Philippe Proulx <eeppeliteloop@gmail.com>
src/lib/graph/iterator.c
src/lib/graph/message/iterator.h

index 31c4ec3ffc24da8be54f30b8b25f96aa9a3d7bd6..08a49cfeee34136967256f7cbb2e4848e07692af 100644 (file)
@@ -323,6 +323,8 @@ bt_self_component_port_input_message_iterator_create_initial(
                goto end;
        }
 
+       iterator->last_ns_from_origin = INT64_MIN;
+
        iterator->auto_seek.msgs = g_queue_new();
        if (!iterator->auto_seek.msgs) {
                BT_LOGE_STR("Failed to allocate a GQueue.");
@@ -528,6 +530,289 @@ void bt_self_message_iterator_set_data(
                "%!+i, user-data-addr=%p", iterator, data);
 }
 
+/*
+ * Validate that the default clock snapshot in `msg` doesn't make us go back in
+ * time.
+ */
+
+BT_ASSERT_PRE_FUNC
+static
+bool clock_snapshots_are_monotonic_one(
+               struct bt_self_component_port_input_message_iterator *iterator,
+               const bt_message *msg)
+{
+       const struct bt_clock_snapshot *clock_snapshot = NULL;
+       bt_message_type message_type = bt_message_get_type(msg);
+       int64_t ns_from_origin;
+       enum bt_clock_snapshot_status clock_snapshot_status;
+
+       /*
+        * The default is true: if we can't figure out the clock snapshot
+        * (or there is none), assume it is fine.
+        */
+       bool result = true;
+
+       switch (message_type) {
+       case BT_MESSAGE_TYPE_EVENT:
+       {
+               struct bt_message_event *event_msg = (struct bt_message_event *) msg;
+               clock_snapshot = event_msg->default_cs;
+               break;
+       }
+       case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
+       {
+               struct bt_message_message_iterator_inactivity *inactivity_msg =
+                       (struct bt_message_message_iterator_inactivity *) msg;
+               clock_snapshot = inactivity_msg->default_cs;
+               break;
+       }
+       case BT_MESSAGE_TYPE_PACKET_BEGINNING:
+       case BT_MESSAGE_TYPE_PACKET_END:
+       {
+               struct bt_message_packet *packet_msg = (struct bt_message_packet *) msg;
+               clock_snapshot = packet_msg->default_cs;
+               break;
+       }
+       case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
+       case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
+       {
+               struct bt_message_stream_activity *str_act_msg =
+                       (struct bt_message_stream_activity *) msg;
+
+               if (str_act_msg->default_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
+                       clock_snapshot = str_act_msg->default_cs;
+               }
+               break;
+       }
+       case BT_MESSAGE_TYPE_STREAM_BEGINNING:
+       case BT_MESSAGE_TYPE_STREAM_END:
+               /* These messages don't have clock snapshots. */
+               goto end;
+       case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
+       case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
+       {
+               struct bt_message_discarded_items *discarded_msg =
+                       (struct bt_message_discarded_items *) msg;
+
+               clock_snapshot = discarded_msg->default_begin_cs;
+               break;
+       }
+       }
+
+       if (!clock_snapshot) {
+               goto end;
+       }
+
+       clock_snapshot_status = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, &ns_from_origin);
+       if (clock_snapshot_status != BT_CLOCK_SNAPSHOT_STATUS_OK) {
+               goto end;
+       }
+
+       result = ns_from_origin >= iterator->last_ns_from_origin;
+       iterator->last_ns_from_origin = ns_from_origin;
+end:
+       return result;
+}
+
+BT_ASSERT_PRE_FUNC
+static
+bool clock_snapshots_are_monotonic(
+               struct bt_self_component_port_input_message_iterator *iterator,
+               bt_message_array_const msgs, uint64_t msg_count)
+{
+       uint64_t i;
+       bool result;
+
+       for (i = 0; i < msg_count; i++) {
+               if (!clock_snapshots_are_monotonic_one(iterator, msgs[i])) {
+                       result = false;
+                       goto end;
+               }
+       }
+
+       result = true;
+
+end:
+       return result;
+}
+
+/*
+ * When a new stream begins, verify that the clock class tied to this
+ * stream is compatible with what we've seen before.
+ */
+
+BT_ASSERT_PRE_FUNC
+static
+bool clock_classes_are_compatible_one(struct bt_self_component_port_input_message_iterator *iterator,
+               const struct bt_message *msg)
+{
+       enum bt_message_type message_type = bt_message_get_type(msg);
+       bool result;
+
+       if (message_type == BT_MESSAGE_TYPE_STREAM_BEGINNING) {
+               const struct bt_message_stream *stream_msg = (struct bt_message_stream *) msg;
+               const struct bt_clock_class *clock_class = stream_msg->stream->class->default_clock_class;
+               bt_uuid clock_class_uuid = NULL;
+
+               if (clock_class) {
+                       clock_class_uuid = bt_clock_class_get_uuid(clock_class);
+               }
+
+               switch (iterator->clock_expectation.type) {
+               case CLOCK_EXPECTATION_UNSET:
+                       /*
+                        * This is the first time we see a message with a clock
+                        * snapshot: record the properties of that clock, against
+                        * which we'll compare the clock properties of the following
+                        * messages.
+                        */
+
+                       if (!clock_class) {
+                               iterator->clock_expectation.type = CLOCK_EXPECTATION_NONE;
+                       } else if (bt_clock_class_origin_is_unix_epoch(clock_class)) {
+                               iterator->clock_expectation.type = CLOCK_EXPECTATION_ORIGIN_UNIX;
+                       } else if (clock_class_uuid) {
+                               iterator->clock_expectation.type = CLOCK_EXPECTATION_ORIGIN_OTHER_UUID;
+                               memcpy(iterator->clock_expectation.uuid, clock_class_uuid, BABELTRACE_UUID_LEN);
+                       } else {
+                               iterator->clock_expectation.type = CLOCK_EXPECTATION_ORIGIN_OTHER_NO_UUID;
+                       }
+                       break;
+
+               case CLOCK_EXPECTATION_NONE:
+                       if (clock_class) {
+                               BT_ASSERT_PRE_MSG("Expecting no clock class, got one: %![cc-]+K",
+                                       clock_class);
+                               result = false;
+                               goto end;
+                       }
+
+                       break;
+
+               case CLOCK_EXPECTATION_ORIGIN_UNIX:
+                       if (!clock_class) {
+                               BT_ASSERT_PRE_MSG("Expecting a clock class, got none.");
+                               result = false;
+                               goto end;
+                       }
+
+                       if (!bt_clock_class_origin_is_unix_epoch(clock_class)) {
+                               BT_ASSERT_PRE_MSG("Expecting a clock class with Unix epoch origin: %![cc-]+K",
+                                       clock_class);
+                               result = false;
+                               goto end;
+                       }
+                       break;
+
+               case CLOCK_EXPECTATION_ORIGIN_OTHER_UUID:
+                       if (!clock_class) {
+                               BT_ASSERT_PRE_MSG("Expecting a clock class, got none.");
+                               result = false;
+                               goto end;
+                       }
+
+                       if (bt_clock_class_origin_is_unix_epoch(clock_class)) {
+                               BT_ASSERT_PRE_MSG("Expecting a clock class without Unix epoch origin: %![cc-]+K",
+                                       clock_class);
+                               result = false;
+                               goto end;
+                       }
+
+                       if (!clock_class_uuid) {
+                               BT_ASSERT_PRE_MSG("Expecting a clock class with UUID: %![cc-]+K",
+                                       clock_class);
+                               result = false;
+                               goto end;
+                       }
+
+                       if (bt_uuid_compare(iterator->clock_expectation.uuid, clock_class_uuid)) {
+                               BT_ASSERT_PRE_MSG("Expecting a clock class with UUID, got one "
+                                       "with a different UUID: %![cc-]+K, expected-uuid=%!u",
+                                       clock_class, iterator->clock_expectation.uuid);
+                               result = false;
+                               goto end;
+                       }
+                       break;
+
+               case CLOCK_EXPECTATION_ORIGIN_OTHER_NO_UUID:
+                       if (!clock_class) {
+                               BT_ASSERT_PRE_MSG("Expecting a clock class, got none.");
+                               result = false;
+                               goto end;
+                       }
+
+                       if (bt_clock_class_origin_is_unix_epoch(clock_class)) {
+                               BT_ASSERT_PRE_MSG("Expecting a clock class without Unix epoch origin: %![cc-]+K",
+                                       clock_class);
+                               result = false;
+                               goto end;
+                       }
+
+                       if (clock_class_uuid) {
+                               BT_ASSERT_PRE_MSG("Expecting a clock class without UUID: %![cc-]+K",
+                                       clock_class);
+                               result = false;
+                               goto end;
+                       }
+                       break;
+               }
+       }
+
+       result = true;
+
+end:
+       return result;
+}
+
+BT_ASSERT_PRE_FUNC
+static
+bool clock_classes_are_compatible(
+               struct bt_self_component_port_input_message_iterator *iterator,
+               bt_message_array_const msgs, uint64_t msg_count)
+{
+       uint64_t i;
+       bool result;
+
+       for (i = 0; i < msg_count; i++) {
+               if (!clock_classes_are_compatible_one(iterator, msgs[i])) {
+                       result = false;
+                       goto end;
+               }
+       }
+
+       result = true;
+
+end:
+       return result;
+}
+
+/*
+ * Call the `next` method of the iterator.  Do some validation on the returned
+ * messages.
+ */
+
+static
+bt_message_iterator_status call_iterator_next_method(
+               struct bt_self_component_port_input_message_iterator *iterator,
+               bt_message_array_const msgs, uint64_t capacity, uint64_t *user_count)
+{
+       bt_message_iterator_status status;
+
+       BT_ASSERT(iterator->methods.next);
+       BT_LOGD_STR("Calling user's \"next\" method.");
+
+       status = iterator->methods.next(iterator, msgs, capacity, user_count);
+
+       if (status == BT_MESSAGE_ITERATOR_STATUS_OK) {
+               BT_ASSERT_PRE(clock_classes_are_compatible(iterator, msgs, *user_count),
+                       "Clocks are not compatible");
+               BT_ASSERT_PRE(clock_snapshots_are_monotonic(iterator, msgs, *user_count),
+                       "Clock snapshots are not monotonic");
+       }
+
+       return status;
+}
+
 enum bt_message_iterator_status
 bt_self_component_port_input_message_iterator_next(
                struct bt_self_component_port_input_message_iterator *iterator,
@@ -557,10 +842,8 @@ bt_self_component_port_input_message_iterator_next(
         * Call the user's "next" method to get the next messages
         * and status.
         */
-       BT_ASSERT(iterator->methods.next);
-       BT_LOGD_STR("Calling user's \"next\" method.");
        *user_count = 0;
-       status = iterator->methods.next(iterator,
+       status = call_iterator_next_method(iterator,
                (void *) iterator->base.msgs->pdata, MSG_BATCH_SIZE,
                user_count);
        BT_LOGD("User method returned: status=%s, msg-count=%" PRIu64,
@@ -922,6 +1205,14 @@ void set_iterator_state_after_seeking(
        set_self_comp_port_input_msg_iterator_state(iterator, new_state);
 }
 
+static
+void reset_iterator_expectations(
+               struct bt_self_component_port_input_message_iterator *iterator)
+{
+       iterator->last_ns_from_origin = INT64_MIN;
+       iterator->clock_expectation.type = CLOCK_EXPECTATION_UNSET;
+}
+
 enum bt_message_iterator_status
 bt_self_component_port_input_message_iterator_seek_beginning(
                struct bt_self_component_port_input_message_iterator *iterator)
@@ -939,6 +1230,13 @@ bt_self_component_port_input_message_iterator_seek_beginning(
                bt_self_component_port_input_message_iterator_can_seek_beginning(
                        iterator),
                "Message iterator cannot seek beginning: %!+i", iterator);
+
+       /*
+        * We are seeking, reset our expectations about how the following
+        * messages should look like.
+        */
+       reset_iterator_expectations(iterator);
+
        BT_LIB_LOGD("Calling user's \"seek beginning\" method: %!+i", iterator);
        set_self_comp_port_input_msg_iterator_state(iterator,
                BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_SEEKING);
@@ -1358,8 +1656,7 @@ enum bt_message_iterator_status find_message_ge_ns_from_origin(
                 * Call the user's "next" method to get the next
                 * messages and status.
                 */
-               BT_LOGD_STR("Calling user's \"next\" method.");
-               status = iterator->methods.next(iterator,
+               status = call_iterator_next_method(iterator,
                        &messages[0], MSG_BATCH_SIZE, &user_count);
                BT_LOGD("User method returned: status=%s",
                        bt_message_iterator_status_string(status));
@@ -1493,6 +1790,12 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin(
        set_self_comp_port_input_msg_iterator_state(iterator,
                BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_SEEKING);
 
+       /*
+        * We are seeking, reset our expectations about how the following
+        * messages should look like.
+        */
+       reset_iterator_expectations(iterator);
+
        if (iterator->methods.seek_ns_from_origin) {
                /* The iterator knows how to seek to a particular time, let it handle this. */
                BT_LIB_LOGD("Calling user's \"seek nanoseconds from origin\" method: "
@@ -1663,6 +1966,12 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin(
                }
        }
 
+       /*
+        * The following messages returned by the next method (including
+        * post_auto_seek_next) must be after (or at) `ns_from_origin`.
+        */
+       iterator->last_ns_from_origin = ns_from_origin;
+
 end:
        if (stream_states) {
                destroy_auto_seek_stream_states(stream_states);
index a174f3e9c17f99c4fb879c7db4c067c314250f5a..36795bd083baa4e7902ada7a788d87604e37a3c7 100644 (file)
@@ -32,6 +32,7 @@
 #include <babeltrace2/types.h>
 #include "common/assert.h"
 #include <stdbool.h>
+#include <compat/uuid.h>
 
 struct bt_port;
 struct bt_graph;
@@ -113,6 +114,40 @@ struct bt_self_component_port_input_message_iterator {
 
        enum bt_self_component_port_input_message_iterator_state state;
 
+       /*
+        * Timestamp of the last received message (or INT64_MIN in the
+        * beginning, or after a seek to beginning).
+        */
+       int64_t last_ns_from_origin;
+
+       struct {
+               enum {
+                       /* We haven't recorded clock properties yet. */
+                       CLOCK_EXPECTATION_UNSET,
+
+                       /* Expect to have no clock. */
+                       CLOCK_EXPECTATION_NONE,
+
+                       /* Clock with origin_is_unix_epoch true.*/
+                       CLOCK_EXPECTATION_ORIGIN_UNIX,
+
+                       /* Clock with origin_is_unix_epoch false, with a UUID.*/
+                       CLOCK_EXPECTATION_ORIGIN_OTHER_UUID,
+
+                       /* Clock with origin_is_unix_epoch false, without a UUID.*/
+                       CLOCK_EXPECTATION_ORIGIN_OTHER_NO_UUID,
+               } type;
+
+               /*
+                * Expected UUID of the clock, if `type`is CLOCK_EXPECTATION_ORIGIN_OTHER_UUID.
+                *
+                * If the clock's origin is the unix epoch, the UUID is
+                * irrelevant (as the clock will be correlatable with other
+                * clocks having the same origin).
+                */
+               uint8_t uuid[BABELTRACE_UUID_LEN];
+       } clock_expectation;
+
        /*
         * Data necessary for auto seek (the seek-to-beginning then fast-forward
         * seek strategy).
This page took 0.029746 seconds and 4 git commands to generate.