src.ctf.fs: use GCharUP in build_index_from_idx_file
[babeltrace.git] / src / plugins / ctf / fs-src / data-stream-file.cpp
1 /*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2016-2017 Philippe Proulx <pproulx@efficios.com>
5 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
6 * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation
7 */
8
9 #include <glib.h>
10 #include <stdint.h>
11 #include <stdio.h>
12
13 #include "compat/endian.h" /* IWYU pragma: keep */
14 #include "compat/mman.h" /* IWYU: pragma keep */
15 #include "cpp-common/bt2c/glib-up.hpp"
16 #include "cpp-common/bt2s/make-unique.hpp"
17 #include "cpp-common/vendor/fmt/format.h"
18
19 #include "../common/src/msg-iter/msg-iter.hpp"
20 #include "data-stream-file.hpp"
21 #include "file.hpp"
22 #include "fs.hpp"
23 #include "lttng-index.hpp"
24
25 static inline size_t remaining_mmap_bytes(struct ctf_fs_ds_file *ds_file)
26 {
27 BT_ASSERT_DBG(ds_file->mmap_len >= ds_file->request_offset_in_mapping);
28 return ds_file->mmap_len - ds_file->request_offset_in_mapping;
29 }
30
31 /*
32 * Return true if `offset_in_file` is in the current mapping.
33 */
34
35 static bool offset_ist_mapped(struct ctf_fs_ds_file *ds_file, off_t offset_in_file)
36 {
37 return offset_in_file >= ds_file->mmap_offset_in_file &&
38 offset_in_file < (ds_file->mmap_offset_in_file + ds_file->mmap_len);
39 }
40
41 static enum ctf_msg_iter_medium_status ds_file_munmap(struct ctf_fs_ds_file *ds_file)
42 {
43 enum ctf_msg_iter_medium_status status;
44
45 BT_ASSERT(ds_file);
46
47 if (!ds_file->mmap_addr) {
48 status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
49 goto end;
50 }
51
52 if (bt_munmap(ds_file->mmap_addr, ds_file->mmap_len)) {
53 BT_CPPLOGE_ERRNO_SPEC(ds_file->logger, "Cannot memory-unmap file",
54 ": address={}, size={}, file_path=\"{}\", file={}",
55 fmt::ptr(ds_file->mmap_addr), ds_file->mmap_len,
56 ds_file->file ? ds_file->file->path : "NULL",
57 ds_file->file ? fmt::ptr(ds_file->file->fp) : NULL);
58 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
59 goto end;
60 }
61
62 ds_file->mmap_addr = NULL;
63
64 status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
65 end:
66 return status;
67 }
68
69 /*
70 * mmap a region of `ds_file` such that `requested_offset_in_file` is in the
71 * mapping. If the currently mmap-ed region already contains
72 * `requested_offset_in_file`, the mapping is kept.
73 *
74 * Set `ds_file->requested_offset_in_mapping` based on `request_offset_in_file`,
75 * such that the next call to `request_bytes` will return bytes starting at that
76 * position.
77 *
78 * `requested_offset_in_file` must be a valid offset in the file.
79 */
80 static enum ctf_msg_iter_medium_status ds_file_mmap(struct ctf_fs_ds_file *ds_file,
81 off_t requested_offset_in_file)
82 {
83 enum ctf_msg_iter_medium_status status;
84
85 /* Ensure the requested offset is in the file range. */
86 BT_ASSERT(requested_offset_in_file >= 0);
87 BT_ASSERT(requested_offset_in_file < ds_file->file->size);
88
89 /*
90 * If the mapping already contains the requested offset, just adjust
91 * requested_offset_in_mapping.
92 */
93 if (offset_ist_mapped(ds_file, requested_offset_in_file)) {
94 ds_file->request_offset_in_mapping =
95 requested_offset_in_file - ds_file->mmap_offset_in_file;
96 status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
97 goto end;
98 }
99
100 /* Unmap old region */
101 status = ds_file_munmap(ds_file);
102 if (status != CTF_MSG_ITER_MEDIUM_STATUS_OK) {
103 goto end;
104 }
105
106 /*
107 * Compute a mapping that has the required alignment properties and
108 * contains `requested_offset_in_file`.
109 */
110 ds_file->request_offset_in_mapping =
111 requested_offset_in_file %
112 bt_mmap_get_offset_align_size(static_cast<int>(ds_file->logger.level()));
113 ds_file->mmap_offset_in_file = requested_offset_in_file - ds_file->request_offset_in_mapping;
114 ds_file->mmap_len =
115 MIN(ds_file->file->size - ds_file->mmap_offset_in_file, ds_file->mmap_max_len);
116
117 BT_ASSERT(ds_file->mmap_len > 0);
118
119 ds_file->mmap_addr =
120 bt_mmap(ds_file->mmap_len, PROT_READ, MAP_PRIVATE, fileno(ds_file->file->fp.get()),
121 ds_file->mmap_offset_in_file, static_cast<int>(ds_file->logger.level()));
122 if (ds_file->mmap_addr == MAP_FAILED) {
123 BT_CPPLOGE_SPEC(ds_file->logger,
124 "Cannot memory-map address (size {}) of file \"{}\" ({}) at offset {}: {}",
125 ds_file->mmap_len, ds_file->file->path, fmt::ptr(ds_file->file->fp),
126 (intmax_t) ds_file->mmap_offset_in_file, strerror(errno));
127 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
128 goto end;
129 }
130
131 status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
132
133 end:
134 return status;
135 }
136
137 /*
138 * Change the mapping of the file to read the region that follows the current
139 * mapping.
140 *
141 * If the file hasn't been mapped yet, then everything (mmap_offset_in_file,
142 * mmap_len, request_offset_in_mapping) should have the value 0, which will
143 * result in the beginning of the file getting mapped.
144 *
145 * return _EOF if the current mapping is the end of the file.
146 */
147
148 static enum ctf_msg_iter_medium_status ds_file_mmap_next(struct ctf_fs_ds_file *ds_file)
149 {
150 enum ctf_msg_iter_medium_status status;
151
152 /*
153 * If we're called, it's because more bytes are requested but we have
154 * given all the bytes of the current mapping.
155 */
156 BT_ASSERT(ds_file->request_offset_in_mapping == ds_file->mmap_len);
157
158 /*
159 * If the current mapping coincides with the end of the file, there is
160 * no next mapping.
161 */
162 if (ds_file->mmap_offset_in_file + ds_file->mmap_len == ds_file->file->size) {
163 status = CTF_MSG_ITER_MEDIUM_STATUS_EOF;
164 goto end;
165 }
166
167 status = ds_file_mmap(ds_file, ds_file->mmap_offset_in_file + ds_file->mmap_len);
168
169 end:
170 return status;
171 }
172
173 static enum ctf_msg_iter_medium_status medop_request_bytes(size_t request_sz, uint8_t **buffer_addr,
174 size_t *buffer_sz, void *data)
175 {
176 enum ctf_msg_iter_medium_status status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
177 struct ctf_fs_ds_file *ds_file = (struct ctf_fs_ds_file *) data;
178
179 BT_ASSERT(request_sz > 0);
180
181 /*
182 * Check if we have at least one memory-mapped byte left. If we don't,
183 * mmap the next file.
184 */
185 if (remaining_mmap_bytes(ds_file) == 0) {
186 /* Are we at the end of the file? */
187 if (ds_file->mmap_offset_in_file >= ds_file->file->size) {
188 BT_CPPLOGD_SPEC(ds_file->logger, "Reached end of file \"{}\" ({})", ds_file->file->path,
189 fmt::ptr(ds_file->file->fp));
190 status = CTF_MSG_ITER_MEDIUM_STATUS_EOF;
191 goto end;
192 }
193
194 status = ds_file_mmap_next(ds_file);
195 switch (status) {
196 case CTF_MSG_ITER_MEDIUM_STATUS_OK:
197 break;
198 case CTF_MSG_ITER_MEDIUM_STATUS_EOF:
199 goto end;
200 default:
201 BT_CPPLOGE_SPEC(ds_file->logger, "Cannot memory-map next region of file \"{}\" ({})",
202 ds_file->file->path, fmt::ptr(ds_file->file->fp));
203 goto error;
204 }
205 }
206
207 BT_ASSERT(remaining_mmap_bytes(ds_file) > 0);
208 *buffer_sz = MIN(remaining_mmap_bytes(ds_file), request_sz);
209
210 BT_ASSERT(ds_file->mmap_addr);
211 *buffer_addr = ((uint8_t *) ds_file->mmap_addr) + ds_file->request_offset_in_mapping;
212
213 ds_file->request_offset_in_mapping += *buffer_sz;
214 goto end;
215
216 error:
217 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
218
219 end:
220 return status;
221 }
222
223 static bt_stream *medop_borrow_stream(bt_stream_class *stream_class, int64_t, void *data)
224 {
225 struct ctf_fs_ds_file *ds_file = (struct ctf_fs_ds_file *) data;
226 bt_stream_class *ds_file_stream_class;
227 bt_stream *stream = NULL;
228
229 ds_file_stream_class = ds_file->stream->cls().libObjPtr();
230
231 if (stream_class != ds_file_stream_class) {
232 /*
233 * Not supported: two packets described by two different
234 * stream classes within the same data stream file.
235 */
236 goto end;
237 }
238
239 stream = ds_file->stream->libObjPtr();
240
241 end:
242 return stream;
243 }
244
245 static enum ctf_msg_iter_medium_status medop_seek(off_t offset, void *data)
246 {
247 struct ctf_fs_ds_file *ds_file = (struct ctf_fs_ds_file *) data;
248
249 BT_ASSERT(offset >= 0);
250 BT_ASSERT(offset < ds_file->file->size);
251
252 return ds_file_mmap(ds_file, offset);
253 }
254
255 struct ctf_msg_iter_medium_ops ctf_fs_ds_file_medops = {
256 medop_request_bytes,
257 medop_seek,
258 nullptr,
259 medop_borrow_stream,
260 };
261
262 struct ctf_fs_ds_group_medops_data
263 {
264 explicit ctf_fs_ds_group_medops_data(const bt2c::Logger& parentLogger) :
265 logger {parentLogger, "PLUGIN/SRC.CTF.FS/DS-GROUP-MEDOPS"}
266 {
267 }
268
269 bt2c::Logger logger;
270
271 /* Weak, set once at creation time. */
272 struct ctf_fs_ds_file_group *ds_file_group = nullptr;
273
274 /*
275 * Index (as in element rank) of the index entry of ds_file_groups'
276 * index we will read next (so, the one after the one we are reading
277 * right now).
278 */
279 guint next_index_entry_index = 0;
280
281 /*
282 * File we are currently reading. Changes whenever we switch to
283 * reading another data file.
284 */
285 ctf_fs_ds_file::UP file;
286
287 /* Weak, for context / logging / appending causes. */
288 bt_self_message_iterator *self_msg_iter = nullptr;
289 };
290
291 static enum ctf_msg_iter_medium_status medop_group_request_bytes(size_t request_sz,
292 uint8_t **buffer_addr,
293 size_t *buffer_sz, void *void_data)
294 {
295 struct ctf_fs_ds_group_medops_data *data = (struct ctf_fs_ds_group_medops_data *) void_data;
296
297 /* Return bytes from the current file. */
298 return medop_request_bytes(request_sz, buffer_addr, buffer_sz, data->file.get());
299 }
300
301 static bt_stream *medop_group_borrow_stream(bt_stream_class *stream_class, int64_t stream_id,
302 void *void_data)
303 {
304 struct ctf_fs_ds_group_medops_data *data = (struct ctf_fs_ds_group_medops_data *) void_data;
305
306 return medop_borrow_stream(stream_class, stream_id, data->file.get());
307 }
308
309 /*
310 * Set `data->file` to prepare it to read the packet described
311 * by `index_entry`.
312 */
313
314 static enum ctf_msg_iter_medium_status
315 ctf_fs_ds_group_medops_set_file(struct ctf_fs_ds_group_medops_data *data,
316 struct ctf_fs_ds_index_entry *index_entry)
317 {
318 enum ctf_msg_iter_medium_status status;
319
320 BT_ASSERT(data);
321 BT_ASSERT(index_entry);
322
323 /* Check if that file is already the one mapped. */
324 if (!data->file || data->file->file->path != index_entry->path) {
325 /* Create the new file. */
326 data->file =
327 ctf_fs_ds_file_create(data->ds_file_group->ctf_fs_trace, data->ds_file_group->stream,
328 index_entry->path, data->logger);
329 if (!data->file) {
330 BT_CPPLOGE_APPEND_CAUSE_SPEC(data->logger, "failed to create ctf_fs_ds_file.");
331 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
332 goto end;
333 }
334 }
335
336 /*
337 * Ensure the right portion of the file will be returned on the next
338 * request_bytes call.
339 */
340 status = ds_file_mmap(data->file.get(), index_entry->offset.bytes());
341 if (status != CTF_MSG_ITER_MEDIUM_STATUS_OK) {
342 goto end;
343 }
344
345 status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
346
347 end:
348 return status;
349 }
350
351 static enum ctf_msg_iter_medium_status medop_group_switch_packet(void *void_data)
352 {
353 struct ctf_fs_ds_group_medops_data *data = (struct ctf_fs_ds_group_medops_data *) void_data;
354 struct ctf_fs_ds_index_entry *index_entry;
355 enum ctf_msg_iter_medium_status status;
356
357 /* If we have gone through all index entries, we are done. */
358 if (data->next_index_entry_index >= data->ds_file_group->index->entries.size()) {
359 status = CTF_MSG_ITER_MEDIUM_STATUS_EOF;
360 goto end;
361 }
362
363 /*
364 * Otherwise, look up the next index entry / packet and prepare it
365 * for reading.
366 */
367 index_entry = data->ds_file_group->index->entries[data->next_index_entry_index].get();
368
369 status = ctf_fs_ds_group_medops_set_file(data, index_entry);
370 if (status != CTF_MSG_ITER_MEDIUM_STATUS_OK) {
371 goto end;
372 }
373
374 data->next_index_entry_index++;
375
376 status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
377 end:
378 return status;
379 }
380
381 void ctf_fs_ds_group_medops_data_deleter::operator()(ctf_fs_ds_group_medops_data *data) noexcept
382 {
383 delete data;
384 }
385
386 enum ctf_msg_iter_medium_status ctf_fs_ds_group_medops_data_create(
387 struct ctf_fs_ds_file_group *ds_file_group, bt_self_message_iterator *self_msg_iter,
388 const bt2c::Logger& parentLogger, ctf_fs_ds_group_medops_data_up& out)
389 {
390 BT_ASSERT(self_msg_iter);
391 BT_ASSERT(ds_file_group);
392 BT_ASSERT(ds_file_group->index);
393 BT_ASSERT(!ds_file_group->index->entries.empty());
394
395 out.reset(new ctf_fs_ds_group_medops_data {parentLogger});
396
397 out->ds_file_group = ds_file_group;
398 out->self_msg_iter = self_msg_iter;
399
400 /*
401 * No need to prepare the first file. ctf_msg_iter will call
402 * switch_packet before reading the first packet, it will be
403 * done then.
404 */
405
406 return CTF_MSG_ITER_MEDIUM_STATUS_OK;
407 }
408
409 void ctf_fs_ds_group_medops_data_reset(struct ctf_fs_ds_group_medops_data *data)
410 {
411 data->next_index_entry_index = 0;
412 }
413
414 struct ctf_msg_iter_medium_ops ctf_fs_ds_group_medops = {
415 .request_bytes = medop_group_request_bytes,
416
417 /*
418 * We don't support seeking using this medops. It would probably be
419 * possible, but it's not needed at the moment.
420 */
421 .seek = NULL,
422
423 .switch_packet = medop_group_switch_packet,
424 .borrow_stream = medop_group_borrow_stream,
425 };
426
427 static ctf_fs_ds_index_entry::UP ctf_fs_ds_index_entry_create(const bt2c::DataLen offset,
428 const bt2c::DataLen packetSize)
429 {
430 ctf_fs_ds_index_entry::UP entry = bt2s::make_unique<ctf_fs_ds_index_entry>(offset, packetSize);
431
432 entry->packet_seq_num = UINT64_MAX;
433
434 return entry;
435 }
436
437 static int convert_cycles_to_ns(struct ctf_clock_class *clock_class, uint64_t cycles, int64_t *ns)
438 {
439 return bt_util_clock_cycles_to_ns_from_origin(cycles, clock_class->frequency,
440 clock_class->offset_seconds,
441 clock_class->offset_cycles, ns);
442 }
443
444 static ctf_fs_ds_index::UP build_index_from_idx_file(struct ctf_fs_ds_file *ds_file,
445 struct ctf_fs_ds_file_info *file_info,
446 struct ctf_msg_iter *msg_iter)
447 {
448 int ret;
449 bt2c::GCharUP directory;
450 bt2c::GCharUP basename;
451 GString *index_basename = NULL;
452 bt2c::GCharUP index_file_path;
453 GMappedFile *mapped_file = NULL;
454 gsize filesize;
455 const char *mmap_begin = NULL, *file_pos = NULL;
456 const struct ctf_packet_index_file_hdr *header = NULL;
457 ctf_fs_ds_index::UP index;
458 ctf_fs_ds_index_entry::UP index_entry;
459 ctf_fs_ds_index_entry *prev_index_entry = NULL;
460 auto totalPacketsSize = bt2c::DataLen::fromBytes(0);
461 size_t file_index_entry_size;
462 size_t file_entry_count;
463 size_t i;
464 struct ctf_stream_class *sc;
465 struct ctf_msg_iter_packet_properties props;
466 uint32_t version_major, version_minor;
467
468 BT_CPPLOGI_SPEC(ds_file->logger, "Building index from .idx file of stream file {}",
469 ds_file->file->path);
470 ret = ctf_msg_iter_get_packet_properties(msg_iter, &props);
471 if (ret) {
472 BT_CPPLOGI_STR_SPEC(ds_file->logger,
473 "Cannot read first packet's header and context fields.");
474 goto error;
475 }
476
477 sc = ctf_trace_class_borrow_stream_class_by_id(ds_file->metadata->tc, props.stream_class_id);
478 BT_ASSERT(sc);
479 if (!sc->default_clock_class) {
480 BT_CPPLOGI_STR_SPEC(ds_file->logger, "Cannot find stream class's default clock class.");
481 goto error;
482 }
483
484 /* Look for index file in relative path index/name.idx. */
485 basename.reset(g_path_get_basename(ds_file->file->path.c_str()));
486 if (!basename) {
487 BT_CPPLOGE_SPEC(ds_file->logger, "Cannot get the basename of datastream file {}",
488 ds_file->file->path);
489 goto error;
490 }
491
492 directory.reset(g_path_get_dirname(ds_file->file->path.c_str()));
493 if (!directory) {
494 BT_CPPLOGE_SPEC(ds_file->logger, "Cannot get dirname of datastream file {}",
495 ds_file->file->path);
496 goto error;
497 }
498
499 index_basename = g_string_new(basename.get());
500 if (!index_basename) {
501 BT_CPPLOGE_STR_SPEC(ds_file->logger, "Cannot allocate index file basename string");
502 goto error;
503 }
504
505 g_string_append(index_basename, ".idx");
506 index_file_path.reset(g_build_filename(directory.get(), "index", index_basename->str, NULL));
507 mapped_file = g_mapped_file_new(index_file_path.get(), FALSE, NULL);
508 if (!mapped_file) {
509 BT_CPPLOGD_SPEC(ds_file->logger, "Cannot create new mapped file {}", index_file_path.get());
510 goto error;
511 }
512
513 /*
514 * The g_mapped_file API limits us to 4GB files on 32-bit.
515 * Traces with such large indexes have never been seen in the wild,
516 * but this would need to be adjusted to support them.
517 */
518 filesize = g_mapped_file_get_length(mapped_file);
519 if (filesize < sizeof(*header)) {
520 BT_CPPLOGW_SPEC(ds_file->logger,
521 "Invalid LTTng trace index file: "
522 "file size ({} bytes) < header size ({} bytes)",
523 filesize, sizeof(*header));
524 goto error;
525 }
526
527 mmap_begin = g_mapped_file_get_contents(mapped_file);
528 header = (struct ctf_packet_index_file_hdr *) mmap_begin;
529
530 file_pos = g_mapped_file_get_contents(mapped_file) + sizeof(*header);
531 if (be32toh(header->magic) != CTF_INDEX_MAGIC) {
532 BT_CPPLOGW_STR_SPEC(ds_file->logger,
533 "Invalid LTTng trace index: \"magic\" field validation failed");
534 goto error;
535 }
536
537 version_major = be32toh(header->index_major);
538 version_minor = be32toh(header->index_minor);
539 if (version_major != 1) {
540 BT_CPPLOGW_SPEC(ds_file->logger, "Unknown LTTng trace index version: major={}, minor={}",
541 version_major, version_minor);
542 goto error;
543 }
544
545 file_index_entry_size = be32toh(header->packet_index_len);
546 if (file_index_entry_size < CTF_INDEX_1_0_SIZE) {
547 BT_CPPLOGW_SPEC(
548 ds_file->logger,
549 "Invalid `packet_index_len` in LTTng trace index file (`packet_index_len` < CTF index 1.0 index entry size): "
550 "packet_index_len={}, CTF_INDEX_1_0_SIZE={}",
551 file_index_entry_size, CTF_INDEX_1_0_SIZE);
552 goto error;
553 }
554
555 file_entry_count = (filesize - sizeof(*header)) / file_index_entry_size;
556 if ((filesize - sizeof(*header)) % file_index_entry_size) {
557 BT_CPPLOGW_SPEC(ds_file->logger,
558 "Invalid LTTng trace index: the index's size after the header "
559 "({} bytes) is not a multiple of the index entry size "
560 "({} bytes)",
561 (filesize - sizeof(*header)), sizeof(*header));
562 goto error;
563 }
564
565 index = bt2s::make_unique<ctf_fs_ds_index>();
566
567 for (i = 0; i < file_entry_count; i++) {
568 struct ctf_packet_index *file_index = (struct ctf_packet_index *) file_pos;
569 const auto packetSize = bt2c::DataLen::fromBits(be64toh(file_index->packet_size));
570
571 if (packetSize.hasExtraBits()) {
572 BT_CPPLOGW_SPEC(ds_file->logger,
573 "Invalid packet size encountered in LTTng trace index file");
574 goto error;
575 }
576
577 const auto offset = bt2c::DataLen::fromBytes(be64toh(file_index->offset));
578
579 if (i != 0 && offset < prev_index_entry->offset) {
580 BT_CPPLOGW_SPEC(
581 ds_file->logger,
582 "Invalid, non-monotonic, packet offset encountered in LTTng trace index file: "
583 "previous offset={} bytes, current offset={} bytes",
584 prev_index_entry->offset.bytes(), offset.bytes());
585 goto error;
586 }
587
588 index_entry = ctf_fs_ds_index_entry_create(offset, packetSize);
589 if (!index_entry) {
590 BT_CPPLOGE_APPEND_CAUSE_SPEC(ds_file->logger,
591 "Failed to create a ctf_fs_ds_index_entry.");
592 goto error;
593 }
594
595 /* Set path to stream file. */
596 index_entry->path = file_info->path.c_str();
597
598 index_entry->timestamp_begin = be64toh(file_index->timestamp_begin);
599 index_entry->timestamp_end = be64toh(file_index->timestamp_end);
600 if (index_entry->timestamp_end < index_entry->timestamp_begin) {
601 BT_CPPLOGW_SPEC(
602 ds_file->logger,
603 "Invalid packet time bounds encountered in LTTng trace index file (begin > end): "
604 "timestamp_begin={}, timestamp_end={}",
605 index_entry->timestamp_begin, index_entry->timestamp_end);
606 goto error;
607 }
608
609 /* Convert the packet's bound to nanoseconds since Epoch. */
610 ret = convert_cycles_to_ns(sc->default_clock_class, index_entry->timestamp_begin,
611 &index_entry->timestamp_begin_ns);
612 if (ret) {
613 BT_CPPLOGI_STR_SPEC(
614 ds_file->logger,
615 "Failed to convert raw timestamp to nanoseconds since Epoch during index parsing");
616 goto error;
617 }
618 ret = convert_cycles_to_ns(sc->default_clock_class, index_entry->timestamp_end,
619 &index_entry->timestamp_end_ns);
620 if (ret) {
621 BT_CPPLOGI_STR_SPEC(
622 ds_file->logger,
623 "Failed to convert raw timestamp to nanoseconds since Epoch during LTTng trace index parsing");
624 goto error;
625 }
626
627 if (version_minor >= 1) {
628 index_entry->packet_seq_num = be64toh(file_index->packet_seq_num);
629 }
630
631 totalPacketsSize += packetSize;
632 file_pos += file_index_entry_size;
633
634 prev_index_entry = index_entry.get();
635
636 index->entries.emplace_back(std::move(index_entry));
637 }
638
639 /* Validate that the index addresses the complete stream. */
640 if (ds_file->file->size != totalPacketsSize.bytes()) {
641 BT_CPPLOGW_SPEC(ds_file->logger,
642 "Invalid LTTng trace index file; indexed size != stream file size: "
643 "file-size={} bytes, total-packets-size={} bytes",
644 ds_file->file->size, totalPacketsSize.bytes());
645 goto error;
646 }
647 end:
648 if (index_basename) {
649 g_string_free(index_basename, TRUE);
650 }
651 if (mapped_file) {
652 g_mapped_file_unref(mapped_file);
653 }
654 return index;
655 error:
656 index.reset();
657 goto end;
658 }
659
660 static int init_index_entry(struct ctf_fs_ds_index_entry *entry, struct ctf_fs_ds_file *ds_file,
661 struct ctf_msg_iter_packet_properties *props)
662 {
663 int ret = 0;
664 struct ctf_stream_class *sc;
665
666 sc = ctf_trace_class_borrow_stream_class_by_id(ds_file->metadata->tc, props->stream_class_id);
667 BT_ASSERT(sc);
668
669 if (props->snapshots.beginning_clock != UINT64_C(-1)) {
670 entry->timestamp_begin = props->snapshots.beginning_clock;
671
672 /* Convert the packet's bound to nanoseconds since Epoch. */
673 ret = convert_cycles_to_ns(sc->default_clock_class, props->snapshots.beginning_clock,
674 &entry->timestamp_begin_ns);
675 if (ret) {
676 BT_CPPLOGI_STR_SPEC(ds_file->logger,
677 "Failed to convert raw timestamp to nanoseconds since Epoch.");
678 goto end;
679 }
680 } else {
681 entry->timestamp_begin = UINT64_C(-1);
682 entry->timestamp_begin_ns = UINT64_C(-1);
683 }
684
685 if (props->snapshots.end_clock != UINT64_C(-1)) {
686 entry->timestamp_end = props->snapshots.end_clock;
687
688 /* Convert the packet's bound to nanoseconds since Epoch. */
689 ret = convert_cycles_to_ns(sc->default_clock_class, props->snapshots.end_clock,
690 &entry->timestamp_end_ns);
691 if (ret) {
692 BT_CPPLOGI_STR_SPEC(ds_file->logger,
693 "Failed to convert raw timestamp to nanoseconds since Epoch.");
694 goto end;
695 }
696 } else {
697 entry->timestamp_end = UINT64_C(-1);
698 entry->timestamp_end_ns = UINT64_C(-1);
699 }
700
701 end:
702 return ret;
703 }
704
705 static ctf_fs_ds_index::UP build_index_from_stream_file(struct ctf_fs_ds_file *ds_file,
706 struct ctf_fs_ds_file_info *file_info,
707 struct ctf_msg_iter *msg_iter)
708 {
709 int ret;
710 enum ctf_msg_iter_status iter_status = CTF_MSG_ITER_STATUS_OK;
711 auto currentPacketOffset = bt2c::DataLen::fromBytes(0);
712
713 BT_CPPLOGI_SPEC(ds_file->logger, "Indexing stream file {}", ds_file->file->path);
714
715 ctf_fs_ds_index::UP index = bt2s::make_unique<ctf_fs_ds_index>();
716
717 while (true) {
718 struct ctf_msg_iter_packet_properties props;
719
720 if (currentPacketOffset.bytes() > ds_file->file->size) {
721 BT_CPPLOGE_STR_SPEC(ds_file->logger,
722 "Unexpected current packet's offset (larger than file).");
723 goto error;
724 } else if (currentPacketOffset.bytes() == ds_file->file->size) {
725 /* No more data */
726 break;
727 }
728
729 iter_status = ctf_msg_iter_seek(msg_iter, currentPacketOffset.bytes());
730 if (iter_status != CTF_MSG_ITER_STATUS_OK) {
731 goto error;
732 }
733
734 iter_status = ctf_msg_iter_get_packet_properties(msg_iter, &props);
735 if (iter_status != CTF_MSG_ITER_STATUS_OK) {
736 goto error;
737 }
738
739 /*
740 * Get the current packet size from the packet header, if set. Else,
741 * assume there is a single packet in the file, so take the file size
742 * as the packet size.
743 */
744 const auto currentPacketSize = props.exp_packet_total_size >= 0 ?
745 bt2c::DataLen::fromBits(props.exp_packet_total_size) :
746 bt2c::DataLen::fromBytes(ds_file->file->size);
747
748 if ((currentPacketOffset + currentPacketSize).bytes() > ds_file->file->size) {
749 BT_CPPLOGW_SPEC(ds_file->logger,
750 "Invalid packet size reported in file: stream=\"{}\", "
751 "packet-offset-bytes={}, packet-size-bytes={}, "
752 "file-size-bytes={}",
753 ds_file->file->path, currentPacketOffset.bytes(),
754 currentPacketSize.bytes(), ds_file->file->size);
755 goto error;
756 }
757
758 auto index_entry = ctf_fs_ds_index_entry_create(currentPacketOffset, currentPacketSize);
759 if (!index_entry) {
760 BT_CPPLOGE_APPEND_CAUSE_SPEC(ds_file->logger,
761 "Failed to create a ctf_fs_ds_index_entry.");
762 goto error;
763 }
764
765 /* Set path to stream file. */
766 index_entry->path = file_info->path.c_str();
767
768 ret = init_index_entry(index_entry.get(), ds_file, &props);
769 if (ret) {
770 goto error;
771 }
772
773 index->entries.emplace_back(std::move(index_entry));
774
775 currentPacketOffset += currentPacketSize;
776 BT_CPPLOGD_SPEC(ds_file->logger,
777 "Seeking to next packet: current-packet-offset-bytes={}, "
778 "next-packet-offset-bytes={}",
779 (currentPacketOffset - currentPacketSize).bytes(),
780 currentPacketOffset.bytes());
781 }
782
783 end:
784 return index;
785
786 error:
787 index.reset();
788 goto end;
789 }
790
791 ctf_fs_ds_file::UP ctf_fs_ds_file_create(struct ctf_fs_trace *ctf_fs_trace,
792 bt2::Stream::Shared stream, const char *path,
793 const bt2c::Logger& parentLogger)
794 {
795 int ret;
796 auto ds_file = bt2s::make_unique<ctf_fs_ds_file>(parentLogger);
797 size_t offset_align;
798
799 ds_file->file = bt2s::make_unique<ctf_fs_file>(parentLogger);
800 ds_file->stream = std::move(stream);
801 ds_file->metadata = ctf_fs_trace->metadata.get();
802 ds_file->file->path = path;
803 ret = ctf_fs_file_open(ds_file->file.get(), "rb");
804 if (ret) {
805 goto error;
806 }
807
808 offset_align = bt_mmap_get_offset_align_size(static_cast<int>(ds_file->logger.level()));
809 ds_file->mmap_max_len = offset_align * 2048;
810
811 goto end;
812
813 error:
814 /* Do not touch "borrowed" file. */
815 ds_file.reset();
816
817 end:
818 return ds_file;
819 }
820
821 ctf_fs_ds_index::UP ctf_fs_ds_file_build_index(struct ctf_fs_ds_file *ds_file,
822 struct ctf_fs_ds_file_info *file_info,
823 struct ctf_msg_iter *msg_iter)
824 {
825 auto index = build_index_from_idx_file(ds_file, file_info, msg_iter);
826 if (index) {
827 goto end;
828 }
829
830 BT_CPPLOGI_SPEC(ds_file->logger, "Failed to build index from .index file; "
831 "falling back to stream indexing.");
832 index = build_index_from_stream_file(ds_file, file_info, msg_iter);
833 end:
834 return index;
835 }
836
837 ctf_fs_ds_file::~ctf_fs_ds_file()
838 {
839 (void) ds_file_munmap(this);
840 }
841
842 ctf_fs_ds_file_info::UP ctf_fs_ds_file_info_create(const char *path, int64_t begin_ns)
843 {
844 ctf_fs_ds_file_info::UP ds_file_info = bt2s::make_unique<ctf_fs_ds_file_info>();
845
846 ds_file_info->path = path;
847 ds_file_info->begin_ns = begin_ns;
848 return ds_file_info;
849 }
850
851 ctf_fs_ds_file_group::UP ctf_fs_ds_file_group_create(struct ctf_fs_trace *ctf_fs_trace,
852 struct ctf_stream_class *sc,
853 uint64_t stream_instance_id,
854 ctf_fs_ds_index::UP index)
855 {
856 ctf_fs_ds_file_group::UP ds_file_group {new ctf_fs_ds_file_group};
857
858 ds_file_group->index = std::move(index);
859
860 ds_file_group->stream_id = stream_instance_id;
861 BT_ASSERT(sc);
862 ds_file_group->sc = sc;
863 ds_file_group->ctf_fs_trace = ctf_fs_trace;
864
865 return ds_file_group;
866 }
This page took 0.047068 seconds and 5 git commands to generate.