* Ensure the right portion of the file will be returned on the next
* request_bytes call.
*/
- status = ds_file_mmap(data->file, index_entry->offset);
+ status = ds_file_mmap(data->file, index_entry->offset.bytes());
if (status != CTF_MSG_ITER_MEDIUM_STATUS_OK) {
goto end;
}
delete entry;
}
-static struct ctf_fs_ds_index_entry *ctf_fs_ds_index_entry_create()
+static struct ctf_fs_ds_index_entry *ctf_fs_ds_index_entry_create(const bt2c::DataLen offset,
+ const bt2c::DataLen packetSize)
{
- ctf_fs_ds_index_entry *entry = new ctf_fs_ds_index_entry;
+ ctf_fs_ds_index_entry *entry = new ctf_fs_ds_index_entry {offset, packetSize};
+
entry->packet_seq_num = UINT64_MAX;
return entry;
const struct ctf_packet_index_file_hdr *header = NULL;
struct ctf_fs_ds_index *index = NULL;
struct ctf_fs_ds_index_entry *index_entry = NULL, *prev_index_entry = NULL;
- uint64_t total_packets_size = 0;
+ auto totalPacketsSize = bt2c::DataLen::fromBytes(0);
size_t file_index_entry_size;
size_t file_entry_count;
size_t i;
for (i = 0; i < file_entry_count; i++) {
struct ctf_packet_index *file_index = (struct ctf_packet_index *) file_pos;
- uint64_t packet_size = be64toh(file_index->packet_size);
+ const auto packetSize = bt2c::DataLen::fromBits(be64toh(file_index->packet_size));
- if (packet_size % CHAR_BIT) {
+ if (packetSize.hasExtraBits()) {
BT_CPPLOGW_SPEC(ds_file->logger,
"Invalid packet size encountered in LTTng trace index file");
goto error;
}
- index_entry = ctf_fs_ds_index_entry_create();
+ const auto offset = bt2c::DataLen::fromBytes(be64toh(file_index->offset));
+
+ if (i != 0 && offset < prev_index_entry->offset) {
+ BT_CPPLOGW_SPEC(
+ ds_file->logger,
+ "Invalid, non-monotonic, packet offset encountered in LTTng trace index file: "
+ "previous offset={} bytes, current offset={} bytes",
+ prev_index_entry->offset.bytes(), offset.bytes());
+ goto error;
+ }
+
+ index_entry = ctf_fs_ds_index_entry_create(offset, packetSize);
if (!index_entry) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(ds_file->logger,
"Failed to create a ctf_fs_ds_index_entry.");
/* Set path to stream file. */
index_entry->path = file_info->path->str;
- /* Convert size in bits to bytes. */
- packet_size /= CHAR_BIT;
- index_entry->packet_size = packet_size;
-
- index_entry->offset = be64toh(file_index->offset);
- if (i != 0 && index_entry->offset < prev_index_entry->offset) {
- BT_CPPLOGW_SPEC(
- ds_file->logger,
- "Invalid, non-monotonic, packet offset encountered in LTTng trace index file: "
- "previous offset={}, current offset={}",
- prev_index_entry->offset, index_entry->offset);
- goto error;
- }
-
index_entry->timestamp_begin = be64toh(file_index->timestamp_begin);
index_entry->timestamp_end = be64toh(file_index->timestamp_end);
if (index_entry->timestamp_end < index_entry->timestamp_begin) {
index_entry->packet_seq_num = be64toh(file_index->packet_seq_num);
}
- total_packets_size += packet_size;
+ totalPacketsSize += packetSize;
file_pos += file_index_entry_size;
prev_index_entry = index_entry;
}
/* Validate that the index addresses the complete stream. */
- if (ds_file->file->size != total_packets_size) {
+ if (ds_file->file->size != totalPacketsSize.bytes()) {
BT_CPPLOGW_SPEC(ds_file->logger,
"Invalid LTTng trace index file; indexed size != stream file size: "
- "file-size={}, total-packets-size={}",
- ds_file->file->size, total_packets_size);
+ "file-size={} bytes, total-packets-size={} bytes",
+ ds_file->file->size, totalPacketsSize.bytes());
goto error;
}
end:
}
static int init_index_entry(struct ctf_fs_ds_index_entry *entry, struct ctf_fs_ds_file *ds_file,
- struct ctf_msg_iter_packet_properties *props, off_t packet_size,
- off_t packet_offset)
+ struct ctf_msg_iter_packet_properties *props)
{
int ret = 0;
struct ctf_stream_class *sc;
sc = ctf_trace_class_borrow_stream_class_by_id(ds_file->metadata->tc, props->stream_class_id);
BT_ASSERT(sc);
- BT_ASSERT(packet_offset >= 0);
- entry->offset = packet_offset;
- BT_ASSERT(packet_size >= 0);
- entry->packet_size = packet_size;
if (props->snapshots.beginning_clock != UINT64_C(-1)) {
entry->timestamp_begin = props->snapshots.beginning_clock;
int ret;
struct ctf_fs_ds_index *index = NULL;
enum ctf_msg_iter_status iter_status = CTF_MSG_ITER_STATUS_OK;
- off_t current_packet_offset_bytes = 0;
+ auto currentPacketOffset = bt2c::DataLen::fromBytes(0);
BT_CPPLOGI_SPEC(ds_file->logger, "Indexing stream file {}", ds_file->file->path->str);
}
while (true) {
- off_t current_packet_size_bytes;
- struct ctf_fs_ds_index_entry *index_entry;
struct ctf_msg_iter_packet_properties props;
- if (current_packet_offset_bytes < 0) {
- BT_CPPLOGE_STR_SPEC(ds_file->logger, "Cannot get the current packet's offset.");
- goto error;
- } else if (current_packet_offset_bytes > ds_file->file->size) {
+ if (currentPacketOffset.bytes() > ds_file->file->size) {
BT_CPPLOGE_STR_SPEC(ds_file->logger,
"Unexpected current packet's offset (larger than file).");
goto error;
- } else if (current_packet_offset_bytes == ds_file->file->size) {
+ } else if (currentPacketOffset.bytes() == ds_file->file->size) {
/* No more data */
break;
}
- iter_status = ctf_msg_iter_seek(msg_iter, current_packet_offset_bytes);
+ iter_status = ctf_msg_iter_seek(msg_iter, currentPacketOffset.bytes());
if (iter_status != CTF_MSG_ITER_STATUS_OK) {
goto error;
}
goto error;
}
- if (props.exp_packet_total_size >= 0) {
- current_packet_size_bytes = (uint64_t) props.exp_packet_total_size / 8;
- } else {
- current_packet_size_bytes = ds_file->file->size;
- }
+ /*
+ * Get the current packet size from the packet header, if set. Else,
+ * assume there is a single packet in the file, so take the file size
+ * as the packet size.
+ */
+ const auto currentPacketSize = props.exp_packet_total_size >= 0 ?
+ bt2c::DataLen::fromBits(props.exp_packet_total_size) :
+ bt2c::DataLen::fromBytes(ds_file->file->size);
- if (current_packet_offset_bytes + current_packet_size_bytes > ds_file->file->size) {
+ if ((currentPacketOffset + currentPacketSize).bytes() > ds_file->file->size) {
BT_CPPLOGW_SPEC(ds_file->logger,
"Invalid packet size reported in file: stream=\"{}\", "
- "packet-offset={}, packet-size-bytes={}, "
- "file-size={}",
- ds_file->file->path->str, (intmax_t) current_packet_offset_bytes,
- (intmax_t) current_packet_size_bytes, (intmax_t) ds_file->file->size);
+ "packet-offset-bytes={}, packet-size-bytes={}, "
+ "file-size-bytes={}",
+ ds_file->file->path->str, currentPacketOffset.bytes(),
+ currentPacketSize.bytes(), ds_file->file->size);
goto error;
}
- index_entry = ctf_fs_ds_index_entry_create();
+ const auto index_entry =
+ ctf_fs_ds_index_entry_create(currentPacketOffset, currentPacketSize);
if (!index_entry) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(ds_file->logger,
"Failed to create a ctf_fs_ds_index_entry.");
/* Set path to stream file. */
index_entry->path = file_info->path->str;
- ret = init_index_entry(index_entry, ds_file, &props, current_packet_size_bytes,
- current_packet_offset_bytes);
+ ret = init_index_entry(index_entry, ds_file, &props);
if (ret) {
ctf_fs_ds_index_entry_destroy(index_entry);
goto error;
g_ptr_array_add(index->entries, index_entry);
- current_packet_offset_bytes += current_packet_size_bytes;
+ currentPacketOffset += currentPacketSize;
BT_CPPLOGD_SPEC(ds_file->logger,
- "Seeking to next packet: current-packet-offset={}, "
- "next-packet-offset={}",
- (intmax_t) (current_packet_offset_bytes - current_packet_size_bytes),
- (intmax_t) current_packet_offset_bytes);
+ "Seeking to next packet: current-packet-offset-bytes={}, "
+ "next-packet-offset-bytes={}",
+ (currentPacketOffset - currentPacketSize).bytes(),
+ currentPacketOffset.bytes());
}
end:
delete index;
}
+
+void ctf_fs_ds_file_info_destroy(struct ctf_fs_ds_file_info *ds_file_info)
+{
+ if (!ds_file_info) {
+ return;
+ }
+
+ if (ds_file_info->path) {
+ g_string_free(ds_file_info->path, TRUE);
+ }
+
+ delete ds_file_info;
+}
+
+struct ctf_fs_ds_file_info *ctf_fs_ds_file_info_create(const char *path, int64_t begin_ns)
+{
+ ctf_fs_ds_file_info *ds_file_info = new ctf_fs_ds_file_info;
+ ds_file_info->path = g_string_new(path);
+ if (!ds_file_info->path) {
+ ctf_fs_ds_file_info_destroy(ds_file_info);
+ ds_file_info = NULL;
+ goto end;
+ }
+
+ ds_file_info->begin_ns = begin_ns;
+
+end:
+ return ds_file_info;
+}
+
+void ctf_fs_ds_file_group_destroy(struct ctf_fs_ds_file_group *ds_file_group)
+{
+ if (!ds_file_group) {
+ return;
+ }
+
+ if (ds_file_group->ds_file_infos) {
+ g_ptr_array_free(ds_file_group->ds_file_infos, TRUE);
+ }
+
+ ctf_fs_ds_index_destroy(ds_file_group->index);
+
+ bt_stream_put_ref(ds_file_group->stream);
+ delete ds_file_group;
+}
+
+void ctf_fs_ds_file_group_deleter::operator()(ctf_fs_ds_file_group *group) noexcept
+{
+ ctf_fs_ds_file_group_destroy(group);
+}
+
+ctf_fs_ds_file_group::UP ctf_fs_ds_file_group_create(struct ctf_fs_trace *ctf_fs_trace,
+ struct ctf_stream_class *sc,
+ uint64_t stream_instance_id,
+ struct ctf_fs_ds_index *index)
+{
+ ctf_fs_ds_file_group::UP ds_file_group {new ctf_fs_ds_file_group};
+
+ ds_file_group->ds_file_infos =
+ g_ptr_array_new_with_free_func((GDestroyNotify) ctf_fs_ds_file_info_destroy);
+ if (!ds_file_group->ds_file_infos) {
+ goto error;
+ }
+
+ ds_file_group->index = index;
+
+ ds_file_group->stream_id = stream_instance_id;
+ BT_ASSERT(sc);
+ ds_file_group->sc = sc;
+ ds_file_group->ctf_fs_trace = ctf_fs_trace;
+ goto end;
+
+error:
+ ds_file_group.reset();
+ ctf_fs_ds_index_destroy(index);
+
+end:
+ return ds_file_group;
+}