src.ctf.fs: use std::string in build_index_from_idx_file
[babeltrace.git] / src / plugins / ctf / fs-src / data-stream-file.cpp
... / ...
CommitLineData
1/*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2016-2017 Philippe Proulx <pproulx@efficios.com>
5 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
6 * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation
7 */
8
9#include <glib.h>
10#include <stdint.h>
11#include <stdio.h>
12
13#include "compat/endian.h" /* IWYU pragma: keep */
14#include "compat/mman.h" /* IWYU: pragma keep */
15#include "cpp-common/bt2c/glib-up.hpp"
16#include "cpp-common/bt2s/make-unique.hpp"
17#include "cpp-common/vendor/fmt/format.h"
18
19#include "../common/src/msg-iter/msg-iter.hpp"
20#include "data-stream-file.hpp"
21#include "file.hpp"
22#include "fs.hpp"
23#include "lttng-index.hpp"
24
25static inline size_t remaining_mmap_bytes(struct ctf_fs_ds_file *ds_file)
26{
27 BT_ASSERT_DBG(ds_file->mmap_len >= ds_file->request_offset_in_mapping);
28 return ds_file->mmap_len - ds_file->request_offset_in_mapping;
29}
30
31/*
32 * Return true if `offset_in_file` is in the current mapping.
33 */
34
35static bool offset_ist_mapped(struct ctf_fs_ds_file *ds_file, off_t offset_in_file)
36{
37 return offset_in_file >= ds_file->mmap_offset_in_file &&
38 offset_in_file < (ds_file->mmap_offset_in_file + ds_file->mmap_len);
39}
40
41static enum ctf_msg_iter_medium_status ds_file_munmap(struct ctf_fs_ds_file *ds_file)
42{
43 enum ctf_msg_iter_medium_status status;
44
45 BT_ASSERT(ds_file);
46
47 if (!ds_file->mmap_addr) {
48 status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
49 goto end;
50 }
51
52 if (bt_munmap(ds_file->mmap_addr, ds_file->mmap_len)) {
53 BT_CPPLOGE_ERRNO_SPEC(ds_file->logger, "Cannot memory-unmap file",
54 ": address={}, size={}, file_path=\"{}\", file={}",
55 fmt::ptr(ds_file->mmap_addr), ds_file->mmap_len,
56 ds_file->file ? ds_file->file->path : "NULL",
57 ds_file->file ? fmt::ptr(ds_file->file->fp) : NULL);
58 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
59 goto end;
60 }
61
62 ds_file->mmap_addr = NULL;
63
64 status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
65end:
66 return status;
67}
68
69/*
70 * mmap a region of `ds_file` such that `requested_offset_in_file` is in the
71 * mapping. If the currently mmap-ed region already contains
72 * `requested_offset_in_file`, the mapping is kept.
73 *
74 * Set `ds_file->requested_offset_in_mapping` based on `request_offset_in_file`,
75 * such that the next call to `request_bytes` will return bytes starting at that
76 * position.
77 *
78 * `requested_offset_in_file` must be a valid offset in the file.
79 */
80static enum ctf_msg_iter_medium_status ds_file_mmap(struct ctf_fs_ds_file *ds_file,
81 off_t requested_offset_in_file)
82{
83 enum ctf_msg_iter_medium_status status;
84
85 /* Ensure the requested offset is in the file range. */
86 BT_ASSERT(requested_offset_in_file >= 0);
87 BT_ASSERT(requested_offset_in_file < ds_file->file->size);
88
89 /*
90 * If the mapping already contains the requested offset, just adjust
91 * requested_offset_in_mapping.
92 */
93 if (offset_ist_mapped(ds_file, requested_offset_in_file)) {
94 ds_file->request_offset_in_mapping =
95 requested_offset_in_file - ds_file->mmap_offset_in_file;
96 status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
97 goto end;
98 }
99
100 /* Unmap old region */
101 status = ds_file_munmap(ds_file);
102 if (status != CTF_MSG_ITER_MEDIUM_STATUS_OK) {
103 goto end;
104 }
105
106 /*
107 * Compute a mapping that has the required alignment properties and
108 * contains `requested_offset_in_file`.
109 */
110 ds_file->request_offset_in_mapping =
111 requested_offset_in_file %
112 bt_mmap_get_offset_align_size(static_cast<int>(ds_file->logger.level()));
113 ds_file->mmap_offset_in_file = requested_offset_in_file - ds_file->request_offset_in_mapping;
114 ds_file->mmap_len =
115 MIN(ds_file->file->size - ds_file->mmap_offset_in_file, ds_file->mmap_max_len);
116
117 BT_ASSERT(ds_file->mmap_len > 0);
118
119 ds_file->mmap_addr =
120 bt_mmap(ds_file->mmap_len, PROT_READ, MAP_PRIVATE, fileno(ds_file->file->fp.get()),
121 ds_file->mmap_offset_in_file, static_cast<int>(ds_file->logger.level()));
122 if (ds_file->mmap_addr == MAP_FAILED) {
123 BT_CPPLOGE_SPEC(ds_file->logger,
124 "Cannot memory-map address (size {}) of file \"{}\" ({}) at offset {}: {}",
125 ds_file->mmap_len, ds_file->file->path, fmt::ptr(ds_file->file->fp),
126 (intmax_t) ds_file->mmap_offset_in_file, strerror(errno));
127 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
128 goto end;
129 }
130
131 status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
132
133end:
134 return status;
135}
136
137/*
138 * Change the mapping of the file to read the region that follows the current
139 * mapping.
140 *
141 * If the file hasn't been mapped yet, then everything (mmap_offset_in_file,
142 * mmap_len, request_offset_in_mapping) should have the value 0, which will
143 * result in the beginning of the file getting mapped.
144 *
145 * return _EOF if the current mapping is the end of the file.
146 */
147
148static enum ctf_msg_iter_medium_status ds_file_mmap_next(struct ctf_fs_ds_file *ds_file)
149{
150 enum ctf_msg_iter_medium_status status;
151
152 /*
153 * If we're called, it's because more bytes are requested but we have
154 * given all the bytes of the current mapping.
155 */
156 BT_ASSERT(ds_file->request_offset_in_mapping == ds_file->mmap_len);
157
158 /*
159 * If the current mapping coincides with the end of the file, there is
160 * no next mapping.
161 */
162 if (ds_file->mmap_offset_in_file + ds_file->mmap_len == ds_file->file->size) {
163 status = CTF_MSG_ITER_MEDIUM_STATUS_EOF;
164 goto end;
165 }
166
167 status = ds_file_mmap(ds_file, ds_file->mmap_offset_in_file + ds_file->mmap_len);
168
169end:
170 return status;
171}
172
173static enum ctf_msg_iter_medium_status medop_request_bytes(size_t request_sz, uint8_t **buffer_addr,
174 size_t *buffer_sz, void *data)
175{
176 enum ctf_msg_iter_medium_status status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
177 struct ctf_fs_ds_file *ds_file = (struct ctf_fs_ds_file *) data;
178
179 BT_ASSERT(request_sz > 0);
180
181 /*
182 * Check if we have at least one memory-mapped byte left. If we don't,
183 * mmap the next file.
184 */
185 if (remaining_mmap_bytes(ds_file) == 0) {
186 /* Are we at the end of the file? */
187 if (ds_file->mmap_offset_in_file >= ds_file->file->size) {
188 BT_CPPLOGD_SPEC(ds_file->logger, "Reached end of file \"{}\" ({})", ds_file->file->path,
189 fmt::ptr(ds_file->file->fp));
190 status = CTF_MSG_ITER_MEDIUM_STATUS_EOF;
191 goto end;
192 }
193
194 status = ds_file_mmap_next(ds_file);
195 switch (status) {
196 case CTF_MSG_ITER_MEDIUM_STATUS_OK:
197 break;
198 case CTF_MSG_ITER_MEDIUM_STATUS_EOF:
199 goto end;
200 default:
201 BT_CPPLOGE_SPEC(ds_file->logger, "Cannot memory-map next region of file \"{}\" ({})",
202 ds_file->file->path, fmt::ptr(ds_file->file->fp));
203 goto error;
204 }
205 }
206
207 BT_ASSERT(remaining_mmap_bytes(ds_file) > 0);
208 *buffer_sz = MIN(remaining_mmap_bytes(ds_file), request_sz);
209
210 BT_ASSERT(ds_file->mmap_addr);
211 *buffer_addr = ((uint8_t *) ds_file->mmap_addr) + ds_file->request_offset_in_mapping;
212
213 ds_file->request_offset_in_mapping += *buffer_sz;
214 goto end;
215
216error:
217 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
218
219end:
220 return status;
221}
222
223static bt_stream *medop_borrow_stream(bt_stream_class *stream_class, int64_t, void *data)
224{
225 struct ctf_fs_ds_file *ds_file = (struct ctf_fs_ds_file *) data;
226 bt_stream_class *ds_file_stream_class;
227 bt_stream *stream = NULL;
228
229 ds_file_stream_class = ds_file->stream->cls().libObjPtr();
230
231 if (stream_class != ds_file_stream_class) {
232 /*
233 * Not supported: two packets described by two different
234 * stream classes within the same data stream file.
235 */
236 goto end;
237 }
238
239 stream = ds_file->stream->libObjPtr();
240
241end:
242 return stream;
243}
244
245static enum ctf_msg_iter_medium_status medop_seek(off_t offset, void *data)
246{
247 struct ctf_fs_ds_file *ds_file = (struct ctf_fs_ds_file *) data;
248
249 BT_ASSERT(offset >= 0);
250 BT_ASSERT(offset < ds_file->file->size);
251
252 return ds_file_mmap(ds_file, offset);
253}
254
255struct ctf_msg_iter_medium_ops ctf_fs_ds_file_medops = {
256 medop_request_bytes,
257 medop_seek,
258 nullptr,
259 medop_borrow_stream,
260};
261
262struct ctf_fs_ds_group_medops_data
263{
264 explicit ctf_fs_ds_group_medops_data(const bt2c::Logger& parentLogger) :
265 logger {parentLogger, "PLUGIN/SRC.CTF.FS/DS-GROUP-MEDOPS"}
266 {
267 }
268
269 bt2c::Logger logger;
270
271 /* Weak, set once at creation time. */
272 struct ctf_fs_ds_file_group *ds_file_group = nullptr;
273
274 /*
275 * Index (as in element rank) of the index entry of ds_file_groups'
276 * index we will read next (so, the one after the one we are reading
277 * right now).
278 */
279 guint next_index_entry_index = 0;
280
281 /*
282 * File we are currently reading. Changes whenever we switch to
283 * reading another data file.
284 */
285 ctf_fs_ds_file::UP file;
286
287 /* Weak, for context / logging / appending causes. */
288 bt_self_message_iterator *self_msg_iter = nullptr;
289};
290
291static enum ctf_msg_iter_medium_status medop_group_request_bytes(size_t request_sz,
292 uint8_t **buffer_addr,
293 size_t *buffer_sz, void *void_data)
294{
295 struct ctf_fs_ds_group_medops_data *data = (struct ctf_fs_ds_group_medops_data *) void_data;
296
297 /* Return bytes from the current file. */
298 return medop_request_bytes(request_sz, buffer_addr, buffer_sz, data->file.get());
299}
300
301static bt_stream *medop_group_borrow_stream(bt_stream_class *stream_class, int64_t stream_id,
302 void *void_data)
303{
304 struct ctf_fs_ds_group_medops_data *data = (struct ctf_fs_ds_group_medops_data *) void_data;
305
306 return medop_borrow_stream(stream_class, stream_id, data->file.get());
307}
308
309/*
310 * Set `data->file` to prepare it to read the packet described
311 * by `index_entry`.
312 */
313
314static enum ctf_msg_iter_medium_status
315ctf_fs_ds_group_medops_set_file(struct ctf_fs_ds_group_medops_data *data,
316 struct ctf_fs_ds_index_entry *index_entry)
317{
318 enum ctf_msg_iter_medium_status status;
319
320 BT_ASSERT(data);
321 BT_ASSERT(index_entry);
322
323 /* Check if that file is already the one mapped. */
324 if (!data->file || data->file->file->path != index_entry->path) {
325 /* Create the new file. */
326 data->file =
327 ctf_fs_ds_file_create(data->ds_file_group->ctf_fs_trace, data->ds_file_group->stream,
328 index_entry->path, data->logger);
329 if (!data->file) {
330 BT_CPPLOGE_APPEND_CAUSE_SPEC(data->logger, "failed to create ctf_fs_ds_file.");
331 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
332 goto end;
333 }
334 }
335
336 /*
337 * Ensure the right portion of the file will be returned on the next
338 * request_bytes call.
339 */
340 status = ds_file_mmap(data->file.get(), index_entry->offset.bytes());
341 if (status != CTF_MSG_ITER_MEDIUM_STATUS_OK) {
342 goto end;
343 }
344
345 status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
346
347end:
348 return status;
349}
350
351static enum ctf_msg_iter_medium_status medop_group_switch_packet(void *void_data)
352{
353 struct ctf_fs_ds_group_medops_data *data = (struct ctf_fs_ds_group_medops_data *) void_data;
354 struct ctf_fs_ds_index_entry *index_entry;
355 enum ctf_msg_iter_medium_status status;
356
357 /* If we have gone through all index entries, we are done. */
358 if (data->next_index_entry_index >= data->ds_file_group->index->entries.size()) {
359 status = CTF_MSG_ITER_MEDIUM_STATUS_EOF;
360 goto end;
361 }
362
363 /*
364 * Otherwise, look up the next index entry / packet and prepare it
365 * for reading.
366 */
367 index_entry = data->ds_file_group->index->entries[data->next_index_entry_index].get();
368
369 status = ctf_fs_ds_group_medops_set_file(data, index_entry);
370 if (status != CTF_MSG_ITER_MEDIUM_STATUS_OK) {
371 goto end;
372 }
373
374 data->next_index_entry_index++;
375
376 status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
377end:
378 return status;
379}
380
381void ctf_fs_ds_group_medops_data_deleter::operator()(ctf_fs_ds_group_medops_data *data) noexcept
382{
383 delete data;
384}
385
386enum ctf_msg_iter_medium_status ctf_fs_ds_group_medops_data_create(
387 struct ctf_fs_ds_file_group *ds_file_group, bt_self_message_iterator *self_msg_iter,
388 const bt2c::Logger& parentLogger, ctf_fs_ds_group_medops_data_up& out)
389{
390 BT_ASSERT(self_msg_iter);
391 BT_ASSERT(ds_file_group);
392 BT_ASSERT(ds_file_group->index);
393 BT_ASSERT(!ds_file_group->index->entries.empty());
394
395 out.reset(new ctf_fs_ds_group_medops_data {parentLogger});
396
397 out->ds_file_group = ds_file_group;
398 out->self_msg_iter = self_msg_iter;
399
400 /*
401 * No need to prepare the first file. ctf_msg_iter will call
402 * switch_packet before reading the first packet, it will be
403 * done then.
404 */
405
406 return CTF_MSG_ITER_MEDIUM_STATUS_OK;
407}
408
409void ctf_fs_ds_group_medops_data_reset(struct ctf_fs_ds_group_medops_data *data)
410{
411 data->next_index_entry_index = 0;
412}
413
414struct ctf_msg_iter_medium_ops ctf_fs_ds_group_medops = {
415 .request_bytes = medop_group_request_bytes,
416
417 /*
418 * We don't support seeking using this medops. It would probably be
419 * possible, but it's not needed at the moment.
420 */
421 .seek = NULL,
422
423 .switch_packet = medop_group_switch_packet,
424 .borrow_stream = medop_group_borrow_stream,
425};
426
427static ctf_fs_ds_index_entry::UP ctf_fs_ds_index_entry_create(const bt2c::DataLen offset,
428 const bt2c::DataLen packetSize)
429{
430 ctf_fs_ds_index_entry::UP entry = bt2s::make_unique<ctf_fs_ds_index_entry>(offset, packetSize);
431
432 entry->packet_seq_num = UINT64_MAX;
433
434 return entry;
435}
436
437static int convert_cycles_to_ns(struct ctf_clock_class *clock_class, uint64_t cycles, int64_t *ns)
438{
439 return bt_util_clock_cycles_to_ns_from_origin(cycles, clock_class->frequency,
440 clock_class->offset_seconds,
441 clock_class->offset_cycles, ns);
442}
443
444static ctf_fs_ds_index::UP build_index_from_idx_file(struct ctf_fs_ds_file *ds_file,
445 struct ctf_fs_ds_file_info *file_info,
446 struct ctf_msg_iter *msg_iter)
447{
448 int ret;
449 bt2c::GCharUP directory;
450 bt2c::GCharUP basename;
451 std::string index_basename;
452 bt2c::GCharUP index_file_path;
453 GMappedFile *mapped_file = NULL;
454 gsize filesize;
455 const char *mmap_begin = NULL, *file_pos = NULL;
456 const struct ctf_packet_index_file_hdr *header = NULL;
457 ctf_fs_ds_index::UP index;
458 ctf_fs_ds_index_entry::UP index_entry;
459 ctf_fs_ds_index_entry *prev_index_entry = NULL;
460 auto totalPacketsSize = bt2c::DataLen::fromBytes(0);
461 size_t file_index_entry_size;
462 size_t file_entry_count;
463 size_t i;
464 struct ctf_stream_class *sc;
465 struct ctf_msg_iter_packet_properties props;
466 uint32_t version_major, version_minor;
467
468 BT_CPPLOGI_SPEC(ds_file->logger, "Building index from .idx file of stream file {}",
469 ds_file->file->path);
470 ret = ctf_msg_iter_get_packet_properties(msg_iter, &props);
471 if (ret) {
472 BT_CPPLOGI_STR_SPEC(ds_file->logger,
473 "Cannot read first packet's header and context fields.");
474 goto error;
475 }
476
477 sc = ctf_trace_class_borrow_stream_class_by_id(ds_file->metadata->tc, props.stream_class_id);
478 BT_ASSERT(sc);
479 if (!sc->default_clock_class) {
480 BT_CPPLOGI_STR_SPEC(ds_file->logger, "Cannot find stream class's default clock class.");
481 goto error;
482 }
483
484 /* Look for index file in relative path index/name.idx. */
485 basename.reset(g_path_get_basename(ds_file->file->path.c_str()));
486 if (!basename) {
487 BT_CPPLOGE_SPEC(ds_file->logger, "Cannot get the basename of datastream file {}",
488 ds_file->file->path);
489 goto error;
490 }
491
492 directory.reset(g_path_get_dirname(ds_file->file->path.c_str()));
493 if (!directory) {
494 BT_CPPLOGE_SPEC(ds_file->logger, "Cannot get dirname of datastream file {}",
495 ds_file->file->path);
496 goto error;
497 }
498
499 index_basename = fmt::format("{}.idx", basename.get());
500 index_file_path.reset(g_build_filename(directory.get(), "index", index_basename.c_str(), NULL));
501 mapped_file = g_mapped_file_new(index_file_path.get(), FALSE, NULL);
502 if (!mapped_file) {
503 BT_CPPLOGD_SPEC(ds_file->logger, "Cannot create new mapped file {}", index_file_path.get());
504 goto error;
505 }
506
507 /*
508 * The g_mapped_file API limits us to 4GB files on 32-bit.
509 * Traces with such large indexes have never been seen in the wild,
510 * but this would need to be adjusted to support them.
511 */
512 filesize = g_mapped_file_get_length(mapped_file);
513 if (filesize < sizeof(*header)) {
514 BT_CPPLOGW_SPEC(ds_file->logger,
515 "Invalid LTTng trace index file: "
516 "file size ({} bytes) < header size ({} bytes)",
517 filesize, sizeof(*header));
518 goto error;
519 }
520
521 mmap_begin = g_mapped_file_get_contents(mapped_file);
522 header = (struct ctf_packet_index_file_hdr *) mmap_begin;
523
524 file_pos = g_mapped_file_get_contents(mapped_file) + sizeof(*header);
525 if (be32toh(header->magic) != CTF_INDEX_MAGIC) {
526 BT_CPPLOGW_STR_SPEC(ds_file->logger,
527 "Invalid LTTng trace index: \"magic\" field validation failed");
528 goto error;
529 }
530
531 version_major = be32toh(header->index_major);
532 version_minor = be32toh(header->index_minor);
533 if (version_major != 1) {
534 BT_CPPLOGW_SPEC(ds_file->logger, "Unknown LTTng trace index version: major={}, minor={}",
535 version_major, version_minor);
536 goto error;
537 }
538
539 file_index_entry_size = be32toh(header->packet_index_len);
540 if (file_index_entry_size < CTF_INDEX_1_0_SIZE) {
541 BT_CPPLOGW_SPEC(
542 ds_file->logger,
543 "Invalid `packet_index_len` in LTTng trace index file (`packet_index_len` < CTF index 1.0 index entry size): "
544 "packet_index_len={}, CTF_INDEX_1_0_SIZE={}",
545 file_index_entry_size, CTF_INDEX_1_0_SIZE);
546 goto error;
547 }
548
549 file_entry_count = (filesize - sizeof(*header)) / file_index_entry_size;
550 if ((filesize - sizeof(*header)) % file_index_entry_size) {
551 BT_CPPLOGW_SPEC(ds_file->logger,
552 "Invalid LTTng trace index: the index's size after the header "
553 "({} bytes) is not a multiple of the index entry size "
554 "({} bytes)",
555 (filesize - sizeof(*header)), sizeof(*header));
556 goto error;
557 }
558
559 index = bt2s::make_unique<ctf_fs_ds_index>();
560
561 for (i = 0; i < file_entry_count; i++) {
562 struct ctf_packet_index *file_index = (struct ctf_packet_index *) file_pos;
563 const auto packetSize = bt2c::DataLen::fromBits(be64toh(file_index->packet_size));
564
565 if (packetSize.hasExtraBits()) {
566 BT_CPPLOGW_SPEC(ds_file->logger,
567 "Invalid packet size encountered in LTTng trace index file");
568 goto error;
569 }
570
571 const auto offset = bt2c::DataLen::fromBytes(be64toh(file_index->offset));
572
573 if (i != 0 && offset < prev_index_entry->offset) {
574 BT_CPPLOGW_SPEC(
575 ds_file->logger,
576 "Invalid, non-monotonic, packet offset encountered in LTTng trace index file: "
577 "previous offset={} bytes, current offset={} bytes",
578 prev_index_entry->offset.bytes(), offset.bytes());
579 goto error;
580 }
581
582 index_entry = ctf_fs_ds_index_entry_create(offset, packetSize);
583 if (!index_entry) {
584 BT_CPPLOGE_APPEND_CAUSE_SPEC(ds_file->logger,
585 "Failed to create a ctf_fs_ds_index_entry.");
586 goto error;
587 }
588
589 /* Set path to stream file. */
590 index_entry->path = file_info->path.c_str();
591
592 index_entry->timestamp_begin = be64toh(file_index->timestamp_begin);
593 index_entry->timestamp_end = be64toh(file_index->timestamp_end);
594 if (index_entry->timestamp_end < index_entry->timestamp_begin) {
595 BT_CPPLOGW_SPEC(
596 ds_file->logger,
597 "Invalid packet time bounds encountered in LTTng trace index file (begin > end): "
598 "timestamp_begin={}, timestamp_end={}",
599 index_entry->timestamp_begin, index_entry->timestamp_end);
600 goto error;
601 }
602
603 /* Convert the packet's bound to nanoseconds since Epoch. */
604 ret = convert_cycles_to_ns(sc->default_clock_class, index_entry->timestamp_begin,
605 &index_entry->timestamp_begin_ns);
606 if (ret) {
607 BT_CPPLOGI_STR_SPEC(
608 ds_file->logger,
609 "Failed to convert raw timestamp to nanoseconds since Epoch during index parsing");
610 goto error;
611 }
612 ret = convert_cycles_to_ns(sc->default_clock_class, index_entry->timestamp_end,
613 &index_entry->timestamp_end_ns);
614 if (ret) {
615 BT_CPPLOGI_STR_SPEC(
616 ds_file->logger,
617 "Failed to convert raw timestamp to nanoseconds since Epoch during LTTng trace index parsing");
618 goto error;
619 }
620
621 if (version_minor >= 1) {
622 index_entry->packet_seq_num = be64toh(file_index->packet_seq_num);
623 }
624
625 totalPacketsSize += packetSize;
626 file_pos += file_index_entry_size;
627
628 prev_index_entry = index_entry.get();
629
630 index->entries.emplace_back(std::move(index_entry));
631 }
632
633 /* Validate that the index addresses the complete stream. */
634 if (ds_file->file->size != totalPacketsSize.bytes()) {
635 BT_CPPLOGW_SPEC(ds_file->logger,
636 "Invalid LTTng trace index file; indexed size != stream file size: "
637 "file-size={} bytes, total-packets-size={} bytes",
638 ds_file->file->size, totalPacketsSize.bytes());
639 goto error;
640 }
641end:
642 if (mapped_file) {
643 g_mapped_file_unref(mapped_file);
644 }
645 return index;
646error:
647 index.reset();
648 goto end;
649}
650
651static int init_index_entry(struct ctf_fs_ds_index_entry *entry, struct ctf_fs_ds_file *ds_file,
652 struct ctf_msg_iter_packet_properties *props)
653{
654 int ret = 0;
655 struct ctf_stream_class *sc;
656
657 sc = ctf_trace_class_borrow_stream_class_by_id(ds_file->metadata->tc, props->stream_class_id);
658 BT_ASSERT(sc);
659
660 if (props->snapshots.beginning_clock != UINT64_C(-1)) {
661 entry->timestamp_begin = props->snapshots.beginning_clock;
662
663 /* Convert the packet's bound to nanoseconds since Epoch. */
664 ret = convert_cycles_to_ns(sc->default_clock_class, props->snapshots.beginning_clock,
665 &entry->timestamp_begin_ns);
666 if (ret) {
667 BT_CPPLOGI_STR_SPEC(ds_file->logger,
668 "Failed to convert raw timestamp to nanoseconds since Epoch.");
669 goto end;
670 }
671 } else {
672 entry->timestamp_begin = UINT64_C(-1);
673 entry->timestamp_begin_ns = UINT64_C(-1);
674 }
675
676 if (props->snapshots.end_clock != UINT64_C(-1)) {
677 entry->timestamp_end = props->snapshots.end_clock;
678
679 /* Convert the packet's bound to nanoseconds since Epoch. */
680 ret = convert_cycles_to_ns(sc->default_clock_class, props->snapshots.end_clock,
681 &entry->timestamp_end_ns);
682 if (ret) {
683 BT_CPPLOGI_STR_SPEC(ds_file->logger,
684 "Failed to convert raw timestamp to nanoseconds since Epoch.");
685 goto end;
686 }
687 } else {
688 entry->timestamp_end = UINT64_C(-1);
689 entry->timestamp_end_ns = UINT64_C(-1);
690 }
691
692end:
693 return ret;
694}
695
696static ctf_fs_ds_index::UP build_index_from_stream_file(struct ctf_fs_ds_file *ds_file,
697 struct ctf_fs_ds_file_info *file_info,
698 struct ctf_msg_iter *msg_iter)
699{
700 int ret;
701 enum ctf_msg_iter_status iter_status = CTF_MSG_ITER_STATUS_OK;
702 auto currentPacketOffset = bt2c::DataLen::fromBytes(0);
703
704 BT_CPPLOGI_SPEC(ds_file->logger, "Indexing stream file {}", ds_file->file->path);
705
706 ctf_fs_ds_index::UP index = bt2s::make_unique<ctf_fs_ds_index>();
707
708 while (true) {
709 struct ctf_msg_iter_packet_properties props;
710
711 if (currentPacketOffset.bytes() > ds_file->file->size) {
712 BT_CPPLOGE_STR_SPEC(ds_file->logger,
713 "Unexpected current packet's offset (larger than file).");
714 goto error;
715 } else if (currentPacketOffset.bytes() == ds_file->file->size) {
716 /* No more data */
717 break;
718 }
719
720 iter_status = ctf_msg_iter_seek(msg_iter, currentPacketOffset.bytes());
721 if (iter_status != CTF_MSG_ITER_STATUS_OK) {
722 goto error;
723 }
724
725 iter_status = ctf_msg_iter_get_packet_properties(msg_iter, &props);
726 if (iter_status != CTF_MSG_ITER_STATUS_OK) {
727 goto error;
728 }
729
730 /*
731 * Get the current packet size from the packet header, if set. Else,
732 * assume there is a single packet in the file, so take the file size
733 * as the packet size.
734 */
735 const auto currentPacketSize = props.exp_packet_total_size >= 0 ?
736 bt2c::DataLen::fromBits(props.exp_packet_total_size) :
737 bt2c::DataLen::fromBytes(ds_file->file->size);
738
739 if ((currentPacketOffset + currentPacketSize).bytes() > ds_file->file->size) {
740 BT_CPPLOGW_SPEC(ds_file->logger,
741 "Invalid packet size reported in file: stream=\"{}\", "
742 "packet-offset-bytes={}, packet-size-bytes={}, "
743 "file-size-bytes={}",
744 ds_file->file->path, currentPacketOffset.bytes(),
745 currentPacketSize.bytes(), ds_file->file->size);
746 goto error;
747 }
748
749 auto index_entry = ctf_fs_ds_index_entry_create(currentPacketOffset, currentPacketSize);
750 if (!index_entry) {
751 BT_CPPLOGE_APPEND_CAUSE_SPEC(ds_file->logger,
752 "Failed to create a ctf_fs_ds_index_entry.");
753 goto error;
754 }
755
756 /* Set path to stream file. */
757 index_entry->path = file_info->path.c_str();
758
759 ret = init_index_entry(index_entry.get(), ds_file, &props);
760 if (ret) {
761 goto error;
762 }
763
764 index->entries.emplace_back(std::move(index_entry));
765
766 currentPacketOffset += currentPacketSize;
767 BT_CPPLOGD_SPEC(ds_file->logger,
768 "Seeking to next packet: current-packet-offset-bytes={}, "
769 "next-packet-offset-bytes={}",
770 (currentPacketOffset - currentPacketSize).bytes(),
771 currentPacketOffset.bytes());
772 }
773
774end:
775 return index;
776
777error:
778 index.reset();
779 goto end;
780}
781
782ctf_fs_ds_file::UP ctf_fs_ds_file_create(struct ctf_fs_trace *ctf_fs_trace,
783 bt2::Stream::Shared stream, const char *path,
784 const bt2c::Logger& parentLogger)
785{
786 int ret;
787 auto ds_file = bt2s::make_unique<ctf_fs_ds_file>(parentLogger);
788 size_t offset_align;
789
790 ds_file->file = bt2s::make_unique<ctf_fs_file>(parentLogger);
791 ds_file->stream = std::move(stream);
792 ds_file->metadata = ctf_fs_trace->metadata.get();
793 ds_file->file->path = path;
794 ret = ctf_fs_file_open(ds_file->file.get(), "rb");
795 if (ret) {
796 goto error;
797 }
798
799 offset_align = bt_mmap_get_offset_align_size(static_cast<int>(ds_file->logger.level()));
800 ds_file->mmap_max_len = offset_align * 2048;
801
802 goto end;
803
804error:
805 /* Do not touch "borrowed" file. */
806 ds_file.reset();
807
808end:
809 return ds_file;
810}
811
812ctf_fs_ds_index::UP ctf_fs_ds_file_build_index(struct ctf_fs_ds_file *ds_file,
813 struct ctf_fs_ds_file_info *file_info,
814 struct ctf_msg_iter *msg_iter)
815{
816 auto index = build_index_from_idx_file(ds_file, file_info, msg_iter);
817 if (index) {
818 goto end;
819 }
820
821 BT_CPPLOGI_SPEC(ds_file->logger, "Failed to build index from .index file; "
822 "falling back to stream indexing.");
823 index = build_index_from_stream_file(ds_file, file_info, msg_iter);
824end:
825 return index;
826}
827
828ctf_fs_ds_file::~ctf_fs_ds_file()
829{
830 (void) ds_file_munmap(this);
831}
832
833ctf_fs_ds_file_info::UP ctf_fs_ds_file_info_create(const char *path, int64_t begin_ns)
834{
835 ctf_fs_ds_file_info::UP ds_file_info = bt2s::make_unique<ctf_fs_ds_file_info>();
836
837 ds_file_info->path = path;
838 ds_file_info->begin_ns = begin_ns;
839 return ds_file_info;
840}
841
842ctf_fs_ds_file_group::UP ctf_fs_ds_file_group_create(struct ctf_fs_trace *ctf_fs_trace,
843 struct ctf_stream_class *sc,
844 uint64_t stream_instance_id,
845 ctf_fs_ds_index::UP index)
846{
847 ctf_fs_ds_file_group::UP ds_file_group {new ctf_fs_ds_file_group};
848
849 ds_file_group->index = std::move(index);
850
851 ds_file_group->stream_id = stream_instance_id;
852 BT_ASSERT(sc);
853 ds_file_group->sc = sc;
854 ds_file_group->ctf_fs_trace = ctf_fs_trace;
855
856 return ds_file_group;
857}
This page took 0.025254 seconds and 4 git commands to generate.