src.ctf.fs: add and use medops to iterate on a ds_file_group using the index
[babeltrace.git] / src / plugins / ctf / fs-src / data-stream-file.c
index 7fe4e6a52f1d2a12e6a359a39cc9bb28a0341629..fe14f8cc3622d99fb0688ee894ff15d74d936b47 100644 (file)
@@ -99,7 +99,9 @@ end:
  * mapping.  If the currently mmap-ed region already contains
  * `requested_offset_in_file`, the mapping is kept.
  *
- * Set `ds_file->requested_offset_in_mapping` based on `request_offset_in_file`
+ * Set `ds_file->requested_offset_in_mapping` based on `request_offset_in_file`,
+ * such that the next call to `request_bytes` will return bytes starting at that
+ * position.
  *
  * `requested_offset_in_file` must be a valid offset in the file.
  */
@@ -301,6 +303,217 @@ struct ctf_msg_iter_medium_ops ctf_fs_ds_file_medops = {
        .seek = medop_seek,
 };
 
+struct ctf_fs_ds_group_medops_data {
+       /* Weak, set once at creation time. */
+       struct ctf_fs_ds_file_group *ds_file_group;
+
+       /*
+        * Index (as in element rank) of the index entry of ds_file_groups'
+        * index we will read next (so, the one after the one we are reading
+        * right now).
+        */
+       guint next_index_entry_index;
+
+       /*
+        * File we are currently reading.  Changes whenever we switch to
+        * reading another data file.
+        *
+        * Owned by this.
+        */
+       struct ctf_fs_ds_file *file;
+
+       /* Weak, for context / logging / appending causes. */
+       bt_self_message_iterator *self_msg_iter;
+       bt_logging_level log_level;
+};
+
+static
+enum ctf_msg_iter_medium_status medop_group_request_bytes(
+               size_t request_sz,
+               uint8_t **buffer_addr,
+               size_t *buffer_sz,
+               void *void_data)
+{
+       struct ctf_fs_ds_group_medops_data *data = void_data;
+
+       /* Return bytes from the current file. */
+       return medop_request_bytes(request_sz, buffer_addr, buffer_sz, data->file);
+}
+
+static
+bt_stream *medop_group_borrow_stream(
+               bt_stream_class *stream_class,
+               int64_t stream_id,
+               void *void_data)
+{
+       struct ctf_fs_ds_group_medops_data *data = void_data;
+
+       return medop_borrow_stream(stream_class, stream_id, data->file);
+}
+
+/*
+ * Set `data->file` to prepare it to read the packet described
+ * by `index_entry`.
+ */
+
+static
+enum ctf_msg_iter_medium_status ctf_fs_ds_group_medops_set_file(
+               struct ctf_fs_ds_group_medops_data *data,
+               struct ctf_fs_ds_index_entry *index_entry,
+               bt_self_message_iterator *self_msg_iter,
+               bt_logging_level log_level)
+{
+       enum ctf_msg_iter_medium_status status;
+
+       BT_ASSERT(data);
+       BT_ASSERT(index_entry);
+
+       /* Check if that file is already the one mapped. */
+       if (!data->file || strcmp(index_entry->path, data->file->file->path->str) != 0) {
+               /* Destroy the previously used file. */
+               ctf_fs_ds_file_destroy(data->file);
+
+               /* Create the new file. */
+               data->file = ctf_fs_ds_file_create(
+                       data->ds_file_group->ctf_fs_trace,
+                       self_msg_iter,
+                       data->ds_file_group->stream,
+                       index_entry->path,
+                       log_level);
+               if (!data->file) {
+                       BT_MSG_ITER_LOGE_APPEND_CAUSE(self_msg_iter,
+                               "failed to create ctf_fs_ds_file.");
+                       status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
+                       goto end;
+               }
+       }
+
+       /*
+        * Ensure the right portion of the file will be returned on the next
+        * request_bytes call.
+        */
+       status = ds_file_mmap(data->file, index_entry->offset);
+       if (status != CTF_MSG_ITER_MEDIUM_STATUS_OK) {
+               goto end;
+       }
+
+       status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
+
+end:
+       return status;
+}
+
+static
+enum ctf_msg_iter_medium_status medop_group_switch_packet(void *void_data)
+{
+       struct ctf_fs_ds_group_medops_data *data = void_data;
+       struct ctf_fs_ds_index_entry *index_entry;
+       enum ctf_msg_iter_medium_status status;
+
+       /* If we have gone through all index entries, we are done. */
+       if (data->next_index_entry_index >=
+               data->ds_file_group->index->entries->len) {
+               status = CTF_MSG_ITER_MEDIUM_STATUS_EOF;
+               goto end;
+       }
+
+       /*
+        * Otherwise, look up the next index entry / packet and prepare it
+        *  for reading.
+        */
+       index_entry = g_ptr_array_index(
+               data->ds_file_group->index->entries,
+               data->next_index_entry_index);
+
+       status = ctf_fs_ds_group_medops_set_file(
+               data, index_entry, data->self_msg_iter, data->log_level);
+       if (status != CTF_MSG_ITER_MEDIUM_STATUS_OK) {
+               goto end;
+       }
+
+       data->next_index_entry_index++;
+
+       status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
+end:
+       return status;
+}
+
+BT_HIDDEN
+void ctf_fs_ds_group_medops_data_destroy(
+               struct ctf_fs_ds_group_medops_data *data)
+{
+       if (!data) {
+               goto end;
+       }
+
+       ctf_fs_ds_file_destroy(data->file);
+
+       g_free(data);
+
+end:
+       return;
+}
+
+enum ctf_msg_iter_medium_status ctf_fs_ds_group_medops_data_create(
+               struct ctf_fs_ds_file_group *ds_file_group,
+               bt_self_message_iterator *self_msg_iter,
+               bt_logging_level log_level,
+               struct ctf_fs_ds_group_medops_data **out)
+{
+       struct ctf_fs_ds_group_medops_data *data;
+       enum ctf_msg_iter_medium_status status;
+
+       BT_ASSERT(self_msg_iter);
+       BT_ASSERT(ds_file_group);
+       BT_ASSERT(ds_file_group->index);
+       BT_ASSERT(ds_file_group->index->entries->len > 0);
+
+       data = g_new0(struct ctf_fs_ds_group_medops_data, 1);
+       if (!data) {
+               BT_MSG_ITER_LOGE_APPEND_CAUSE(self_msg_iter,
+                       "Failed to allocate a struct ctf_fs_ds_group_medops_data");
+               status = CTF_MSG_ITER_MEDIUM_STATUS_MEMORY_ERROR;
+               goto error;
+       }
+
+       data->ds_file_group = ds_file_group;
+       data->self_msg_iter = self_msg_iter;
+       data->log_level = log_level;
+
+       /*
+        * No need to prepare the first file.  ctf_msg_iter will call
+        * switch_packet before reading the first packet, it will be
+        * done then.
+        */
+
+       *out = data;
+       status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
+       goto end;
+
+error:
+       ctf_fs_ds_group_medops_data_destroy(data);
+
+end:
+       return status;
+}
+
+void ctf_fs_ds_group_medops_data_reset(struct ctf_fs_ds_group_medops_data *data)
+{
+       data->next_index_entry_index = 0;
+}
+
+struct ctf_msg_iter_medium_ops ctf_fs_ds_group_medops = {
+       .request_bytes = medop_group_request_bytes,
+       .borrow_stream = medop_group_borrow_stream,
+       .switch_packet = medop_group_switch_packet,
+
+       /*
+        * We don't support seeking using this medops.  It would probably be
+        * possible, but it's not needed at the moment.
+        */
+       .seek = NULL,
+};
+
 static
 struct ctf_fs_ds_index_entry *ctf_fs_ds_index_entry_create(
                bt_self_component *self_comp, bt_logging_level log_level)
This page took 0.025719 seconds and 4 git commands to generate.