lib: iterator auto-seeking: handle intersecting discarded items messages
authorPhilippe Proulx <eeppeliteloop@gmail.com>
Wed, 27 Feb 2019 19:16:51 +0000 (14:16 -0500)
committerFrancis Deslauriers <francis.deslauriers@efficios.com>
Thu, 2 May 2019 20:50:15 +0000 (20:50 +0000)
This patch makes the message iterator auto-seeking algorithm handle
discarded events and packets messages of which the seeking time
intersects the message's time range, for example:

    discarded events
    msg
    v                  |
    ===*=========*=*===|==*== *      * *
       ^ event msg     |
                       ^ seeking time

In this scenario, we obviously don't want to keep the three event
messages which occur before the seeking time, but we want to keep a part
of the discarded events message because discarding it entirely (as it
was done before this patch) leads to information loss, i.e.:

                       |
                       |  *   *      * *
                       |

Instead, when this happens, we set the discarded events/packets
message's beginning time to the seeking time, make its count
unavailable, and we keep the message:

                       |
                       |==*== *      * *
                       |

The message iterator's internal auto-seeking message array is replaced
with a message queue as there can be more messages in this queue than
the iterator's message array capacity now. Because of this, the
post-seeking temporary "next" method fills the output message array as
long as messages exist in the auto-seeking message queue. When the
message queue is finally empty, it resets the iterator's "next" method
to the original user method.

Because setting a discarded items message's beginning time to a seeking
time (in nanoseconds from origin) requires a conversion from nanoseconds
from origin to a raw clock value, the internal
bt_clock_class_clock_value_from_ns_from_origin() utility is added. This
function can fail if there's an overflow in arithmetic operations.

Signed-off-by: Philippe Proulx <eeppeliteloop@gmail.com>
include/babeltrace/babeltrace-internal.h
include/babeltrace/graph/message-iterator-internal.h
include/babeltrace/trace-ir/clock-class-internal.h
lib/graph/iterator.c

index d58b9860c8b5c65a71a811a7799c1da495a1aaa9..981f84b186910cd7145fe04cb93c30805752e36c 100644 (file)
@@ -26,6 +26,7 @@
 #include <glib.h>
 #include <stdint.h>
 #include <stdlib.h>
+#include <stdbool.h>
 #include <errno.h>
 #include <babeltrace/compat/string-internal.h>
 #include <babeltrace/types.h>
        ((type) (a) > (type) (b) ? (type) (a) : (type) (b))
 #endif
 
+static inline
+bool bt_safe_to_mul_int64(int64_t a, int64_t b)
+{
+       if (a == 0 || b == 0) {
+               return true;
+       }
+
+       return a < INT64_MAX / b;
+}
+
+static inline
+bool bt_safe_to_mul_uint64(uint64_t a, uint64_t b)
+{
+       if (a == 0 || b == 0) {
+               return true;
+       }
+
+       return a < UINT64_MAX / b;
+}
+
+static inline
+bool bt_safe_to_add_int64(int64_t a, int64_t b)
+{
+       return a <= INT64_MAX - b;
+}
+
+static inline
+bool bt_safe_to_add_uint64(uint64_t a, uint64_t b)
+{
+       return a <= UINT64_MAX - b;
+}
+
 /*
  * Memory allocation zeroed
  */
index 3a57f5dadbd678edc5d4b305b524d853f58e6baa..00ce436ee8c71645b054cae52b2228f52cb0474e 100644 (file)
@@ -112,8 +112,7 @@ struct bt_self_component_port_input_message_iterator {
        } methods;
 
        enum bt_self_component_port_input_message_iterator_state state;
-       uint64_t auto_seek_msg_count;
-       GPtrArray *auto_seek_msgs;
+       GQueue *auto_seek_msgs;
        void *user_data;
 };
 
index d91d549f7f4d4558bdfafe00f7bbf862c1703594..80147a639f2dc696163a5b25bd48224c0d22c56c 100644 (file)
 #include <babeltrace/compat/uuid-internal.h>
 #include <babeltrace/types.h>
 #include <babeltrace/property-internal.h>
+#include <babeltrace/assert-internal.h>
 #include <stdbool.h>
 #include <stdint.h>
 #include <glib.h>
 
+#define NS_PER_S_I     INT64_C(1000000000)
+#define NS_PER_S_U     UINT64_C(1000000000)
+
 struct bt_clock_class {
        struct bt_object base;
 
@@ -102,4 +106,92 @@ void _bt_clock_class_freeze(const struct bt_clock_class *clock_class);
 BT_HIDDEN
 bt_bool bt_clock_class_is_valid(struct bt_clock_class *clock_class);
 
+static inline
+int bt_clock_class_clock_value_from_ns_from_origin(
+               struct bt_clock_class *cc, int64_t ns_from_origin,
+               uint64_t *raw_value)
+{
+       int ret = 0;
+       int64_t offset_in_ns;
+       uint64_t value_in_ns;
+       uint64_t rem_value_in_ns;
+       uint64_t value_periods;
+       uint64_t value_period_cycles;
+       int64_t ns_to_add;
+
+       BT_ASSERT(cc);
+       BT_ASSERT(raw_value);
+
+       /* Compute offset part of requested value, in nanoseconds */
+       if (!bt_safe_to_mul_int64(cc->offset_seconds, NS_PER_S_I)) {
+               ret = -1;
+               goto end;
+       }
+
+       offset_in_ns = cc->offset_seconds * NS_PER_S_I;
+
+       if (cc->frequency == NS_PER_S_U) {
+               ns_to_add = (int64_t) cc->offset_cycles;
+       } else {
+               if (!bt_safe_to_mul_int64((int64_t) cc->offset_cycles,
+                               NS_PER_S_I)) {
+                       ret = -1;
+                       goto end;
+               }
+
+               ns_to_add = ((int64_t) cc->offset_cycles * NS_PER_S_I) /
+                       (int64_t) cc->frequency;
+       }
+
+       if (!bt_safe_to_add_int64(offset_in_ns, ns_to_add)) {
+               ret = -1;
+               goto end;
+       }
+
+       offset_in_ns += ns_to_add;
+
+       /* Value part in nanoseconds */
+       if (ns_from_origin < offset_in_ns) {
+               ret = -1;
+               goto end;
+       }
+
+       value_in_ns = (uint64_t) (ns_from_origin - offset_in_ns);
+
+       /* Number of whole clock periods in `value_in_ns` */
+       value_periods = value_in_ns / NS_PER_S_U;
+
+       /* Remaining nanoseconds in cycles + whole clock periods in cycles */
+       rem_value_in_ns = value_in_ns - value_periods * NS_PER_S_U;
+
+       if (value_periods > UINT64_MAX / cc->frequency) {
+               ret = -1;
+               goto end;
+       }
+
+       if (!bt_safe_to_mul_uint64(value_periods, cc->frequency)) {
+               ret = -1;
+               goto end;
+       }
+
+       value_period_cycles = value_periods * cc->frequency;
+
+       if (!bt_safe_to_mul_uint64(cc->frequency, rem_value_in_ns)) {
+               ret = -1;
+               goto end;
+       }
+
+       if (!bt_safe_to_add_uint64(cc->frequency * rem_value_in_ns / NS_PER_S_U,
+                       value_period_cycles)) {
+               ret = -1;
+               goto end;
+       }
+
+       *raw_value = cc->frequency * rem_value_in_ns / NS_PER_S_U +
+               value_period_cycles;
+
+end:
+       return ret;
+}
+
 #endif /* BABELTRACE_TRACE_IR_CLOCK_CLASS_INTERNAL_H */
index a7c9ea17ea9ca0cb2017879ee6da1b79391edeaf..5cfda703252e185f1cb4646f844909ba8ab055f2 100644 (file)
@@ -25,6 +25,8 @@
 #include <babeltrace/lib-logging-internal.h>
 
 #include <babeltrace/compiler-internal.h>
+#include <babeltrace/trace-ir/clock-class-internal.h>
+#include <babeltrace/trace-ir/clock-snapshot-internal.h>
 #include <babeltrace/trace-ir/field.h>
 #include <babeltrace/trace-ir/event-const.h>
 #include <babeltrace/trace-ir/event-internal.h>
@@ -148,17 +150,12 @@ void bt_self_component_port_input_message_iterator_destroy(struct bt_object *obj
        }
 
        if (iterator->auto_seek_msgs) {
-               uint64_t i;
-
-               /* Put any remaining message in the auto-seek array */
-               for (i = 0; i < iterator->auto_seek_msgs->len; i++) {
-                       if (iterator->auto_seek_msgs->pdata[i]) {
-                               bt_object_put_no_null_check(
-                                       iterator->auto_seek_msgs->pdata[i]);
-                       }
+               while (!g_queue_is_empty(iterator->auto_seek_msgs)) {
+                       bt_object_put_no_null_check(
+                               g_queue_pop_tail(iterator->auto_seek_msgs));
                }
 
-               g_ptr_array_free(iterator->auto_seek_msgs, TRUE);
+               g_queue_free(iterator->auto_seek_msgs);
                iterator->auto_seek_msgs = NULL;
        }
 
@@ -324,14 +321,13 @@ bt_self_component_port_input_message_iterator_create_initial(
                goto end;
        }
 
-       iterator->auto_seek_msgs = g_ptr_array_new();
+       iterator->auto_seek_msgs = g_queue_new();
        if (!iterator->auto_seek_msgs) {
-               BT_LOGE_STR("Failed to allocate a GPtrArray.");
+               BT_LOGE_STR("Failed to allocate a GQueue.");
                ret = -1;
                goto end;
        }
 
-       g_ptr_array_set_size(iterator->auto_seek_msgs, MSG_BATCH_SIZE);
        iterator->upstream_component = upstream_comp;
        iterator->upstream_port = upstream_port;
        iterator->connection = iterator->upstream_port->connection;
@@ -985,11 +981,18 @@ bt_self_component_port_input_message_iterator_seek_beginning(
 }
 
 static inline
-int get_message_ns_from_origin(const struct bt_message *msg,
-               int64_t *ns_from_origin, bool *ignore)
+enum bt_message_iterator_status auto_seek_handle_message(
+               struct bt_self_component_port_input_message_iterator *iterator,
+               int64_t ns_from_origin, const struct bt_message *msg,
+               bool *got_first)
 {
+       enum bt_message_iterator_status status = BT_MESSAGE_ITERATOR_STATUS_OK;
+       int64_t msg_ns_from_origin;
        const struct bt_clock_snapshot *clk_snapshot = NULL;
-       int ret = 0;
+       int ret;
+
+       BT_ASSERT(msg);
+       BT_ASSERT(got_first);
 
        switch (msg->type) {
        case BT_MESSAGE_TYPE_EVENT:
@@ -1027,14 +1030,68 @@ int get_message_ns_from_origin(const struct bt_message *msg,
        case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
        case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
        {
-               const struct bt_message_discarded_items *disc_items_msg =
-                       (const void *) msg;
+               struct bt_message_discarded_items *msg_disc_items =
+                       (void *) msg;
+
+               BT_ASSERT_PRE(msg_disc_items->default_begin_cs &&
+                       msg_disc_items->default_end_cs,
+                       "Discarded events/packets message has no default clock snapshots: %!+n",
+                       msg_disc_items);
+               ret = bt_clock_snapshot_get_ns_from_origin(
+                       msg_disc_items->default_begin_cs,
+                       &msg_ns_from_origin);
+               if (ret) {
+                       status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
+                       goto end;
+               }
 
-               clk_snapshot = disc_items_msg->default_begin_cs;
-               BT_ASSERT_PRE(clk_snapshot,
-                       "Discarded events/packets message has no default clock snapshot: %!+n",
-                       msg);
-               break;
+               if (msg_ns_from_origin >= ns_from_origin) {
+                       *got_first = true;
+                       goto push_msg;
+               }
+
+               ret = bt_clock_snapshot_get_ns_from_origin(
+                       msg_disc_items->default_end_cs,
+                       &msg_ns_from_origin);
+               if (ret) {
+                       status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
+                       goto end;
+               }
+
+               if (msg_ns_from_origin >= ns_from_origin) {
+                       /*
+                        * The discarded items message's beginning time
+                        * is before the requested seeking time, but its
+                        * end time is after. Modify the message so as
+                        * to set its beginning time to the requested
+                        * seeking time, and make its item count unknown
+                        * as we don't know if items were really
+                        * discarded within the new time range.
+                        */
+                       uint64_t new_begin_raw_value;
+
+                       ret = bt_clock_class_clock_value_from_ns_from_origin(
+                               msg_disc_items->default_end_cs->clock_class,
+                               ns_from_origin, &new_begin_raw_value);
+                       if (ret) {
+                               status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
+                               goto end;
+                       }
+
+                       bt_clock_snapshot_set_raw_value(
+                               msg_disc_items->default_begin_cs,
+                               new_begin_raw_value);
+                       msg_disc_items->count.base.avail =
+                               BT_PROPERTY_AVAILABILITY_NOT_AVAILABLE;
+
+                       /*
+                        * It is safe to push it because its beginning
+                        * time is exactly the requested seeking time.
+                        */
+                       goto push_msg;
+               } else {
+                       goto skip_msg;
+               }
        }
        case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
        {
@@ -1049,7 +1106,7 @@ int get_message_ns_from_origin(const struct bt_message *msg,
                         * and we can't assume any specific time for an
                         * unknown clock snapshot, so skip this.
                         */
-                       goto set_ignore;
+                       goto skip_msg;
                case BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN:
                        clk_snapshot = stream_act_msg->default_cs;
                        BT_ASSERT(clk_snapshot);
@@ -1071,14 +1128,14 @@ int get_message_ns_from_origin(const struct bt_message *msg,
                         * We can't assume any specific time for an
                         * unknown clock snapshot, so skip this.
                         */
-                       goto set_ignore;
+                       goto skip_msg;
                case BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE:
                        /*
                         * +inf is always greater than any requested
                         * time.
                         */
-                       *ns_from_origin = INT64_MAX;
-                       goto end;
+                       *got_first = true;
+                       goto push_msg;
                case BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN:
                        clk_snapshot = stream_act_msg->default_cs;
                        BT_ASSERT(clk_snapshot);
@@ -1092,22 +1149,35 @@ int get_message_ns_from_origin(const struct bt_message *msg,
        case BT_MESSAGE_TYPE_STREAM_BEGINNING:
        case BT_MESSAGE_TYPE_STREAM_END:
                /* Ignore */
-               break;
+               goto skip_msg;
        default:
                abort();
        }
 
-set_ignore:
-       if (!clk_snapshot) {
-               *ignore = true;
+       BT_ASSERT(clk_snapshot);
+       ret = bt_clock_snapshot_get_ns_from_origin(clk_snapshot,
+               &msg_ns_from_origin);
+       if (ret) {
+               status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
                goto end;
        }
 
-       ret = bt_clock_snapshot_get_ns_from_origin(clk_snapshot,
-               ns_from_origin);
+       if (msg_ns_from_origin >= ns_from_origin) {
+               *got_first = true;
+               goto push_msg;
+       }
+
+skip_msg:
+       bt_object_put_no_null_check(msg);
+       goto end;
+
+push_msg:
+       g_queue_push_head(iterator->auto_seek_msgs, (void *) msg);
+       msg = NULL;
 
 end:
-       return ret;
+       BT_ASSERT(!msg || status != BT_MESSAGE_ITERATOR_STATUS_OK);
+       return status;
 }
 
 static
@@ -1121,6 +1191,7 @@ enum bt_message_iterator_status find_message_ge_ns_from_origin(
        const struct bt_message *messages[MSG_BATCH_SIZE];
        uint64_t user_count = 0;
        uint64_t i;
+       bool got_first = false;
 
        BT_ASSERT(iterator);
        memset(&messages[0], 0, sizeof(messages[0]) * MSG_BATCH_SIZE);
@@ -1170,60 +1241,22 @@ enum bt_message_iterator_status find_message_ge_ns_from_origin(
                        abort();
                }
 
-               /*
-                * Find first message which has a default clock snapshot
-                * that is greater than or equal to the requested value.
-                *
-                * For event and message iterator inactivity messages, compare with the
-                * default clock snapshot.
-                *
-                * For packet beginning messages, compare with the
-                * default beginning clock snapshot, if any.
-                *
-                * For packet end messages, compare with the default end
-                * clock snapshot, if any.
-                *
-                * For stream beginning, stream end, ignore.
-                */
                for (i = 0; i < user_count; i++) {
-                       const struct bt_message *msg = messages[i];
-                       int64_t msg_ns_from_origin;
-                       bool ignore = false;
-                       int ret;
-
-                       BT_ASSERT(msg);
-                       ret = get_message_ns_from_origin(msg, &msg_ns_from_origin,
-                               &ignore);
-                       if (ret) {
-                               status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
-                               goto end;
-                       }
-
-                       if (ignore) {
-                               /* Skip message without a clock snapshot */
+                       if (got_first) {
+                               g_queue_push_head(iterator->auto_seek_msgs,
+                                       (void *) messages[i]);
+                               messages[i] = NULL;
                                continue;
                        }
 
-                       if (msg_ns_from_origin >= ns_from_origin) {
-                               /*
-                                * We found it: move this message and
-                                * the following ones to the iterator's
-                                * auto-seek message array.
-                                */
-                               uint64_t j;
-
-                               for (j = i; j < user_count; j++) {
-                                       iterator->auto_seek_msgs->pdata[j - i] =
-                                               (void *) messages[j];
-                                       messages[j] = NULL;
-                               }
-
-                               iterator->auto_seek_msg_count = user_count - i;
+                       status = auto_seek_handle_message(iterator,
+                               ns_from_origin, messages[i], &got_first);
+                       if (status == BT_MESSAGE_ITERATOR_STATUS_OK) {
+                               /* Message was either put or moved */
+                               messages[i] = NULL;
+                       } else {
                                goto end;
                        }
-
-                       bt_object_put_no_null_check(msg);
-                       messages[i] = NULL;
                }
        }
 
@@ -1244,44 +1277,47 @@ enum bt_self_message_iterator_status post_auto_seek_next(
                bt_message_array_const msgs, uint64_t capacity,
                uint64_t *count)
 {
-       BT_ASSERT(iterator->auto_seek_msg_count <= capacity);
-       BT_ASSERT(iterator->auto_seek_msg_count > 0);
+       BT_ASSERT(!g_queue_is_empty(iterator->auto_seek_msgs));
+       *count = 0;
 
        /*
         * Move auto-seek messages to the output array (which is this
-        * iterator's base message array.
+        * iterator's base message array).
         */
-       memcpy(&msgs[0], &iterator->auto_seek_msgs->pdata[0],
-               sizeof(msgs[0]) * iterator->auto_seek_msg_count);
-       memset(&iterator->auto_seek_msgs->pdata[0], 0,
-               sizeof(iterator->auto_seek_msgs->pdata[0]) *
-               iterator->auto_seek_msg_count);
-       *count = iterator->auto_seek_msg_count;
-
-       /* Restore real user's "next" method */
-       switch (iterator->upstream_component->class->type) {
-       case BT_COMPONENT_CLASS_TYPE_SOURCE:
-       {
-               struct bt_component_class_source *src_comp_cls =
-                       (void *) iterator->upstream_component->class;
-
-               iterator->methods.next =
-                       (bt_self_component_port_input_message_iterator_next_method)
-                               src_comp_cls->methods.msg_iter_next;
-               break;
+       while (capacity > 0 && !g_queue_is_empty(iterator->auto_seek_msgs)) {
+               msgs[*count] = g_queue_pop_tail(iterator->auto_seek_msgs);
+               capacity--;
+               (*count)++;
        }
-       case BT_COMPONENT_CLASS_TYPE_FILTER:
-       {
-               struct bt_component_class_filter *flt_comp_cls =
-                       (void *) iterator->upstream_component->class;
 
-               iterator->methods.next =
-                       (bt_self_component_port_input_message_iterator_next_method)
-                               flt_comp_cls->methods.msg_iter_next;
-               break;
-       }
-       default:
-               abort();
+       BT_ASSERT(*count > 0);
+
+       if (g_queue_is_empty(iterator->auto_seek_msgs)) {
+               /* No more auto-seek messages */
+               switch (iterator->upstream_component->class->type) {
+               case BT_COMPONENT_CLASS_TYPE_SOURCE:
+               {
+                       struct bt_component_class_source *src_comp_cls =
+                               (void *) iterator->upstream_component->class;
+
+                       iterator->methods.next =
+                               (bt_self_component_port_input_message_iterator_next_method)
+                                       src_comp_cls->methods.msg_iter_next;
+                       break;
+               }
+               case BT_COMPONENT_CLASS_TYPE_FILTER:
+               {
+                       struct bt_component_class_filter *flt_comp_cls =
+                               (void *) iterator->upstream_component->class;
+
+                       iterator->methods.next =
+                               (bt_self_component_port_input_message_iterator_next_method)
+                                       flt_comp_cls->methods.msg_iter_next;
+                       break;
+               }
+               default:
+                       abort();
+               }
        }
 
        return BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
@@ -1353,36 +1389,45 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin(
                /*
                 * Find the first message which has a default clock
                 * snapshot greater than or equal to the requested
-                * nanoseconds from origin, and move the received
-                * messages from this point in the batch to this
-                * iterator's auto-seek message array.
+                * seeking time, and move the received messages from
+                * this point in the batch to this iterator's auto-seek
+                * message queue.
                 */
+               while (!g_queue_is_empty(iterator->auto_seek_msgs)) {
+                       bt_object_put_no_null_check(
+                               g_queue_pop_tail(iterator->auto_seek_msgs));
+               }
+
                status = find_message_ge_ns_from_origin(iterator,
                        ns_from_origin);
                switch (status) {
                case BT_MESSAGE_ITERATOR_STATUS_OK:
+               case BT_MESSAGE_ITERATOR_STATUS_END:
                        /*
-                        * Replace the user's "next" method with a
-                        * custom, temporary "next" method which returns
-                        * the messages in the iterator's message array.
+                        * If there are messages in the auto-seek
+                        * message queue, replace the user's "next"
+                        * method with a custom, temporary "next" method
+                        * which returns them.
                         */
-                       iterator->methods.next =
-                               (bt_self_component_port_input_message_iterator_next_method)
-                                       post_auto_seek_next;
+                       if (!g_queue_is_empty(iterator->auto_seek_msgs)) {
+                               iterator->methods.next =
+                                       (bt_self_component_port_input_message_iterator_next_method)
+                                               post_auto_seek_next;
+                       }
+
+                       /*
+                        * `BT_MESSAGE_ITERATOR_STATUS_END` becomes
+                        * `BT_MESSAGE_ITERATOR_STATUS_OK`: the next
+                        * time this iterator's "next" method is called,
+                        * it will return
+                        * `BT_MESSAGE_ITERATOR_STATUS_END`.
+                        */
+                       status = BT_MESSAGE_ITERATOR_STATUS_OK;
                        break;
                case BT_MESSAGE_ITERATOR_STATUS_ERROR:
                case BT_MESSAGE_ITERATOR_STATUS_NOMEM:
                case BT_MESSAGE_ITERATOR_STATUS_AGAIN:
                        goto end;
-               case BT_MESSAGE_ITERATOR_STATUS_END:
-                       /*
-                        * The iterator reached the end: just return
-                        * `BT_MESSAGE_ITERATOR_STATUS_OK` here, as if
-                        * the seeking operation occured: the next
-                        * "next" method will return
-                        * `BT_MESSAGE_ITERATOR_STATUS_END` itself.
-                        */
-                       break;
                default:
                        abort();
                }
@@ -1390,11 +1435,6 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin(
 
 end:
        set_iterator_state_after_seeking(iterator, status);
-
-       if (status == BT_MESSAGE_ITERATOR_STATUS_END) {
-               status = BT_MESSAGE_ITERATOR_STATUS_OK;
-       }
-
        return status;
 }
 
This page took 0.033195 seconds and 4 git commands to generate.