cpp-common/bt2c/fmt.hpp: use `wise_enum::string_type` in `EnableIfIsWiseEnum` definition
[babeltrace.git] / src / plugins / ctf / fs-sink / fs-sink.cpp
1 /*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
5 */
6
7 #include <glib.h>
8 #include <stdio.h>
9
10 #include <babeltrace2/babeltrace.h>
11
12 #include "common/assert.h"
13 #include "cpp-common/vendor/fmt/format.h"
14 #include "ctfser/ctfser.h"
15
16 #include "plugins/common/param-validation/param-validation.h"
17
18 #include "fs-sink-ctf-meta.hpp"
19 #include "fs-sink-stream.hpp"
20 #include "fs-sink-trace.hpp"
21 #include "fs-sink.hpp"
22 #include "translate-trace-ir-to-ctf-ir.hpp"
23
24 static const char * const in_port_name = "in";
25
26 static bt_component_class_initialize_method_status
27 ensure_output_dir_exists(struct fs_sink_comp *fs_sink)
28 {
29 bt_component_class_initialize_method_status status =
30 BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
31 int ret;
32
33 ret = g_mkdir_with_parents(fs_sink->output_dir_path->str, 0755);
34 if (ret) {
35 BT_CPPLOGE_ERRNO_APPEND_CAUSE_SPEC(
36 fs_sink->logger, "Cannot create directories for output directory",
37 ": output-dir-path=\"{}\"", fs_sink->output_dir_path->str);
38 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
39 goto end;
40 }
41
42 end:
43 return status;
44 }
45
46 static bt_param_validation_map_value_entry_descr fs_sink_params_descr[] = {
47 {"path", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
48 bt_param_validation_value_descr::makeString()},
49 {"assume-single-trace", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
50 bt_param_validation_value_descr::makeBool()},
51 {"ignore-discarded-events", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
52 bt_param_validation_value_descr::makeBool()},
53 {"ignore-discarded-packets", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
54 bt_param_validation_value_descr::makeBool()},
55 {"quiet", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
56 bt_param_validation_value_descr::makeBool()},
57 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
58
59 static bt_component_class_initialize_method_status configure_component(struct fs_sink_comp *fs_sink,
60 const bt_value *params)
61 {
62 bt_component_class_initialize_method_status status;
63 const bt_value *value;
64 enum bt_param_validation_status validation_status;
65 gchar *validation_error;
66
67 validation_status =
68 bt_param_validation_validate(params, fs_sink_params_descr, &validation_error);
69 if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
70 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
71 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "{}", validation_error);
72 goto end;
73 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
74 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
75 goto end;
76 }
77
78 value = bt_value_map_borrow_entry_value_const(params, "path");
79 g_string_assign(fs_sink->output_dir_path, bt_value_string_get(value));
80
81 value = bt_value_map_borrow_entry_value_const(params, "assume-single-trace");
82 if (value) {
83 fs_sink->assume_single_trace = (bool) bt_value_bool_get(value);
84 }
85
86 value = bt_value_map_borrow_entry_value_const(params, "ignore-discarded-events");
87 if (value) {
88 fs_sink->ignore_discarded_events = (bool) bt_value_bool_get(value);
89 }
90
91 value = bt_value_map_borrow_entry_value_const(params, "ignore-discarded-packets");
92 if (value) {
93 fs_sink->ignore_discarded_packets = (bool) bt_value_bool_get(value);
94 }
95
96 value = bt_value_map_borrow_entry_value_const(params, "quiet");
97 if (value) {
98 fs_sink->quiet = (bool) bt_value_bool_get(value);
99 }
100
101 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
102
103 end:
104 g_free(validation_error);
105 return status;
106 }
107
108 static void destroy_fs_sink_comp(struct fs_sink_comp *fs_sink)
109 {
110 if (!fs_sink) {
111 goto end;
112 }
113
114 if (fs_sink->output_dir_path) {
115 g_string_free(fs_sink->output_dir_path, TRUE);
116 fs_sink->output_dir_path = NULL;
117 }
118
119 if (fs_sink->traces) {
120 g_hash_table_destroy(fs_sink->traces);
121 fs_sink->traces = NULL;
122 }
123
124 BT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(fs_sink->upstream_iter);
125 delete fs_sink;
126
127 end:
128 return;
129 }
130
131 bt_component_class_initialize_method_status ctf_fs_sink_init(bt_self_component_sink *self_comp_sink,
132 bt_self_component_sink_configuration *,
133 const bt_value *params, void *)
134 {
135 try {
136 bt_component_class_initialize_method_status status;
137 bt_self_component_add_port_status add_port_status;
138 struct fs_sink_comp *fs_sink = NULL;
139 bt_self_component *self_comp = bt_self_component_sink_as_self_component(self_comp_sink);
140
141 fs_sink = new fs_sink_comp {bt2::SelfSinkComponent {self_comp_sink}};
142 fs_sink->output_dir_path = g_string_new(NULL);
143 status = configure_component(fs_sink, params);
144 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
145 /* configure_component() logs errors */
146 goto end;
147 }
148
149 if (fs_sink->assume_single_trace &&
150 g_file_test(fs_sink->output_dir_path->str, G_FILE_TEST_EXISTS)) {
151 BT_CPPLOGE_APPEND_CAUSE_SPEC(
152 fs_sink->logger, "Single trace mode, but output path exists: output-path=\"{}\"",
153 fs_sink->output_dir_path->str);
154 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
155 goto end;
156 }
157
158 status = ensure_output_dir_exists(fs_sink);
159 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
160 /* ensure_output_dir_exists() logs errors */
161 goto end;
162 }
163
164 fs_sink->traces = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL,
165 (GDestroyNotify) fs_sink_trace_destroy);
166 if (!fs_sink->traces) {
167 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to allocate one GHashTable.");
168 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
169 goto end;
170 }
171
172 add_port_status =
173 bt_self_component_sink_add_input_port(self_comp_sink, in_port_name, NULL, NULL);
174 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
175 status = (bt_component_class_initialize_method_status) add_port_status;
176 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to add input port.");
177 goto end;
178 }
179
180 bt_self_component_set_data(self_comp, fs_sink);
181
182 end:
183 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
184 destroy_fs_sink_comp(fs_sink);
185 }
186
187 return status;
188 } catch (const std::bad_alloc&) {
189 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
190 } catch (const bt2::Error&) {
191 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
192 }
193 }
194
195 static inline struct fs_sink_stream *borrow_stream(struct fs_sink_comp *fs_sink,
196 const bt_stream *ir_stream)
197 {
198 const bt_trace *ir_trace = bt_stream_borrow_trace_const(ir_stream);
199 struct fs_sink_trace *trace;
200 struct fs_sink_stream *stream = NULL;
201
202 trace = (fs_sink_trace *) g_hash_table_lookup(fs_sink->traces, ir_trace);
203 if (G_UNLIKELY(!trace)) {
204 if (fs_sink->assume_single_trace && g_hash_table_size(fs_sink->traces) > 0) {
205 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger,
206 "Single trace mode, but getting more than one trace: "
207 "stream-name=\"{}\"",
208 bt2c::maybeNull(bt_stream_get_name(ir_stream)));
209 goto end;
210 }
211
212 trace = fs_sink_trace_create(fs_sink, ir_trace);
213 if (!trace) {
214 goto end;
215 }
216 }
217
218 stream = (fs_sink_stream *) g_hash_table_lookup(trace->streams, ir_stream);
219 if (G_UNLIKELY(!stream)) {
220 stream = fs_sink_stream_create(trace, ir_stream);
221 if (!stream) {
222 goto end;
223 }
224 }
225
226 end:
227 return stream;
228 }
229
230 static inline bt_component_class_sink_consume_method_status
231 handle_event_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
232 {
233 try {
234 int ret;
235 bt_component_class_sink_consume_method_status status =
236 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
237 const bt_event *ir_event = bt_message_event_borrow_event_const(msg);
238 const bt_stream *ir_stream = bt_event_borrow_stream_const(ir_event);
239 struct fs_sink_stream *stream;
240 struct fs_sink_ctf_event_class *ec = NULL;
241 const bt_clock_snapshot *cs = NULL;
242
243 stream = borrow_stream(fs_sink, ir_stream);
244 if (G_UNLIKELY(!stream)) {
245 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to borrow stream.");
246 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
247 goto end;
248 }
249
250 ret = try_translate_event_class_trace_ir_to_ctf_ir(
251 fs_sink, stream->sc, bt_event_borrow_class_const(ir_event), &ec);
252 if (ret) {
253 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger,
254 "Failed to translate event class to CTF IR.");
255 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
256 goto end;
257 }
258
259 BT_ASSERT_DBG(ec);
260
261 if (stream->sc->default_clock_class) {
262 cs = bt_message_event_borrow_default_clock_snapshot_const(msg);
263 }
264
265 /*
266 * If this event's stream does not support packets, then we
267 * lazily create artificial packets.
268 *
269 * The size of an artificial packet is arbitrarily at least
270 * 4 MiB (it usually is greater because we close it when
271 * comes the time to write a new event and the packet's content
272 * size is >= 4 MiB), except the last one which can be smaller.
273 */
274 if (G_UNLIKELY(!stream->sc->has_packets)) {
275 if (stream->packet_state.is_open &&
276 bt_ctfser_get_offset_in_current_packet_bits(&stream->ctfser) / 8 >=
277 4 * 1024 * 1024) {
278 /*
279 * Stream's current packet is larger than 4 MiB:
280 * close it. A new packet will be opened just
281 * below.
282 */
283 ret = fs_sink_stream_close_packet(stream, NULL);
284 if (ret) {
285 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to close packet.");
286 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
287 goto end;
288 }
289 }
290
291 if (!stream->packet_state.is_open) {
292 /* Stream's packet is not currently opened: open it */
293 ret = fs_sink_stream_open_packet(stream, NULL, NULL);
294 if (ret) {
295 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to open packet.");
296 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
297 goto end;
298 }
299 }
300 }
301
302 BT_ASSERT_DBG(stream->packet_state.is_open);
303 ret = fs_sink_stream_write_event(stream, cs, ir_event, ec);
304 if (G_UNLIKELY(ret)) {
305 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to write event.");
306 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
307 goto end;
308 }
309
310 end:
311 return status;
312 } catch (const std::bad_alloc&) {
313 return BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_MEMORY_ERROR;
314 } catch (const bt2::Error&) {
315 return BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
316 }
317 }
318
319 static inline bt_component_class_sink_consume_method_status
320 handle_packet_beginning_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
321 {
322 int ret;
323 bt_component_class_sink_consume_method_status status =
324 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
325 const bt_packet *ir_packet = bt_message_packet_beginning_borrow_packet_const(msg);
326 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
327 struct fs_sink_stream *stream;
328 const bt_clock_snapshot *cs = NULL;
329
330 stream = borrow_stream(fs_sink, ir_stream);
331 if (G_UNLIKELY(!stream)) {
332 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to borrow stream.");
333 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
334 goto end;
335 }
336
337 if (stream->sc->packets_have_ts_begin) {
338 cs = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg);
339 BT_ASSERT(cs);
340 }
341
342 /*
343 * If we previously received a discarded events message with
344 * a time range, make sure that its beginning time matches what's
345 * expected for CTF 1.8, that is:
346 *
347 * * Its beginning time is the previous packet's end
348 * time (or the current packet's beginning time if
349 * this is the first packet).
350 *
351 * We check this here instead of in handle_packet_end_msg()
352 * because we want to catch any incompatible message as early as
353 * possible to report the error.
354 *
355 * Validation of the discarded events message's end time is
356 * performed in handle_packet_end_msg().
357 */
358 if (stream->discarded_events_state.in_range) {
359 uint64_t expected_cs;
360
361 /*
362 * `stream->discarded_events_state.in_range` is only set
363 * when the stream class's discarded events have a time
364 * range.
365 *
366 * It is required that the packet beginning and end
367 * messages for this stream class have times when
368 * discarded events have a time range.
369 */
370 BT_ASSERT(stream->sc->discarded_events_has_ts);
371 BT_ASSERT(stream->sc->packets_have_ts_begin);
372 BT_ASSERT(stream->sc->packets_have_ts_end);
373
374 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
375 /* We're opening the first packet */
376 expected_cs = bt_clock_snapshot_get_value(cs);
377 } else {
378 expected_cs = stream->prev_packet_state.end_cs;
379 }
380
381 if (stream->discarded_events_state.beginning_cs != expected_cs) {
382 BT_CPPLOGE_APPEND_CAUSE_SPEC(
383 fs_sink->logger,
384 "Incompatible discarded events message: "
385 "unexpected beginning time: "
386 "beginning-cs-val={}, "
387 "expected-beginning-cs-val={}, "
388 "stream-id={}, stream-name=\"{}\", "
389 "trace-name=\"{}\", path=\"{}/{}\"",
390 stream->discarded_events_state.beginning_cs, expected_cs,
391 bt_stream_get_id(ir_stream), bt2c::maybeNull(bt_stream_get_name(ir_stream)),
392 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream))),
393 stream->trace->path->str, stream->file_name->str);
394 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
395 goto end;
396 }
397 }
398
399 /*
400 * If we previously received a discarded packets message with a
401 * time range, make sure that its beginning and end times match
402 * what's expected for CTF 1.8, that is:
403 *
404 * * Its beginning time is the previous packet's end time.
405 *
406 * * Its end time is the current packet's beginning time.
407 */
408 if (stream->discarded_packets_state.in_range) {
409 uint64_t expected_end_cs;
410
411 /*
412 * `stream->discarded_packets_state.in_range` is only
413 * set when the stream class's discarded packets have a
414 * time range.
415 *
416 * It is required that the packet beginning and end
417 * messages for this stream class have times when
418 * discarded packets have a time range.
419 */
420 BT_ASSERT(stream->sc->discarded_packets_has_ts);
421 BT_ASSERT(stream->sc->packets_have_ts_begin);
422 BT_ASSERT(stream->sc->packets_have_ts_end);
423
424 /*
425 * It is not supported to have a discarded packets
426 * message _before_ the first packet: we cannot validate
427 * that its beginning time is compatible with CTF 1.8 in
428 * this case.
429 */
430 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
431 BT_CPPLOGE_APPEND_CAUSE_SPEC(
432 fs_sink->logger,
433 "Incompatible discarded packets message "
434 "occurring before the stream's first packet: "
435 "stream-id={}, stream-name=\"{}\", "
436 "trace-name=\"{}\", path=\"{}/{}\"",
437 bt_stream_get_id(ir_stream), bt2c::maybeNull(bt_stream_get_name(ir_stream)),
438 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream))),
439 stream->trace->path->str, stream->file_name->str);
440 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
441 goto end;
442 }
443
444 if (stream->discarded_packets_state.beginning_cs != stream->prev_packet_state.end_cs) {
445 BT_CPPLOGE_APPEND_CAUSE_SPEC(
446 fs_sink->logger,
447 "Incompatible discarded packets message: "
448 "unexpected beginning time: "
449 "beginning-cs-val={}, "
450 "expected-beginning-cs-val={}, "
451 "stream-id={}, stream-name=\"{}\", "
452 "trace-name=\"{}\", path=\"{}/{}\"",
453 stream->discarded_packets_state.beginning_cs, stream->prev_packet_state.end_cs,
454 bt_stream_get_id(ir_stream), bt2c::maybeNull(bt_stream_get_name(ir_stream)),
455 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream))),
456 stream->trace->path->str, stream->file_name->str);
457 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
458 goto end;
459 }
460
461 expected_end_cs = bt_clock_snapshot_get_value(cs);
462
463 if (stream->discarded_packets_state.end_cs != expected_end_cs) {
464 BT_CPPLOGE_APPEND_CAUSE_SPEC(
465 fs_sink->logger,
466 "Incompatible discarded packets message: "
467 "unexpected end time: "
468 "end-cs-val={}, "
469 "expected-end-cs-val={}, "
470 "stream-id={}, stream-name=\"{}\", "
471 "trace-name=\"{}\", path=\"{}/{}\"",
472 stream->discarded_packets_state.end_cs, expected_end_cs,
473 bt_stream_get_id(ir_stream), bt2c::maybeNull(bt_stream_get_name(ir_stream)),
474 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream))),
475 stream->trace->path->str, stream->file_name->str);
476 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
477 goto end;
478 }
479 }
480
481 /*
482 * We're not in a discarded packets time range anymore since we
483 * require that the discarded packets time ranges go from one
484 * packet's end time to the next packet's beginning time, and
485 * we're handling a packet beginning message here.
486 */
487 stream->discarded_packets_state.in_range = false;
488
489 ret = fs_sink_stream_open_packet(stream, cs, ir_packet);
490 if (ret) {
491 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to open packet.");
492 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
493 goto end;
494 }
495
496 end:
497 return status;
498 }
499
500 static inline bt_component_class_sink_consume_method_status
501 handle_packet_end_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
502 {
503 int ret;
504 bt_component_class_sink_consume_method_status status =
505 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
506 const bt_packet *ir_packet = bt_message_packet_end_borrow_packet_const(msg);
507 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
508 struct fs_sink_stream *stream;
509 const bt_clock_snapshot *cs = NULL;
510
511 stream = borrow_stream(fs_sink, ir_stream);
512 if (G_UNLIKELY(!stream)) {
513 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to borrow stream.");
514 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
515 goto end;
516 }
517
518 if (stream->sc->packets_have_ts_end) {
519 cs = bt_message_packet_end_borrow_default_clock_snapshot_const(msg);
520 BT_ASSERT(cs);
521 }
522
523 /*
524 * If we previously received a discarded events message with
525 * a time range, make sure that its end time matches what's
526 * expected for CTF 1.8, that is:
527 *
528 * * Its end time is the current packet's end time.
529 *
530 * Validation of the discarded events message's beginning time
531 * is performed in handle_packet_beginning_msg().
532 */
533 if (stream->discarded_events_state.in_range) {
534 uint64_t expected_cs;
535
536 /*
537 * `stream->discarded_events_state.in_range` is only set
538 * when the stream class's discarded events have a time
539 * range.
540 *
541 * It is required that the packet beginning and end
542 * messages for this stream class have times when
543 * discarded events have a time range.
544 */
545 BT_ASSERT(stream->sc->discarded_events_has_ts);
546 BT_ASSERT(stream->sc->packets_have_ts_begin);
547 BT_ASSERT(stream->sc->packets_have_ts_end);
548
549 expected_cs = bt_clock_snapshot_get_value(cs);
550
551 if (stream->discarded_events_state.end_cs != expected_cs) {
552 BT_CPPLOGE_APPEND_CAUSE_SPEC(
553 fs_sink->logger,
554 "Incompatible discarded events message: "
555 "unexpected end time: "
556 "end-cs-val={}, "
557 "expected-end-cs-val={}, "
558 "stream-id={}, stream-name=\"{}\", "
559 "trace-name=\"{}\", path=\"{}/{}\"",
560 stream->discarded_events_state.end_cs, expected_cs, bt_stream_get_id(ir_stream),
561 bt2c::maybeNull(bt_stream_get_name(ir_stream)),
562 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream))),
563 stream->trace->path->str, stream->file_name->str);
564 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
565 goto end;
566 }
567 }
568
569 ret = fs_sink_stream_close_packet(stream, cs);
570 if (ret) {
571 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to close packet.");
572 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
573 goto end;
574 }
575
576 /*
577 * We're not in a discarded events time range anymore since we
578 * require that the discarded events time ranges go from one
579 * packet's end time to the next packet's end time, and we're
580 * handling a packet end message here.
581 */
582 stream->discarded_events_state.in_range = false;
583
584 end:
585 return status;
586 }
587
588 static inline bt_component_class_sink_consume_method_status
589 handle_stream_beginning_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
590 {
591 bt_component_class_sink_consume_method_status status =
592 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
593 const bt_stream *ir_stream = bt_message_stream_beginning_borrow_stream_const(msg);
594 const bt_stream_class *ir_sc = bt_stream_borrow_class_const(ir_stream);
595 struct fs_sink_stream *stream;
596 bool packets_have_beginning_end_cs =
597 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc) &&
598 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc);
599
600 /*
601 * Not supported: discarded events or discarded packets support
602 * without packets support. Packets are the way to know where
603 * discarded events/packets occurred in CTF 1.8.
604 */
605 if (!bt_stream_class_supports_packets(ir_sc)) {
606 BT_ASSERT(!bt_stream_class_supports_discarded_packets(ir_sc));
607
608 if (!fs_sink->ignore_discarded_events && bt_stream_class_supports_discarded_events(ir_sc)) {
609 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger,
610 "Unsupported stream: "
611 "stream does not support packets, "
612 "but supports discarded events: "
613 "stream-addr={}, "
614 "stream-id={}, "
615 "stream-name=\"{}\"",
616 fmt::ptr(ir_stream), bt_stream_get_id(ir_stream),
617 bt2c::maybeNull(bt_stream_get_name(ir_stream)));
618 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
619 goto end;
620 }
621 }
622
623 /*
624 * Not supported: discarded events with default clock snapshots,
625 * but packet beginning/end without default clock snapshot.
626 */
627 if (!fs_sink->ignore_discarded_events &&
628 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc) &&
629 !packets_have_beginning_end_cs) {
630 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger,
631 "Unsupported stream: discarded events have "
632 "default clock snapshots, but packets have no "
633 "beginning and/or end default clock snapshots: "
634 "stream-addr={}, "
635 "stream-id={}, "
636 "stream-name=\"{}\"",
637 fmt::ptr(ir_stream), bt_stream_get_id(ir_stream),
638 bt2c::maybeNull(bt_stream_get_name(ir_stream)));
639 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
640 goto end;
641 }
642
643 /*
644 * Not supported: discarded packets with default clock
645 * snapshots, but packet beginning/end without default clock
646 * snapshot.
647 */
648 if (!fs_sink->ignore_discarded_packets &&
649 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc) &&
650 !packets_have_beginning_end_cs) {
651 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger,
652 "Unsupported stream: discarded packets have "
653 "default clock snapshots, but packets have no "
654 "beginning and/or end default clock snapshots: "
655 "stream-addr={}, "
656 "stream-id={}, "
657 "stream-name=\"{}\"",
658 fmt::ptr(ir_stream), bt_stream_get_id(ir_stream),
659 bt2c::maybeNull(bt_stream_get_name(ir_stream)));
660 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
661 goto end;
662 }
663
664 stream = borrow_stream(fs_sink, ir_stream);
665 if (!stream) {
666 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to borrow stream.");
667 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
668 goto end;
669 }
670
671 BT_CPPLOGI_SPEC(fs_sink->logger,
672 "Created new, empty stream file: "
673 "stream-id={}, stream-name=\"{}\", "
674 "trace-name=\"{}\", path=\"{}/{}\"",
675 bt_stream_get_id(ir_stream), bt2c::maybeNull(bt_stream_get_name(ir_stream)),
676 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream))),
677 stream->trace->path->str, stream->file_name->str);
678
679 end:
680 return status;
681 }
682
683 static inline bt_component_class_sink_consume_method_status
684 handle_stream_end_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
685 {
686 bt_component_class_sink_consume_method_status status =
687 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
688 const bt_stream *ir_stream = bt_message_stream_end_borrow_stream_const(msg);
689 struct fs_sink_stream *stream;
690
691 stream = borrow_stream(fs_sink, ir_stream);
692 if (!stream) {
693 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to borrow stream.");
694 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
695 goto end;
696 }
697
698 if (G_UNLIKELY(!stream->sc->has_packets && stream->packet_state.is_open)) {
699 /* Close stream's current artificial packet */
700 int ret = fs_sink_stream_close_packet(stream, NULL);
701
702 if (ret) {
703 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to close packet.");
704 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
705 goto end;
706 }
707 }
708
709 BT_CPPLOGI_SPEC(fs_sink->logger,
710 "Closing stream file: "
711 "stream-id={}, stream-name=\"{}\", "
712 "trace-name=\"{}\", path=\"{}/{}\"",
713 bt_stream_get_id(ir_stream), bt2c::maybeNull(bt_stream_get_name(ir_stream)),
714 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream))),
715 stream->trace->path->str, stream->file_name->str);
716
717 /*
718 * This destroys the stream object and frees all its resources,
719 * closing the stream file.
720 */
721 g_hash_table_remove(stream->trace->streams, ir_stream);
722
723 end:
724 return status;
725 }
726
727 static inline bt_component_class_sink_consume_method_status
728 handle_discarded_events_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
729 {
730 bt_component_class_sink_consume_method_status status =
731 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
732 const bt_stream *ir_stream = bt_message_discarded_events_borrow_stream_const(msg);
733 struct fs_sink_stream *stream;
734 const bt_clock_snapshot *cs = NULL;
735 bt_property_availability avail;
736 uint64_t count;
737
738 stream = borrow_stream(fs_sink, ir_stream);
739 if (!stream) {
740 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to borrow stream.");
741 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
742 goto end;
743 }
744
745 if (fs_sink->ignore_discarded_events) {
746 BT_CPPLOGI_SPEC(fs_sink->logger,
747 "Ignoring discarded events message: "
748 "stream-id={}, stream-name=\"{}\", "
749 "trace-name=\"{}\", path=\"{}/{}\"",
750 bt_stream_get_id(ir_stream), bt2c::maybeNull(bt_stream_get_name(ir_stream)),
751 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream))),
752 stream->trace->path->str, stream->file_name->str);
753 goto end;
754 }
755
756 if (stream->discarded_events_state.in_range) {
757 BT_CPPLOGE_APPEND_CAUSE_SPEC(
758 fs_sink->logger,
759 "Unsupported contiguous discarded events message: "
760 "stream-id={}, stream-name=\"{}\", "
761 "trace-name=\"{}\", path=\"{}/{}\"",
762 bt_stream_get_id(ir_stream), bt2c::maybeNull(bt_stream_get_name(ir_stream)),
763 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream))),
764 stream->trace->path->str, stream->file_name->str);
765 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
766 goto end;
767 }
768
769 /*
770 * If we're currently in an opened packet (got a packet
771 * beginning message, but no packet end message yet), we do not
772 * support having a discarded events message with a time range
773 * because we require that the discarded events message's time
774 * range go from a packet's end time to the next packet's end
775 * time.
776 */
777 if (stream->packet_state.is_open && stream->sc->discarded_events_has_ts) {
778 BT_CPPLOGE_APPEND_CAUSE_SPEC(
779 fs_sink->logger,
780 "Unsupported discarded events message with "
781 "default clock snapshots occurring within a packet: "
782 "stream-id={}, stream-name=\"{}\", "
783 "trace-name=\"{}\", path=\"{}/{}\"",
784 bt_stream_get_id(ir_stream), bt2c::maybeNull(bt_stream_get_name(ir_stream)),
785 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream))),
786 stream->trace->path->str, stream->file_name->str);
787 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
788 goto end;
789 }
790
791 if (stream->sc->discarded_events_has_ts) {
792 /*
793 * Make the stream's state be in the time range of a
794 * discarded events message since we have the message's
795 * time range (`stream->sc->discarded_events_has_ts`).
796 */
797 stream->discarded_events_state.in_range = true;
798
799 /*
800 * The clock snapshot values will be validated when
801 * handling the next packet beginning and end messages
802 * (next calls to handle_packet_beginning_msg() and
803 * handle_packet_end_msg()).
804 */
805 cs = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg);
806 BT_ASSERT(cs);
807 stream->discarded_events_state.beginning_cs = bt_clock_snapshot_get_value(cs);
808 cs = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg);
809 BT_ASSERT(cs);
810 stream->discarded_events_state.end_cs = bt_clock_snapshot_get_value(cs);
811 }
812
813 avail = bt_message_discarded_events_get_count(msg, &count);
814 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
815 /*
816 * There's no specific count of discarded events: set it
817 * to 1 so that we know that we at least discarded
818 * something.
819 */
820 count = 1;
821 }
822
823 stream->packet_state.discarded_events_counter += count;
824
825 end:
826 return status;
827 }
828
829 static inline bt_component_class_sink_consume_method_status
830 handle_discarded_packets_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
831 {
832 bt_component_class_sink_consume_method_status status =
833 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
834 const bt_stream *ir_stream = bt_message_discarded_packets_borrow_stream_const(msg);
835 struct fs_sink_stream *stream;
836 const bt_clock_snapshot *cs = NULL;
837 bt_property_availability avail;
838 uint64_t count;
839
840 stream = borrow_stream(fs_sink, ir_stream);
841 if (!stream) {
842 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to borrow stream.");
843 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
844 goto end;
845 }
846
847 if (fs_sink->ignore_discarded_packets) {
848 BT_CPPLOGI_SPEC(fs_sink->logger,
849 "Ignoring discarded packets message: "
850 "stream-id={}, stream-name=\"{}\", "
851 "trace-name=\"{}\", path=\"{}/{}\"",
852 bt_stream_get_id(ir_stream), bt2c::maybeNull(bt_stream_get_name(ir_stream)),
853 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream))),
854 stream->trace->path->str, stream->file_name->str);
855 goto end;
856 }
857
858 if (stream->discarded_packets_state.in_range) {
859 BT_CPPLOGE_APPEND_CAUSE_SPEC(
860 fs_sink->logger,
861 "Unsupported contiguous discarded packets message: "
862 "stream-id={}, stream-name=\"{}\", "
863 "trace-name=\"{}\", path=\"{}/{}\"",
864 bt_stream_get_id(ir_stream), bt2c::maybeNull(bt_stream_get_name(ir_stream)),
865 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream))),
866 stream->trace->path->str, stream->file_name->str);
867 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
868 goto end;
869 }
870
871 /*
872 * Discarded packets messages are guaranteed to occur between
873 * packets.
874 */
875 BT_ASSERT(!stream->packet_state.is_open);
876
877 if (stream->sc->discarded_packets_has_ts) {
878 /*
879 * Make the stream's state be in the time range of a
880 * discarded packets message since we have the message's
881 * time range (`stream->sc->discarded_packets_has_ts`).
882 */
883 stream->discarded_packets_state.in_range = true;
884
885 /*
886 * The clock snapshot values will be validated when
887 * handling the next packet beginning message (next call
888 * to handle_packet_beginning_msg()).
889 */
890 cs = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg);
891 BT_ASSERT(cs);
892 stream->discarded_packets_state.beginning_cs = bt_clock_snapshot_get_value(cs);
893 cs = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg);
894 BT_ASSERT(cs);
895 stream->discarded_packets_state.end_cs = bt_clock_snapshot_get_value(cs);
896 }
897
898 avail = bt_message_discarded_packets_get_count(msg, &count);
899 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
900 /*
901 * There's no specific count of discarded packets: set
902 * it to 1 so that we know that we at least discarded
903 * something.
904 */
905 count = 1;
906 }
907
908 stream->packet_state.seq_num += count;
909
910 end:
911 return status;
912 }
913
914 static inline void put_messages(bt_message_array_const msgs, uint64_t count)
915 {
916 uint64_t i;
917
918 for (i = 0; i < count; i++) {
919 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
920 }
921 }
922
923 bt_component_class_sink_consume_method_status ctf_fs_sink_consume(bt_self_component_sink *self_comp)
924 {
925 bt_component_class_sink_consume_method_status status =
926 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
927 struct fs_sink_comp *fs_sink;
928 bt_message_iterator_next_status next_status;
929 uint64_t msg_count = 0;
930 bt_message_array_const msgs;
931
932 fs_sink = (fs_sink_comp *) bt_self_component_get_data(
933 bt_self_component_sink_as_self_component(self_comp));
934 BT_ASSERT_DBG(fs_sink);
935 BT_ASSERT_DBG(fs_sink->upstream_iter);
936
937 /* Consume messages */
938 next_status = bt_message_iterator_next(fs_sink->upstream_iter, &msgs, &msg_count);
939 if (next_status < 0) {
940 status = (bt_component_class_sink_consume_method_status) next_status;
941 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger,
942 "Failed to get next message from upstream iterator.");
943 goto end;
944 }
945
946 switch (next_status) {
947 case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK:
948 {
949 uint64_t i;
950
951 for (i = 0; i < msg_count; i++) {
952 const bt_message *msg = msgs[i];
953
954 BT_ASSERT_DBG(msg);
955
956 switch (bt_message_get_type(msg)) {
957 case BT_MESSAGE_TYPE_EVENT:
958 status = handle_event_msg(fs_sink, msg);
959 break;
960 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
961 status = handle_packet_beginning_msg(fs_sink, msg);
962 break;
963 case BT_MESSAGE_TYPE_PACKET_END:
964 status = handle_packet_end_msg(fs_sink, msg);
965 break;
966 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
967 /* Ignore */
968 BT_CPPLOGD_STR_SPEC(fs_sink->logger,
969 "Ignoring message iterator inactivity message.");
970 break;
971 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
972 status = handle_stream_beginning_msg(fs_sink, msg);
973 break;
974 case BT_MESSAGE_TYPE_STREAM_END:
975 status = handle_stream_end_msg(fs_sink, msg);
976 break;
977 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
978 status = handle_discarded_events_msg(fs_sink, msg);
979 break;
980 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
981 status = handle_discarded_packets_msg(fs_sink, msg);
982 break;
983 default:
984 bt_common_abort();
985 }
986
987 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
988
989 if (status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK) {
990 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger,
991 "Failed to handle message: "
992 "generated CTF traces could be incomplete: "
993 "output-dir-path=\"{}\"",
994 fs_sink->output_dir_path->str);
995 goto error;
996 }
997 }
998
999 break;
1000 }
1001 case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN:
1002 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN;
1003 break;
1004 case BT_MESSAGE_ITERATOR_NEXT_STATUS_END:
1005 /* TODO: Finalize all traces (should already be done?) */
1006 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END;
1007 break;
1008 default:
1009 break;
1010 }
1011
1012 goto end;
1013
1014 error:
1015 BT_ASSERT(status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK);
1016 put_messages(msgs, msg_count);
1017
1018 end:
1019 return status;
1020 }
1021
1022 bt_component_class_sink_graph_is_configured_method_status
1023 ctf_fs_sink_graph_is_configured(bt_self_component_sink *self_comp)
1024 {
1025 try {
1026 bt_component_class_sink_graph_is_configured_method_status status;
1027 bt_message_iterator_create_from_sink_component_status msg_iter_status;
1028 fs_sink_comp *fs_sink = (fs_sink_comp *) bt_self_component_get_data(
1029 bt_self_component_sink_as_self_component(self_comp));
1030
1031 msg_iter_status = bt_message_iterator_create_from_sink_component(
1032 self_comp, bt_self_component_sink_borrow_input_port_by_name(self_comp, in_port_name),
1033 &fs_sink->upstream_iter);
1034 if (msg_iter_status != BT_MESSAGE_ITERATOR_CREATE_FROM_SINK_COMPONENT_STATUS_OK) {
1035 status = (bt_component_class_sink_graph_is_configured_method_status) msg_iter_status;
1036 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to create upstream iterator.");
1037 goto end;
1038 }
1039
1040 status = BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK;
1041 end:
1042 return status;
1043 } catch (const std::bad_alloc&) {
1044 return BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_MEMORY_ERROR;
1045 } catch (const bt2c::Error&) {
1046 return BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_ERROR;
1047 }
1048 }
1049
1050 void ctf_fs_sink_finalize(bt_self_component_sink *self_comp)
1051 {
1052 fs_sink_comp *fs_sink = (fs_sink_comp *) bt_self_component_get_data(
1053 bt_self_component_sink_as_self_component(self_comp));
1054
1055 destroy_fs_sink_comp(fs_sink);
1056 }
This page took 0.050886 seconds and 4 git commands to generate.