src.ctf.fs: merge all indexes to the fs_ds_group level
author Francis Deslauriers <francis.deslauriers@efficios.com>
Wed, 15 May 2019 18:59:10 +0000 (14:59 -0400)
committer Philippe Proulx <eeppeliteloop@gmail.com>
Thu, 6 Jun 2019 21:20:55 +0000 (17:20 -0400)
Background
==========
The index contains information on the packets of the underlying CTF
trace (e.g. timestamp begin and end). It is currently used to compute
the time boundaries of all the streams during the `trace-info` query. It
will probably be used for seeking in the future. Currently, each
datastream file has its own index.

To find the time boundaries of a given stream, we need to get the first
entry of the first datastream file and the last entry of the last
datastream file.

Why merge all the indexes?
==========================
There are two reasons why we want to merge the file indexes into a
single stream index:
* It's slightly simpler to extract the time boundaries of the stream.

* Changes to work around various tracer bugs affecting packet timestamps
  are about to be introduced and will need to iterate over all the sorted
  index entries of each stream.

Approach
========
Move the index from `struct ctf_fs_ds_file_info` to
`struct ctf_fs_ds_file_group` and merge the indexes of a given stream
when merging the different chunks of the same trace.

Signed-off-by: Francis Deslauriers <francis.deslauriers@efficios.com>
Change-Id: Iabee86c7c76c69b56cbf86449f8930e6d35969e2
Reviewed-on: https://review.lttng.org/c/babeltrace/+/1362
Tested-by: jenkins
Reviewed-by: Philippe Proulx <eeppeliteloop@gmail.com>
plugins/ctf/fs-src/data-stream-file.c
plugins/ctf/fs-src/data-stream-file.h
plugins/ctf/fs-src/fs.c
plugins/ctf/fs-src/fs.h
plugins/ctf/fs-src/query.c

index 924c579f5227b38b3105235a29fc9f95f9d2c94b..1f75eaa4caa7908bb24358e007f893c0e09c1d35 100644 (file)
@@ -256,40 +256,6 @@ struct bt_msg_iter_medium_ops ctf_fs_ds_file_medops = {
        .seek = medop_seek,
 };
 
-static
-struct ctf_fs_ds_index *ctf_fs_ds_index_create(size_t length)
-{
-       struct ctf_fs_ds_index *index = g_new0(struct ctf_fs_ds_index, 1);
-
-       if (!index) {
-               BT_LOGE_STR("Failed to allocate index");
-               goto error;
-       }
-
-       index->entries = g_array_sized_new(FALSE, TRUE,
-                       sizeof(struct ctf_fs_ds_index_entry), length);
-       if (!index->entries) {
-               BT_LOGE("Failed to allocate %zu index entries.", length);
-               goto error;
-       }
-       g_array_set_size(index->entries, length);
-end:
-       return index;
-error:
-       ctf_fs_ds_index_destroy(index);
-       goto end;
-}
-
-/* Returns a new, zeroed, index entry. */
-static
-struct ctf_fs_ds_index_entry *ctf_fs_ds_index_add_new_entry(
-               struct ctf_fs_ds_index *index)
-{
-       g_array_set_size(index->entries, index->entries->len + 1);
-       return &g_array_index(index->entries, struct ctf_fs_ds_index_entry,
-                       index->entries->len - 1);
-}
-
 static
 int convert_cycles_to_ns(struct ctf_clock_class *clock_class,
                uint64_t cycles, int64_t *ns)
@@ -400,13 +366,11 @@ struct ctf_fs_ds_index *build_index_from_idx_file(
                goto error;
        }
 
-       index = ctf_fs_ds_index_create(file_entry_count);
+       index = ctf_fs_ds_index_create();
        if (!index) {
                goto error;
        }
 
-       index_entry = (struct ctf_fs_ds_index_entry *) &g_array_index(
-                       index->entries, struct ctf_fs_ds_index_entry, 0);
        for (i = 0; i < file_entry_count; i++) {
                struct ctf_packet_index *file_index =
                                (struct ctf_packet_index *) file_pos;
@@ -417,6 +381,11 @@ struct ctf_fs_ds_index *build_index_from_idx_file(
                        goto error;
                }
 
+               index_entry = g_new0(struct ctf_fs_ds_index_entry, 1);
+               if (!index_entry) {
+                       goto error;
+               }
+
                /* Convert size in bits to bytes. */
                packet_size /= CHAR_BIT;
                index_entry->packet_size = packet_size;
@@ -457,7 +426,8 @@ struct ctf_fs_ds_index *build_index_from_idx_file(
 
                total_packets_size += packet_size;
                file_pos += file_index_entry_size;
-               index_entry++;
+
+               g_ptr_array_add(index->entries, index_entry);
        }
 
        /* Validate that the index addresses the complete stream. */
@@ -480,6 +450,7 @@ end:
        return index;
 error:
        ctf_fs_ds_index_destroy(index);
+       g_free(index_entry);
        index = NULL;
        goto end;
 }
@@ -541,14 +512,14 @@ struct ctf_fs_ds_index *build_index_from_stream_file(
 
        BT_LOGD("Indexing stream file %s", ds_file->file->path->str);
 
-       index = ctf_fs_ds_index_create(0);
+       index = ctf_fs_ds_index_create();
        if (!index) {
                goto error;
        }
 
        do {
                off_t current_packet_size_bytes;
-               struct ctf_fs_ds_index_entry *entry;
+               struct ctf_fs_ds_index_entry *index_entry;
                struct bt_msg_iter_packet_properties props;
 
                if (current_packet_offset_bytes < 0) {
@@ -598,17 +569,22 @@ struct ctf_fs_ds_index *build_index_from_stream_file(
                        "next-packet-offset=%jd",
                        current_packet_offset_bytes - current_packet_size_bytes,
                        current_packet_offset_bytes);
-               entry = ctf_fs_ds_index_add_new_entry(index);
-               if (!entry) {
+
+               index_entry = g_new0(struct ctf_fs_ds_index_entry, 1);
+               if (!index_entry) {
                        BT_LOGE_STR("Failed to allocate a new index entry.");
                        goto error;
                }
 
-               ret = init_index_entry(entry, ds_file, &props,
+               ret = init_index_entry(index_entry, ds_file, &props,
                        current_packet_size_bytes, current_packet_offset_bytes);
                if (ret) {
+                       g_free(index_entry);
                        goto error;
                }
+
+               g_ptr_array_add(index->entries, index_entry);
+
        } while (iter_status == BT_MSG_ITER_STATUS_OK);
 
        if (iter_status != BT_MSG_ITER_STATUS_OK) {
@@ -691,6 +667,31 @@ end:
        return index;
 }
 
+BT_HIDDEN
+struct ctf_fs_ds_index *ctf_fs_ds_index_create()
+{
+       struct ctf_fs_ds_index *index = g_new0(struct ctf_fs_ds_index, 1);
+
+       if (!index) {
+               BT_LOGE_STR("Failed to allocate index");
+               goto error;
+       }
+
+       index->entries = g_ptr_array_new_with_free_func((GDestroyNotify) g_free);
+       if (!index->entries) {
+               BT_LOGE("Failed to allocate index entries.");
+               goto error;
+       }
+
+       goto end;
+
+error:
+       ctf_fs_ds_index_destroy(index);
+       index = NULL;
+end:
+       return index;
+}
+
 BT_HIDDEN
 void ctf_fs_ds_file_destroy(struct ctf_fs_ds_file *ds_file)
 {
@@ -750,7 +751,7 @@ void ctf_fs_ds_index_destroy(struct ctf_fs_ds_index *index)
        }
 
        if (index->entries) {
-               g_array_free(index->entries, TRUE);
+               g_ptr_array_free(index->entries, TRUE);
        }
        g_free(index);
 }
index 61007f817e4e78f48d464a1e7e9b9d13a32437b3..1231be1910c20f16a60c6215269e8e77834e906c 100644 (file)
@@ -37,46 +37,7 @@ struct ctf_fs_file;
 struct ctf_fs_trace;
 struct ctf_fs_ds_file;
 
-struct ctf_fs_ds_index_entry {
-       /* Position, in bytes, of the packet from the beginning of the file. */
-       uint64_t offset;
-
-       /* Size of the packet, in bytes. */
-       uint64_t packet_size;
-
-       /*
-        * Extracted from the packet context, relative to the respective fields'
-        * mapped clock classes (in cycles).
-        */
-       uint64_t timestamp_begin, timestamp_end;
-
-       /*
-        * Converted from the packet context, relative to the trace's EPOCH
-        * (in ns since EPOCH).
-        */
-       int64_t timestamp_begin_ns, timestamp_end_ns;
-};
-
-struct ctf_fs_ds_index {
-       /* Array of struct ctf_fs_fd_index_entry. */
-       GArray *entries;
-};
-
 struct ctf_fs_ds_file_info {
-       /*
-        * Owned by this. May be NULL.
-        *
-        * A stream cannot be assumed to be indexed as the indexing might have
-        * been skipped. Moreover, the index's fields may not all be available
-        * depending on the producer (e.g. timestamp_begin/end are not
-        * mandatory).
-        *
-        * FIXME In such cases (missing fields), the indexing is aborted as
-        * no the index entries don't have a concept of fields being present
-        * or not.
-        */
-       struct ctf_fs_ds_index *index;
-
        /* Owned by this. */
        GString *path;
 
@@ -144,6 +105,9 @@ BT_HIDDEN
 struct ctf_fs_ds_index *ctf_fs_ds_file_build_index(
                struct ctf_fs_ds_file *ds_file);
 
+BT_HIDDEN
+struct ctf_fs_ds_index *ctf_fs_ds_index_create();
+
 BT_HIDDEN
 void ctf_fs_ds_index_destroy(struct ctf_fs_ds_index *index);
 
index 9cab8cf46cf6eb2683ad5bd979470253f83ad782..b0ca793b031a89f597dde81e9a0d0fdbae4a7d80 100644 (file)
@@ -524,13 +524,12 @@ void ctf_fs_ds_file_info_destroy(struct ctf_fs_ds_file_info *ds_file_info)
                g_string_free(ds_file_info->path, TRUE);
        }
 
-       ctf_fs_ds_index_destroy(ds_file_info->index);
        g_free(ds_file_info);
 }
 
 static
 struct ctf_fs_ds_file_info *ctf_fs_ds_file_info_create(const char *path,
-               int64_t begin_ns, struct ctf_fs_ds_index *index)
+               int64_t begin_ns)
 {
        struct ctf_fs_ds_file_info *ds_file_info;
 
@@ -547,11 +546,8 @@ struct ctf_fs_ds_file_info *ctf_fs_ds_file_info_create(const char *path,
        }
 
        ds_file_info->begin_ns = begin_ns;
-       ds_file_info->index = index;
-       index = NULL;
 
 end:
-       ctf_fs_ds_index_destroy(index);
        return ds_file_info;
 }
 
@@ -566,6 +562,13 @@ void ctf_fs_ds_file_group_destroy(struct ctf_fs_ds_file_group *ds_file_group)
                g_ptr_array_free(ds_file_group->ds_file_infos, TRUE);
        }
 
+       if (ds_file_group->index) {
+               if (ds_file_group->index->entries) {
+                       g_ptr_array_free(ds_file_group->index->entries, TRUE);
+               }
+               g_free(ds_file_group->index);
+       }
+
        bt_stream_put_ref(ds_file_group->stream);
        g_free(ds_file_group);
 }
@@ -574,7 +577,8 @@ static
 struct ctf_fs_ds_file_group *ctf_fs_ds_file_group_create(
                struct ctf_fs_trace *ctf_fs_trace,
                struct ctf_stream_class *sc,
-               uint64_t stream_instance_id)
+               uint64_t stream_instance_id,
+               struct ctf_fs_ds_index *index)
 {
        struct ctf_fs_ds_file_group *ds_file_group;
 
@@ -589,6 +593,8 @@ struct ctf_fs_ds_file_group *ctf_fs_ds_file_group_create(
                goto error;
        }
 
+       ds_file_group->index = index;
+
        ds_file_group->stream_id = stream_instance_id;
        BT_ASSERT(sc);
        ds_file_group->sc = sc;
@@ -597,6 +603,7 @@ struct ctf_fs_ds_file_group *ctf_fs_ds_file_group_create(
 
 error:
        ctf_fs_ds_file_group_destroy(ds_file_group);
+       ctf_fs_ds_index_destroy(index);
        ds_file_group = NULL;
 
 end:
@@ -648,6 +655,26 @@ void ds_file_group_insert_ds_file_info_sorted(
        array_insert(ds_file_group->ds_file_infos, ds_file_info, i);
 }
 
+static
+void ds_file_group_insert_ds_index_entry_sorted(
+       struct ctf_fs_ds_file_group *ds_file_group,
+       struct ctf_fs_ds_index_entry *entry)
+{
+       guint i;
+
+       /* Find the spot where to insert this index entry. */
+       for (i = 0; i < ds_file_group->index->entries->len; i++) {
+               struct ctf_fs_ds_index_entry *other_entry = g_ptr_array_index(
+                       ds_file_group->index->entries, i);
+
+               if (entry->timestamp_begin_ns < other_entry->timestamp_begin_ns) {
+                       break;
+               }
+       }
+
+       array_insert(ds_file_group->index->entries, entry, i);
+}
+
 /*
  * Create a new ds_file_info using the provided path, begin_ns and index, then
  * add it to ds_file_group's list of ds_file_infos.
@@ -656,15 +683,12 @@ void ds_file_group_insert_ds_file_info_sorted(
 static
 int ctf_fs_ds_file_group_add_ds_file_info(
                struct ctf_fs_ds_file_group *ds_file_group,
-               const char *path, int64_t begin_ns,
-               struct ctf_fs_ds_index *index)
+               const char *path, int64_t begin_ns)
 {
        struct ctf_fs_ds_file_info *ds_file_info;
        int ret = 0;
 
-       /* Onwership of index is transferred. */
-       ds_file_info = ctf_fs_ds_file_info_create(path, begin_ns, index);
-       index = NULL;
+       ds_file_info = ctf_fs_ds_file_info_create(path, begin_ns);
        if (!ds_file_info) {
                goto error;
        }
@@ -676,7 +700,6 @@ int ctf_fs_ds_file_group_add_ds_file_info(
 
 error:
        ctf_fs_ds_file_info_destroy(ds_file_info);
-       ctf_fs_ds_index_destroy(index);
        ret = -1;
 end:
        return ret;
@@ -761,15 +784,16 @@ int add_ds_file_to_ds_file_group(struct ctf_fs_trace *ctf_fs_trace,
                 * group.
                 */
                ds_file_group = ctf_fs_ds_file_group_create(ctf_fs_trace,
-                       sc, UINT64_C(-1));
+                       sc, UINT64_C(-1), index);
+               /* Ownership of index is transferred. */
+               index = NULL;
+
                if (!ds_file_group) {
                        goto error;
                }
 
                ret = ctf_fs_ds_file_group_add_ds_file_info(ds_file_group,
-                       path, begin_ns, index);
-               /* Ownership of index is transferred. */
-               index = NULL;
+                       path, begin_ns);
                if (ret) {
                        goto error;
                }
@@ -797,7 +821,9 @@ int add_ds_file_to_ds_file_group(struct ctf_fs_trace *ctf_fs_trace,
 
        if (!ds_file_group) {
                ds_file_group = ctf_fs_ds_file_group_create(ctf_fs_trace,
-                       sc, stream_instance_id);
+                       sc, stream_instance_id, index);
+               /* Ownership of index is transferred. */
+               index = NULL;
                if (!ds_file_group) {
                        goto error;
                }
@@ -806,8 +832,7 @@ int add_ds_file_to_ds_file_group(struct ctf_fs_trace *ctf_fs_trace,
        }
 
        ret = ctf_fs_ds_file_group_add_ds_file_info(ds_file_group, path,
-               begin_ns, index);
-       index = NULL;
+               begin_ns);
        if (ret) {
                goto error;
        }
@@ -1387,8 +1412,21 @@ void merge_ctf_fs_ds_file_groups(struct ctf_fs_ds_file_group *dest, struct ctf_f
 
                ds_file_group_insert_ds_file_info_sorted(dest, ds_file_info);
        }
-}
 
+       /* Merge both indexes. */
+       for (i = 0; i < src->index->entries->len; i++) {
+               struct ctf_fs_ds_index_entry *entry = g_ptr_array_index(
+                       src->index->entries, i);
+
+               /*
+                * Ownership of the ctf_fs_ds_index_entry is transferred to
+                * dest.
+                */
+               g_ptr_array_index(src->index->entries, i) = NULL;
+
+               ds_file_group_insert_ds_index_entry_sorted(dest, entry);
+       }
+}
 /* Merge src_trace's data stream file groups into dest_trace's. */
 
 static
@@ -1443,17 +1481,28 @@ int merge_matching_ctf_fs_ds_file_groups(
 
                /*
                 * Didn't find a friend in dest to merge our src_group into?
-                * Create a new empty one.
+                * Create a new empty one. This can happen if a stream was
+                * active in the source trace chunk but not in the destination
+                * trace chunk.
                 */
                if (!dest_group) {
                        struct ctf_stream_class *sc;
+                       struct ctf_fs_ds_index *index;
 
                        sc = ctf_trace_class_borrow_stream_class_by_id(
                                dest_trace->metadata->tc, src_group->sc->id);
                        BT_ASSERT(sc);
 
+                       index = ctf_fs_ds_index_create();
+                       if (!index) {
+                               ret = -1;
+                               goto end;
+                       }
+
                        dest_group = ctf_fs_ds_file_group_create(dest_trace, sc,
-                               src_group->stream_id);
+                               src_group->stream_id, index);
+                       /* Ownership of index is transferred. */
+                       index = NULL;
                        if (!dest_group) {
                                ret = -1;
                                goto end;
index ed65aa3c446fe21864f9e73e0fcd2ec36a61872c..9364af22c1db71f722b1732a95ffae8773494414 100644 (file)
@@ -99,6 +99,31 @@ struct ctf_fs_trace {
        uint64_t next_stream_id;
 };
 
+struct ctf_fs_ds_index_entry {
+       /* Position, in bytes, of the packet from the beginning of the file. */
+       uint64_t offset;
+
+       /* Size of the packet, in bytes. */
+       uint64_t packet_size;
+
+       /*
+        * Extracted from the packet context, relative to the respective fields'
+        * mapped clock classes (in cycles).
+        */
+       uint64_t timestamp_begin, timestamp_end;
+
+       /*
+        * Converted from the packet context, relative to the trace's EPOCH
+        * (in ns since EPOCH).
+        */
+       int64_t timestamp_begin_ns, timestamp_end_ns;
+};
+
+struct ctf_fs_ds_index {
+       /* Array of pointers to struct ctf_fs_ds_index_entry. */
+       GPtrArray *entries;
+};
+
 struct ctf_fs_ds_file_group {
        /*
         * Array of struct ctf_fs_ds_file_info, owned by this.
@@ -122,6 +147,20 @@ struct ctf_fs_ds_file_group {
 
        /* Weak, belongs to component */
        struct ctf_fs_trace *ctf_fs_trace;
+
+       /*
+        * Owned by this. May be NULL.
+        *
+        * A stream cannot be assumed to be indexed as the indexing might have
+        * been skipped. Moreover, the index's fields may not all be available
+        * depending on the producer (e.g. timestamp_begin/end are not
+        * mandatory).
+        *
+        * FIXME In such cases (missing fields), the indexing is aborted
+        * because the index entries have no concept of a field being
+        * present or not.
+        */
+       struct ctf_fs_ds_index *index;
 };
 
 struct ctf_fs_port_data {
index d666e1c4f6f9baab495027975ac29dacfc5a6a08..9e30aeab57848f89fc692196c65d7b780201d67a 100644 (file)
@@ -271,7 +271,6 @@ int populate_stream_info(struct ctf_fs_ds_file_group *group,
        size_t file_idx;
        bt_value_status status;
        bt_value *file_paths;
-       struct ctf_fs_ds_file_info *first_file_info, *last_file_info;
        struct ctf_fs_ds_index_entry *first_ds_index_entry, *last_ds_index_entry;
        gchar *port_name = NULL;
 
@@ -295,33 +294,22 @@ int populate_stream_info(struct ctf_fs_ds_file_group *group,
        }
 
        /*
-        * Since `struct ctf_fs_ds_file_info` elements are sorted by value of
-        * `begin_ns` within the `ds_file_groups` array and `struct
-        * ctf_fs_ds_index_entry` elements are sorted by time within their
-        * respective `struct ctf_fs_ds_file_info`, we can compute the stream
-        * range from timestamp_begin of the first index entry of the first
-        * file to the timestamp_end of the last index entry of the last file.
+        * Since each `struct ctf_fs_ds_file_group` has a sorted array of
+        * `struct ctf_fs_ds_index_entry`, we can compute the stream range from
+        * the timestamp_begin of the first index entry and the timestamp_end
+        * of the last index entry.
         */
-       BT_ASSERT(group->ds_file_infos->len > 0);
+       BT_ASSERT(group->index);
+       BT_ASSERT(group->index->entries);
+       BT_ASSERT(group->index->entries->len > 0);
 
-       first_file_info = g_ptr_array_index(group->ds_file_infos, 0);
-       last_file_info = g_ptr_array_index(group->ds_file_infos,
-               group->ds_file_infos->len - 1);
+       /* First entry. */
+       first_ds_index_entry = (struct ctf_fs_ds_index_entry *) g_ptr_array_index(
+               group->index->entries, 0);
 
-       BT_ASSERT(first_file_info->index);
-       BT_ASSERT(first_file_info->index->entries);
-       BT_ASSERT(first_file_info->index->entries->len > 0);
-
-       first_ds_index_entry = (struct ctf_fs_ds_index_entry *) &g_array_index(
-               first_file_info->index->entries, struct ctf_fs_ds_index_entry, 0);
-
-       BT_ASSERT(last_file_info->index);
-       BT_ASSERT(last_file_info->index->entries);
-       BT_ASSERT(last_file_info->index->entries->len > 0);
-
-       last_ds_index_entry = (struct ctf_fs_ds_index_entry *) &g_array_index(
-               last_file_info->index->entries, struct ctf_fs_ds_index_entry,
-               last_file_info->index->entries->len - 1);
+       /* Last entry. */
+       last_ds_index_entry = (struct ctf_fs_ds_index_entry *) g_ptr_array_index(
+               group->index->entries, group->index->entries->len - 1);
 
        stream_range->begin_ns = first_ds_index_entry->timestamp_begin_ns;
        stream_range->end_ns = last_ds_index_entry->timestamp_end_ns;
This page took 0.032734 seconds and 4 git commands to generate.