src.ctf.fs: make ctf_fs_ds_file_group_create return a ctf_fs_ds_file_group::UP
[babeltrace.git] / src / plugins / ctf / fs-src / data-stream-file.cpp
index fe6a7c3e0c144054e9ff6e7db3943f7a3d5a7df6..15a7a368709957c563a7e1ac84dd1ce0ee2a3bf3 100644 (file)
@@ -340,7 +340,7 @@ ctf_fs_ds_group_medops_set_file(struct ctf_fs_ds_group_medops_data *data,
      * Ensure the right portion of the file will be returned on the next
      * request_bytes call.
      */
-    status = ds_file_mmap(data->file, index_entry->offset);
+    status = ds_file_mmap(data->file, index_entry->offset.bytes());
     if (status != CTF_MSG_ITER_MEDIUM_STATUS_OK) {
         goto end;
     }
@@ -442,9 +442,11 @@ static void ctf_fs_ds_index_entry_destroy(ctf_fs_ds_index_entry *entry)
     delete entry;
 }
 
-static struct ctf_fs_ds_index_entry *ctf_fs_ds_index_entry_create()
+static struct ctf_fs_ds_index_entry *ctf_fs_ds_index_entry_create(const bt2c::DataLen offset,
+                                                                  const bt2c::DataLen packetSize)
 {
-    ctf_fs_ds_index_entry *entry = new ctf_fs_ds_index_entry;
+    ctf_fs_ds_index_entry *entry = new ctf_fs_ds_index_entry {offset, packetSize};
+
     entry->packet_seq_num = UINT64_MAX;
 
     return entry;
@@ -472,7 +474,7 @@ static struct ctf_fs_ds_index *build_index_from_idx_file(struct ctf_fs_ds_file *
     const struct ctf_packet_index_file_hdr *header = NULL;
     struct ctf_fs_ds_index *index = NULL;
     struct ctf_fs_ds_index_entry *index_entry = NULL, *prev_index_entry = NULL;
-    uint64_t total_packets_size = 0;
+    auto totalPacketsSize = bt2c::DataLen::fromBytes(0);
     size_t file_index_entry_size;
     size_t file_entry_count;
     size_t i;
@@ -584,15 +586,26 @@ static struct ctf_fs_ds_index *build_index_from_idx_file(struct ctf_fs_ds_file *
 
     for (i = 0; i < file_entry_count; i++) {
         struct ctf_packet_index *file_index = (struct ctf_packet_index *) file_pos;
-        uint64_t packet_size = be64toh(file_index->packet_size);
+        const auto packetSize = bt2c::DataLen::fromBits(be64toh(file_index->packet_size));
 
-        if (packet_size % CHAR_BIT) {
+        if (packetSize.hasExtraBits()) {
             BT_CPPLOGW_SPEC(ds_file->logger,
                             "Invalid packet size encountered in LTTng trace index file");
             goto error;
         }
 
-        index_entry = ctf_fs_ds_index_entry_create();
+        const auto offset = bt2c::DataLen::fromBytes(be64toh(file_index->offset));
+
+        if (i != 0 && offset < prev_index_entry->offset) {
+            BT_CPPLOGW_SPEC(
+                ds_file->logger,
+                "Invalid, non-monotonic, packet offset encountered in LTTng trace index file: "
+                "previous offset={} bytes, current offset={} bytes",
+                prev_index_entry->offset.bytes(), offset.bytes());
+            goto error;
+        }
+
+        index_entry = ctf_fs_ds_index_entry_create(offset, packetSize);
         if (!index_entry) {
             BT_CPPLOGE_APPEND_CAUSE_SPEC(ds_file->logger,
                                          "Failed to create a ctf_fs_ds_index_entry.");
@@ -602,20 +615,6 @@ static struct ctf_fs_ds_index *build_index_from_idx_file(struct ctf_fs_ds_file *
         /* Set path to stream file. */
         index_entry->path = file_info->path->str;
 
-        /* Convert size in bits to bytes. */
-        packet_size /= CHAR_BIT;
-        index_entry->packet_size = packet_size;
-
-        index_entry->offset = be64toh(file_index->offset);
-        if (i != 0 && index_entry->offset < prev_index_entry->offset) {
-            BT_CPPLOGW_SPEC(
-                ds_file->logger,
-                "Invalid, non-monotonic, packet offset encountered in LTTng trace index file: "
-                "previous offset={}, current offset={}",
-                prev_index_entry->offset, index_entry->offset);
-            goto error;
-        }
-
         index_entry->timestamp_begin = be64toh(file_index->timestamp_begin);
         index_entry->timestamp_end = be64toh(file_index->timestamp_end);
         if (index_entry->timestamp_end < index_entry->timestamp_begin) {
@@ -649,7 +648,7 @@ static struct ctf_fs_ds_index *build_index_from_idx_file(struct ctf_fs_ds_file *
             index_entry->packet_seq_num = be64toh(file_index->packet_seq_num);
         }
 
-        total_packets_size += packet_size;
+        totalPacketsSize += packetSize;
         file_pos += file_index_entry_size;
 
         prev_index_entry = index_entry;
@@ -660,11 +659,11 @@ static struct ctf_fs_ds_index *build_index_from_idx_file(struct ctf_fs_ds_file *
     }
 
     /* Validate that the index addresses the complete stream. */
-    if (ds_file->file->size != total_packets_size) {
+    if (ds_file->file->size != totalPacketsSize.bytes()) {
         BT_CPPLOGW_SPEC(ds_file->logger,
                         "Invalid LTTng trace index file; indexed size != stream file size: "
-                        "file-size={}, total-packets-size={}",
-                        ds_file->file->size, total_packets_size);
+                        "file-size={} bytes, total-packets-size={} bytes",
+                        ds_file->file->size, totalPacketsSize.bytes());
         goto error;
     }
 end:
@@ -686,18 +685,13 @@ error:
 }
 
 static int init_index_entry(struct ctf_fs_ds_index_entry *entry, struct ctf_fs_ds_file *ds_file,
-                            struct ctf_msg_iter_packet_properties *props, off_t packet_size,
-                            off_t packet_offset)
+                            struct ctf_msg_iter_packet_properties *props)
 {
     int ret = 0;
     struct ctf_stream_class *sc;
 
     sc = ctf_trace_class_borrow_stream_class_by_id(ds_file->metadata->tc, props->stream_class_id);
     BT_ASSERT(sc);
-    BT_ASSERT(packet_offset >= 0);
-    entry->offset = packet_offset;
-    BT_ASSERT(packet_size >= 0);
-    entry->packet_size = packet_size;
 
     if (props->snapshots.beginning_clock != UINT64_C(-1)) {
         entry->timestamp_begin = props->snapshots.beginning_clock;
@@ -742,7 +736,7 @@ static struct ctf_fs_ds_index *build_index_from_stream_file(struct ctf_fs_ds_fil
     int ret;
     struct ctf_fs_ds_index *index = NULL;
     enum ctf_msg_iter_status iter_status = CTF_MSG_ITER_STATUS_OK;
-    off_t current_packet_offset_bytes = 0;
+    auto currentPacketOffset = bt2c::DataLen::fromBytes(0);
 
     BT_CPPLOGI_SPEC(ds_file->logger, "Indexing stream file {}", ds_file->file->path->str);
 
@@ -752,23 +746,18 @@ static struct ctf_fs_ds_index *build_index_from_stream_file(struct ctf_fs_ds_fil
     }
 
     while (true) {
-        off_t current_packet_size_bytes;
-        struct ctf_fs_ds_index_entry *index_entry;
         struct ctf_msg_iter_packet_properties props;
 
-        if (current_packet_offset_bytes < 0) {
-            BT_CPPLOGE_STR_SPEC(ds_file->logger, "Cannot get the current packet's offset.");
-            goto error;
-        } else if (current_packet_offset_bytes > ds_file->file->size) {
+        if (currentPacketOffset.bytes() > ds_file->file->size) {
             BT_CPPLOGE_STR_SPEC(ds_file->logger,
                                 "Unexpected current packet's offset (larger than file).");
             goto error;
-        } else if (current_packet_offset_bytes == ds_file->file->size) {
+        } else if (currentPacketOffset.bytes() == ds_file->file->size) {
             /* No more data */
             break;
         }
 
-        iter_status = ctf_msg_iter_seek(msg_iter, current_packet_offset_bytes);
+        iter_status = ctf_msg_iter_seek(msg_iter, currentPacketOffset.bytes());
         if (iter_status != CTF_MSG_ITER_STATUS_OK) {
             goto error;
         }
@@ -778,23 +767,27 @@ static struct ctf_fs_ds_index *build_index_from_stream_file(struct ctf_fs_ds_fil
             goto error;
         }
 
-        if (props.exp_packet_total_size >= 0) {
-            current_packet_size_bytes = (uint64_t) props.exp_packet_total_size / 8;
-        } else {
-            current_packet_size_bytes = ds_file->file->size;
-        }
+        /*
+         * Get the current packet size from the packet header, if set.  Else,
+         * assume there is a single packet in the file, so take the file size
+         * as the packet size.
+         */
+        const auto currentPacketSize = props.exp_packet_total_size >= 0 ?
+                                           bt2c::DataLen::fromBits(props.exp_packet_total_size) :
+                                           bt2c::DataLen::fromBytes(ds_file->file->size);
 
-        if (current_packet_offset_bytes + current_packet_size_bytes > ds_file->file->size) {
+        if ((currentPacketOffset + currentPacketSize).bytes() > ds_file->file->size) {
             BT_CPPLOGW_SPEC(ds_file->logger,
                             "Invalid packet size reported in file: stream=\"{}\", "
-                            "packet-offset={}, packet-size-bytes={}, "
-                            "file-size={}",
-                            ds_file->file->path->str, (intmax_t) current_packet_offset_bytes,
-                            (intmax_t) current_packet_size_bytes, (intmax_t) ds_file->file->size);
+                            "packet-offset-bytes={}, packet-size-bytes={}, "
+                            "file-size-bytes={}",
+                            ds_file->file->path->str, currentPacketOffset.bytes(),
+                            currentPacketSize.bytes(), ds_file->file->size);
             goto error;
         }
 
-        index_entry = ctf_fs_ds_index_entry_create();
+        const auto index_entry =
+            ctf_fs_ds_index_entry_create(currentPacketOffset, currentPacketSize);
         if (!index_entry) {
             BT_CPPLOGE_APPEND_CAUSE_SPEC(ds_file->logger,
                                          "Failed to create a ctf_fs_ds_index_entry.");
@@ -804,8 +797,7 @@ static struct ctf_fs_ds_index *build_index_from_stream_file(struct ctf_fs_ds_fil
         /* Set path to stream file. */
         index_entry->path = file_info->path->str;
 
-        ret = init_index_entry(index_entry, ds_file, &props, current_packet_size_bytes,
-                               current_packet_offset_bytes);
+        ret = init_index_entry(index_entry, ds_file, &props);
         if (ret) {
             ctf_fs_ds_index_entry_destroy(index_entry);
             goto error;
@@ -813,12 +805,12 @@ static struct ctf_fs_ds_index *build_index_from_stream_file(struct ctf_fs_ds_fil
 
         g_ptr_array_add(index->entries, index_entry);
 
-        current_packet_offset_bytes += current_packet_size_bytes;
+        currentPacketOffset += currentPacketSize;
         BT_CPPLOGD_SPEC(ds_file->logger,
-                        "Seeking to next packet: current-packet-offset={}, "
-                        "next-packet-offset={}",
-                        (intmax_t) (current_packet_offset_bytes - current_packet_size_bytes),
-                        (intmax_t) current_packet_offset_bytes);
+                        "Seeking to next packet: current-packet-offset-bytes={}, "
+                        "next-packet-offset-bytes={}",
+                        (currentPacketOffset - currentPacketSize).bytes(),
+                        currentPacketOffset.bytes());
     }
 
 end:
@@ -933,3 +925,82 @@ void ctf_fs_ds_index_destroy(struct ctf_fs_ds_index *index)
 
     delete index;
 }
+
+void ctf_fs_ds_file_info_destroy(struct ctf_fs_ds_file_info *ds_file_info)
+{
+    if (!ds_file_info) {
+        return;
+    }
+
+    if (ds_file_info->path) {
+        g_string_free(ds_file_info->path, TRUE);
+    }
+
+    delete ds_file_info;
+}
+
+struct ctf_fs_ds_file_info *ctf_fs_ds_file_info_create(const char *path, int64_t begin_ns)
+{
+    ctf_fs_ds_file_info *ds_file_info = new ctf_fs_ds_file_info;
+    ds_file_info->path = g_string_new(path);
+    if (!ds_file_info->path) {
+        ctf_fs_ds_file_info_destroy(ds_file_info);
+        ds_file_info = NULL;
+        goto end;
+    }
+
+    ds_file_info->begin_ns = begin_ns;
+
+end:
+    return ds_file_info;
+}
+
+void ctf_fs_ds_file_group_destroy(struct ctf_fs_ds_file_group *ds_file_group)
+{
+    if (!ds_file_group) {
+        return;
+    }
+
+    if (ds_file_group->ds_file_infos) {
+        g_ptr_array_free(ds_file_group->ds_file_infos, TRUE);
+    }
+
+    ctf_fs_ds_index_destroy(ds_file_group->index);
+
+    bt_stream_put_ref(ds_file_group->stream);
+    delete ds_file_group;
+}
+
+void ctf_fs_ds_file_group_deleter::operator()(ctf_fs_ds_file_group *group) noexcept
+{
+    ctf_fs_ds_file_group_destroy(group);
+}
+
+ctf_fs_ds_file_group::UP ctf_fs_ds_file_group_create(struct ctf_fs_trace *ctf_fs_trace,
+                                                     struct ctf_stream_class *sc,
+                                                     uint64_t stream_instance_id,
+                                                     struct ctf_fs_ds_index *index)
+{
+    ctf_fs_ds_file_group::UP ds_file_group {new ctf_fs_ds_file_group};
+
+    ds_file_group->ds_file_infos =
+        g_ptr_array_new_with_free_func((GDestroyNotify) ctf_fs_ds_file_info_destroy);
+    if (!ds_file_group->ds_file_infos) {
+        goto error;
+    }
+
+    ds_file_group->index = index;
+
+    ds_file_group->stream_id = stream_instance_id;
+    BT_ASSERT(sc);
+    ds_file_group->sc = sc;
+    ds_file_group->ctf_fs_trace = ctf_fs_trace;
+    goto end;
+
+error:
+    ds_file_group.reset();
+    ctf_fs_ds_index_destroy(index);
+
+end:
+    return ds_file_group;
+}
This page took 0.027654 seconds and 4 git commands to generate.