2 * SPDX-License-Identifier: MIT
4 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
10 #include <babeltrace2/babeltrace.h>
12 #include "common/assert.h"
13 #include "cpp-common/vendor/fmt/format.h"
14 #include "ctfser/ctfser.h"
16 #include "plugins/common/param-validation/param-validation.h"
18 #include "fs-sink-ctf-meta.hpp"
19 #include "fs-sink-stream.hpp"
20 #include "fs-sink-trace.hpp"
21 #include "fs-sink.hpp"
22 #include "translate-trace-ir-to-ctf-ir.hpp"
24 static const char * const in_port_name
= "in";
26 static bt_component_class_initialize_method_status
27 ensure_output_dir_exists(struct fs_sink_comp
*fs_sink
)
29 bt_component_class_initialize_method_status status
=
30 BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
;
33 ret
= g_mkdir_with_parents(fs_sink
->output_dir_path
->str
, 0755);
35 BT_CPPLOGE_ERRNO_APPEND_CAUSE_SPEC(
36 fs_sink
->logger
, "Cannot create directories for output directory",
37 ": output-dir-path=\"{}\"", fs_sink
->output_dir_path
->str
);
38 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
46 static bt_param_validation_map_value_entry_descr fs_sink_params_descr
[] = {
47 {"path", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY
,
48 bt_param_validation_value_descr::makeString()},
49 {"assume-single-trace", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
50 bt_param_validation_value_descr::makeBool()},
51 {"ignore-discarded-events", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
52 bt_param_validation_value_descr::makeBool()},
53 {"ignore-discarded-packets", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
54 bt_param_validation_value_descr::makeBool()},
55 {"quiet", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
56 bt_param_validation_value_descr::makeBool()},
57 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END
};
59 static bt_component_class_initialize_method_status
configure_component(struct fs_sink_comp
*fs_sink
,
60 const bt_value
*params
)
62 bt_component_class_initialize_method_status status
;
63 const bt_value
*value
;
64 enum bt_param_validation_status validation_status
;
65 gchar
*validation_error
;
68 bt_param_validation_validate(params
, fs_sink_params_descr
, &validation_error
);
69 if (validation_status
== BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR
) {
70 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
71 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "{}", validation_error
);
73 } else if (validation_status
== BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR
) {
74 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
78 value
= bt_value_map_borrow_entry_value_const(params
, "path");
79 g_string_assign(fs_sink
->output_dir_path
, bt_value_string_get(value
));
81 value
= bt_value_map_borrow_entry_value_const(params
, "assume-single-trace");
83 fs_sink
->assume_single_trace
= (bool) bt_value_bool_get(value
);
86 value
= bt_value_map_borrow_entry_value_const(params
, "ignore-discarded-events");
88 fs_sink
->ignore_discarded_events
= (bool) bt_value_bool_get(value
);
91 value
= bt_value_map_borrow_entry_value_const(params
, "ignore-discarded-packets");
93 fs_sink
->ignore_discarded_packets
= (bool) bt_value_bool_get(value
);
96 value
= bt_value_map_borrow_entry_value_const(params
, "quiet");
98 fs_sink
->quiet
= (bool) bt_value_bool_get(value
);
101 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
;
104 g_free(validation_error
);
108 static void destroy_fs_sink_comp(struct fs_sink_comp
*fs_sink
)
114 if (fs_sink
->output_dir_path
) {
115 g_string_free(fs_sink
->output_dir_path
, TRUE
);
116 fs_sink
->output_dir_path
= NULL
;
119 if (fs_sink
->traces
) {
120 g_hash_table_destroy(fs_sink
->traces
);
121 fs_sink
->traces
= NULL
;
124 BT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(fs_sink
->upstream_iter
);
131 bt_component_class_initialize_method_status
ctf_fs_sink_init(bt_self_component_sink
*self_comp_sink
,
132 bt_self_component_sink_configuration
*,
133 const bt_value
*params
, void *)
135 bt_component_class_initialize_method_status status
;
136 bt_self_component_add_port_status add_port_status
;
137 struct fs_sink_comp
*fs_sink
= NULL
;
138 bt_self_component
*self_comp
= bt_self_component_sink_as_self_component(self_comp_sink
);
140 fs_sink
= new fs_sink_comp
{bt2::SelfSinkComponent
{self_comp_sink
}};
141 fs_sink
->output_dir_path
= g_string_new(NULL
);
142 status
= configure_component(fs_sink
, params
);
143 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
144 /* configure_component() logs errors */
148 if (fs_sink
->assume_single_trace
&&
149 g_file_test(fs_sink
->output_dir_path
->str
, G_FILE_TEST_EXISTS
)) {
150 BT_CPPLOGE_APPEND_CAUSE_SPEC(
151 fs_sink
->logger
, "Single trace mode, but output path exists: output-path=\"{}\"",
152 fs_sink
->output_dir_path
->str
);
153 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
157 status
= ensure_output_dir_exists(fs_sink
);
158 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
159 /* ensure_output_dir_exists() logs errors */
163 fs_sink
->traces
= g_hash_table_new_full(g_direct_hash
, g_direct_equal
, NULL
,
164 (GDestroyNotify
) fs_sink_trace_destroy
);
165 if (!fs_sink
->traces
) {
166 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to allocate one GHashTable.");
167 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
172 bt_self_component_sink_add_input_port(self_comp_sink
, in_port_name
, NULL
, NULL
);
173 if (add_port_status
!= BT_SELF_COMPONENT_ADD_PORT_STATUS_OK
) {
174 status
= (bt_component_class_initialize_method_status
) add_port_status
;
175 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to add input port.");
179 bt_self_component_set_data(self_comp
, fs_sink
);
182 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
183 destroy_fs_sink_comp(fs_sink
);
189 static inline struct fs_sink_stream
*borrow_stream(struct fs_sink_comp
*fs_sink
,
190 const bt_stream
*ir_stream
)
192 const bt_trace
*ir_trace
= bt_stream_borrow_trace_const(ir_stream
);
193 struct fs_sink_trace
*trace
;
194 struct fs_sink_stream
*stream
= NULL
;
196 trace
= (fs_sink_trace
*) g_hash_table_lookup(fs_sink
->traces
, ir_trace
);
197 if (G_UNLIKELY(!trace
)) {
198 if (fs_sink
->assume_single_trace
&& g_hash_table_size(fs_sink
->traces
) > 0) {
199 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
,
200 "Single trace mode, but getting more than one trace: "
201 "stream-name=\"{}\"",
202 bt2c::maybeNull(bt_stream_get_name(ir_stream
)));
206 trace
= fs_sink_trace_create(fs_sink
, ir_trace
);
212 stream
= (fs_sink_stream
*) g_hash_table_lookup(trace
->streams
, ir_stream
);
213 if (G_UNLIKELY(!stream
)) {
214 stream
= fs_sink_stream_create(trace
, ir_stream
);
224 static inline bt_component_class_sink_consume_method_status
225 handle_event_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
228 bt_component_class_sink_consume_method_status status
=
229 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
230 const bt_event
*ir_event
= bt_message_event_borrow_event_const(msg
);
231 const bt_stream
*ir_stream
= bt_event_borrow_stream_const(ir_event
);
232 struct fs_sink_stream
*stream
;
233 struct fs_sink_ctf_event_class
*ec
= NULL
;
234 const bt_clock_snapshot
*cs
= NULL
;
236 stream
= borrow_stream(fs_sink
, ir_stream
);
237 if (G_UNLIKELY(!stream
)) {
238 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to borrow stream.");
239 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
243 ret
= try_translate_event_class_trace_ir_to_ctf_ir(fs_sink
, stream
->sc
,
244 bt_event_borrow_class_const(ir_event
), &ec
);
246 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to translate event class to CTF IR.");
247 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
253 if (stream
->sc
->default_clock_class
) {
254 cs
= bt_message_event_borrow_default_clock_snapshot_const(msg
);
258 * If this event's stream does not support packets, then we
259 * lazily create artificial packets.
261 * The size of an artificial packet is arbitrarily at least
262 * 4 MiB (it usually is greater because we close it when
263 * comes the time to write a new event and the packet's content
264 * size is >= 4 MiB), except the last one which can be smaller.
266 if (G_UNLIKELY(!stream
->sc
->has_packets
)) {
267 if (stream
->packet_state
.is_open
&&
268 bt_ctfser_get_offset_in_current_packet_bits(&stream
->ctfser
) / 8 >= 4 * 1024 * 1024) {
270 * Stream's current packet is larger than 4 MiB:
271 * close it. A new packet will be opened just
274 ret
= fs_sink_stream_close_packet(stream
, NULL
);
276 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to close packet.");
277 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
282 if (!stream
->packet_state
.is_open
) {
283 /* Stream's packet is not currently opened: open it */
284 ret
= fs_sink_stream_open_packet(stream
, NULL
, NULL
);
286 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to open packet.");
287 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
293 BT_ASSERT_DBG(stream
->packet_state
.is_open
);
294 ret
= fs_sink_stream_write_event(stream
, cs
, ir_event
, ec
);
295 if (G_UNLIKELY(ret
)) {
296 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to write event.");
297 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
305 static inline bt_component_class_sink_consume_method_status
306 handle_packet_beginning_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
309 bt_component_class_sink_consume_method_status status
=
310 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
311 const bt_packet
*ir_packet
= bt_message_packet_beginning_borrow_packet_const(msg
);
312 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
313 struct fs_sink_stream
*stream
;
314 const bt_clock_snapshot
*cs
= NULL
;
316 stream
= borrow_stream(fs_sink
, ir_stream
);
317 if (G_UNLIKELY(!stream
)) {
318 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to borrow stream.");
319 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
323 if (stream
->sc
->packets_have_ts_begin
) {
324 cs
= bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg
);
329 * If we previously received a discarded events message with
330 * a time range, make sure that its beginning time matches what's
331 * expected for CTF 1.8, that is:
333 * * Its beginning time is the previous packet's end
334 * time (or the current packet's beginning time if
335 * this is the first packet).
337 * We check this here instead of in handle_packet_end_msg()
338 * because we want to catch any incompatible message as early as
339 * possible to report the error.
341 * Validation of the discarded events message's end time is
342 * performed in handle_packet_end_msg().
344 if (stream
->discarded_events_state
.in_range
) {
345 uint64_t expected_cs
;
348 * `stream->discarded_events_state.in_range` is only set
349 * when the stream class's discarded events have a time
352 * It is required that the packet beginning and end
353 * messages for this stream class have times when
354 * discarded events have a time range.
356 BT_ASSERT(stream
->sc
->discarded_events_has_ts
);
357 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
358 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
360 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
361 /* We're opening the first packet */
362 expected_cs
= bt_clock_snapshot_get_value(cs
);
364 expected_cs
= stream
->prev_packet_state
.end_cs
;
367 if (stream
->discarded_events_state
.beginning_cs
!= expected_cs
) {
368 BT_CPPLOGE_APPEND_CAUSE_SPEC(
370 "Incompatible discarded events message: "
371 "unexpected beginning time: "
372 "beginning-cs-val={}, "
373 "expected-beginning-cs-val={}, "
374 "stream-id={}, stream-name=\"{}\", "
375 "trace-name=\"{}\", path=\"{}/{}\"",
376 stream
->discarded_events_state
.beginning_cs
, expected_cs
,
377 bt_stream_get_id(ir_stream
), bt2c::maybeNull(bt_stream_get_name(ir_stream
)),
378 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
))),
379 stream
->trace
->path
->str
, stream
->file_name
->str
);
380 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
386 * If we previously received a discarded packets message with a
387 * time range, make sure that its beginning and end times match
388 * what's expected for CTF 1.8, that is:
390 * * Its beginning time is the previous packet's end time.
392 * * Its end time is the current packet's beginning time.
394 if (stream
->discarded_packets_state
.in_range
) {
395 uint64_t expected_end_cs
;
398 * `stream->discarded_packets_state.in_range` is only
399 * set when the stream class's discarded packets have a
402 * It is required that the packet beginning and end
403 * messages for this stream class have times when
404 * discarded packets have a time range.
406 BT_ASSERT(stream
->sc
->discarded_packets_has_ts
);
407 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
408 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
411 * It is not supported to have a discarded packets
412 * message _before_ the first packet: we cannot validate
413 * that its beginning time is compatible with CTF 1.8 in
416 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
417 BT_CPPLOGE_APPEND_CAUSE_SPEC(
419 "Incompatible discarded packets message "
420 "occurring before the stream's first packet: "
421 "stream-id={}, stream-name=\"{}\", "
422 "trace-name=\"{}\", path=\"{}/{}\"",
423 bt_stream_get_id(ir_stream
), bt2c::maybeNull(bt_stream_get_name(ir_stream
)),
424 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
))),
425 stream
->trace
->path
->str
, stream
->file_name
->str
);
426 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
430 if (stream
->discarded_packets_state
.beginning_cs
!= stream
->prev_packet_state
.end_cs
) {
431 BT_CPPLOGE_APPEND_CAUSE_SPEC(
433 "Incompatible discarded packets message: "
434 "unexpected beginning time: "
435 "beginning-cs-val={}, "
436 "expected-beginning-cs-val={}, "
437 "stream-id={}, stream-name=\"{}\", "
438 "trace-name=\"{}\", path=\"{}/{}\"",
439 stream
->discarded_packets_state
.beginning_cs
, stream
->prev_packet_state
.end_cs
,
440 bt_stream_get_id(ir_stream
), bt2c::maybeNull(bt_stream_get_name(ir_stream
)),
441 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
))),
442 stream
->trace
->path
->str
, stream
->file_name
->str
);
443 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
447 expected_end_cs
= bt_clock_snapshot_get_value(cs
);
449 if (stream
->discarded_packets_state
.end_cs
!= expected_end_cs
) {
450 BT_CPPLOGE_APPEND_CAUSE_SPEC(
452 "Incompatible discarded packets message: "
453 "unexpected end time: "
455 "expected-end-cs-val={}, "
456 "stream-id={}, stream-name=\"{}\", "
457 "trace-name=\"{}\", path=\"{}/{}\"",
458 stream
->discarded_packets_state
.end_cs
, expected_end_cs
,
459 bt_stream_get_id(ir_stream
), bt2c::maybeNull(bt_stream_get_name(ir_stream
)),
460 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
))),
461 stream
->trace
->path
->str
, stream
->file_name
->str
);
462 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
468 * We're not in a discarded packets time range anymore since we
469 * require that the discarded packets time ranges go from one
470 * packet's end time to the next packet's beginning time, and
471 * we're handling a packet beginning message here.
473 stream
->discarded_packets_state
.in_range
= false;
475 ret
= fs_sink_stream_open_packet(stream
, cs
, ir_packet
);
477 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to open packet.");
478 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
486 static inline bt_component_class_sink_consume_method_status
487 handle_packet_end_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
490 bt_component_class_sink_consume_method_status status
=
491 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
492 const bt_packet
*ir_packet
= bt_message_packet_end_borrow_packet_const(msg
);
493 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
494 struct fs_sink_stream
*stream
;
495 const bt_clock_snapshot
*cs
= NULL
;
497 stream
= borrow_stream(fs_sink
, ir_stream
);
498 if (G_UNLIKELY(!stream
)) {
499 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to borrow stream.");
500 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
504 if (stream
->sc
->packets_have_ts_end
) {
505 cs
= bt_message_packet_end_borrow_default_clock_snapshot_const(msg
);
510 * If we previously received a discarded events message with
511 * a time range, make sure that its end time matches what's
512 * expected for CTF 1.8, that is:
514 * * Its end time is the current packet's end time.
516 * Validation of the discarded events message's beginning time
517 * is performed in handle_packet_beginning_msg().
519 if (stream
->discarded_events_state
.in_range
) {
520 uint64_t expected_cs
;
523 * `stream->discarded_events_state.in_range` is only set
524 * when the stream class's discarded events have a time
527 * It is required that the packet beginning and end
528 * messages for this stream class have times when
529 * discarded events have a time range.
531 BT_ASSERT(stream
->sc
->discarded_events_has_ts
);
532 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
533 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
535 expected_cs
= bt_clock_snapshot_get_value(cs
);
537 if (stream
->discarded_events_state
.end_cs
!= expected_cs
) {
538 BT_CPPLOGE_APPEND_CAUSE_SPEC(
540 "Incompatible discarded events message: "
541 "unexpected end time: "
543 "expected-end-cs-val={}, "
544 "stream-id={}, stream-name=\"{}\", "
545 "trace-name=\"{}\", path=\"{}/{}\"",
546 stream
->discarded_events_state
.end_cs
, expected_cs
, bt_stream_get_id(ir_stream
),
547 bt2c::maybeNull(bt_stream_get_name(ir_stream
)),
548 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
))),
549 stream
->trace
->path
->str
, stream
->file_name
->str
);
550 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
555 ret
= fs_sink_stream_close_packet(stream
, cs
);
557 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to close packet.");
558 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
563 * We're not in a discarded events time range anymore since we
564 * require that the discarded events time ranges go from one
565 * packet's end time to the next packet's end time, and we're
566 * handling a packet end message here.
568 stream
->discarded_events_state
.in_range
= false;
574 static inline bt_component_class_sink_consume_method_status
575 handle_stream_beginning_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
577 bt_component_class_sink_consume_method_status status
=
578 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
579 const bt_stream
*ir_stream
= bt_message_stream_beginning_borrow_stream_const(msg
);
580 const bt_stream_class
*ir_sc
= bt_stream_borrow_class_const(ir_stream
);
581 struct fs_sink_stream
*stream
;
582 bool packets_have_beginning_end_cs
=
583 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc
) &&
584 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc
);
587 * Not supported: discarded events or discarded packets support
588 * without packets support. Packets are the way to know where
589 * discarded events/packets occurred in CTF 1.8.
591 if (!bt_stream_class_supports_packets(ir_sc
)) {
592 BT_ASSERT(!bt_stream_class_supports_discarded_packets(ir_sc
));
594 if (!fs_sink
->ignore_discarded_events
&& bt_stream_class_supports_discarded_events(ir_sc
)) {
595 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
,
596 "Unsupported stream: "
597 "stream does not support packets, "
598 "but supports discarded events: "
601 "stream-name=\"{}\"",
602 fmt::ptr(ir_stream
), bt_stream_get_id(ir_stream
),
603 bt2c::maybeNull(bt_stream_get_name(ir_stream
)));
604 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
610 * Not supported: discarded events with default clock snapshots,
611 * but packet beginning/end without default clock snapshot.
613 if (!fs_sink
->ignore_discarded_events
&&
614 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc
) &&
615 !packets_have_beginning_end_cs
) {
616 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
,
617 "Unsupported stream: discarded events have "
618 "default clock snapshots, but packets have no "
619 "beginning and/or end default clock snapshots: "
622 "stream-name=\"{}\"",
623 fmt::ptr(ir_stream
), bt_stream_get_id(ir_stream
),
624 bt2c::maybeNull(bt_stream_get_name(ir_stream
)));
625 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
630 * Not supported: discarded packets with default clock
631 * snapshots, but packet beginning/end without default clock
634 if (!fs_sink
->ignore_discarded_packets
&&
635 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc
) &&
636 !packets_have_beginning_end_cs
) {
637 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
,
638 "Unsupported stream: discarded packets have "
639 "default clock snapshots, but packets have no "
640 "beginning and/or end default clock snapshots: "
643 "stream-name=\"{}\"",
644 fmt::ptr(ir_stream
), bt_stream_get_id(ir_stream
),
645 bt2c::maybeNull(bt_stream_get_name(ir_stream
)));
646 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
650 stream
= borrow_stream(fs_sink
, ir_stream
);
652 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to borrow stream.");
653 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
657 BT_CPPLOGI_SPEC(fs_sink
->logger
,
658 "Created new, empty stream file: "
659 "stream-id={}, stream-name=\"{}\", "
660 "trace-name=\"{}\", path=\"{}/{}\"",
661 bt_stream_get_id(ir_stream
), bt2c::maybeNull(bt_stream_get_name(ir_stream
)),
662 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
))),
663 stream
->trace
->path
->str
, stream
->file_name
->str
);
669 static inline bt_component_class_sink_consume_method_status
670 handle_stream_end_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
672 bt_component_class_sink_consume_method_status status
=
673 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
674 const bt_stream
*ir_stream
= bt_message_stream_end_borrow_stream_const(msg
);
675 struct fs_sink_stream
*stream
;
677 stream
= borrow_stream(fs_sink
, ir_stream
);
679 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to borrow stream.");
680 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
684 if (G_UNLIKELY(!stream
->sc
->has_packets
&& stream
->packet_state
.is_open
)) {
685 /* Close stream's current artificial packet */
686 int ret
= fs_sink_stream_close_packet(stream
, NULL
);
689 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to close packet.");
690 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
695 BT_CPPLOGI_SPEC(fs_sink
->logger
,
696 "Closing stream file: "
697 "stream-id={}, stream-name=\"{}\", "
698 "trace-name=\"{}\", path=\"{}/{}\"",
699 bt_stream_get_id(ir_stream
), bt2c::maybeNull(bt_stream_get_name(ir_stream
)),
700 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
))),
701 stream
->trace
->path
->str
, stream
->file_name
->str
);
704 * This destroys the stream object and frees all its resources,
705 * closing the stream file.
707 g_hash_table_remove(stream
->trace
->streams
, ir_stream
);
713 static inline bt_component_class_sink_consume_method_status
714 handle_discarded_events_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
716 bt_component_class_sink_consume_method_status status
=
717 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
718 const bt_stream
*ir_stream
= bt_message_discarded_events_borrow_stream_const(msg
);
719 struct fs_sink_stream
*stream
;
720 const bt_clock_snapshot
*cs
= NULL
;
721 bt_property_availability avail
;
724 stream
= borrow_stream(fs_sink
, ir_stream
);
726 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to borrow stream.");
727 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
731 if (fs_sink
->ignore_discarded_events
) {
732 BT_CPPLOGI_SPEC(fs_sink
->logger
,
733 "Ignoring discarded events message: "
734 "stream-id={}, stream-name=\"{}\", "
735 "trace-name=\"{}\", path=\"{}/{}\"",
736 bt_stream_get_id(ir_stream
), bt2c::maybeNull(bt_stream_get_name(ir_stream
)),
737 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
))),
738 stream
->trace
->path
->str
, stream
->file_name
->str
);
742 if (stream
->discarded_events_state
.in_range
) {
743 BT_CPPLOGE_APPEND_CAUSE_SPEC(
745 "Unsupported contiguous discarded events message: "
746 "stream-id={}, stream-name=\"{}\", "
747 "trace-name=\"{}\", path=\"{}/{}\"",
748 bt_stream_get_id(ir_stream
), bt2c::maybeNull(bt_stream_get_name(ir_stream
)),
749 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
))),
750 stream
->trace
->path
->str
, stream
->file_name
->str
);
751 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
756 * If we're currently in an opened packet (got a packet
757 * beginning message, but no packet end message yet), we do not
758 * support having a discarded events message with a time range
759 * because we require that the discarded events message's time
760 * range go from a packet's end time to the next packet's end
763 if (stream
->packet_state
.is_open
&& stream
->sc
->discarded_events_has_ts
) {
764 BT_CPPLOGE_APPEND_CAUSE_SPEC(
766 "Unsupported discarded events message with "
767 "default clock snapshots occurring within a packet: "
768 "stream-id={}, stream-name=\"{}\", "
769 "trace-name=\"{}\", path=\"{}/{}\"",
770 bt_stream_get_id(ir_stream
), bt2c::maybeNull(bt_stream_get_name(ir_stream
)),
771 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
))),
772 stream
->trace
->path
->str
, stream
->file_name
->str
);
773 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
777 if (stream
->sc
->discarded_events_has_ts
) {
779 * Make the stream's state be in the time range of a
780 * discarded events message since we have the message's
781 * time range (`stream->sc->discarded_events_has_ts`).
783 stream
->discarded_events_state
.in_range
= true;
786 * The clock snapshot values will be validated when
787 * handling the next packet beginning and end messages
788 * (next calls to handle_packet_beginning_msg() and
789 * handle_packet_end_msg()).
791 cs
= bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg
);
793 stream
->discarded_events_state
.beginning_cs
= bt_clock_snapshot_get_value(cs
);
794 cs
= bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg
);
796 stream
->discarded_events_state
.end_cs
= bt_clock_snapshot_get_value(cs
);
799 avail
= bt_message_discarded_events_get_count(msg
, &count
);
800 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
802 * There's no specific count of discarded events: set it
803 * to 1 so that we know that we at least discarded
809 stream
->packet_state
.discarded_events_counter
+= count
;
815 static inline bt_component_class_sink_consume_method_status
816 handle_discarded_packets_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
818 bt_component_class_sink_consume_method_status status
=
819 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
820 const bt_stream
*ir_stream
= bt_message_discarded_packets_borrow_stream_const(msg
);
821 struct fs_sink_stream
*stream
;
822 const bt_clock_snapshot
*cs
= NULL
;
823 bt_property_availability avail
;
826 stream
= borrow_stream(fs_sink
, ir_stream
);
828 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to borrow stream.");
829 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
833 if (fs_sink
->ignore_discarded_packets
) {
834 BT_CPPLOGI_SPEC(fs_sink
->logger
,
835 "Ignoring discarded packets message: "
836 "stream-id={}, stream-name=\"{}\", "
837 "trace-name=\"{}\", path=\"{}/{}\"",
838 bt_stream_get_id(ir_stream
), bt2c::maybeNull(bt_stream_get_name(ir_stream
)),
839 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
))),
840 stream
->trace
->path
->str
, stream
->file_name
->str
);
844 if (stream
->discarded_packets_state
.in_range
) {
845 BT_CPPLOGE_APPEND_CAUSE_SPEC(
847 "Unsupported contiguous discarded packets message: "
848 "stream-id={}, stream-name=\"{}\", "
849 "trace-name=\"{}\", path=\"{}/{}\"",
850 bt_stream_get_id(ir_stream
), bt2c::maybeNull(bt_stream_get_name(ir_stream
)),
851 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
))),
852 stream
->trace
->path
->str
, stream
->file_name
->str
);
853 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
858 * Discarded packets messages are guaranteed to occur between
861 BT_ASSERT(!stream
->packet_state
.is_open
);
863 if (stream
->sc
->discarded_packets_has_ts
) {
865 * Make the stream's state be in the time range of a
866 * discarded packets message since we have the message's
867 * time range (`stream->sc->discarded_packets_has_ts`).
869 stream
->discarded_packets_state
.in_range
= true;
872 * The clock snapshot values will be validated when
873 * handling the next packet beginning message (next call
874 * to handle_packet_beginning_msg()).
876 cs
= bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg
);
878 stream
->discarded_packets_state
.beginning_cs
= bt_clock_snapshot_get_value(cs
);
879 cs
= bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg
);
881 stream
->discarded_packets_state
.end_cs
= bt_clock_snapshot_get_value(cs
);
884 avail
= bt_message_discarded_packets_get_count(msg
, &count
);
885 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
887 * There's no specific count of discarded packets: set
888 * it to 1 so that we know that we at least discarded
894 stream
->packet_state
.seq_num
+= count
;
900 static inline void put_messages(bt_message_array_const msgs
, uint64_t count
)
904 for (i
= 0; i
< count
; i
++) {
905 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
909 bt_component_class_sink_consume_method_status
ctf_fs_sink_consume(bt_self_component_sink
*self_comp
)
911 bt_component_class_sink_consume_method_status status
=
912 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
913 struct fs_sink_comp
*fs_sink
;
914 bt_message_iterator_next_status next_status
;
915 uint64_t msg_count
= 0;
916 bt_message_array_const msgs
;
918 fs_sink
= (fs_sink_comp
*) bt_self_component_get_data(
919 bt_self_component_sink_as_self_component(self_comp
));
920 BT_ASSERT_DBG(fs_sink
);
921 BT_ASSERT_DBG(fs_sink
->upstream_iter
);
923 /* Consume messages */
924 next_status
= bt_message_iterator_next(fs_sink
->upstream_iter
, &msgs
, &msg_count
);
925 if (next_status
< 0) {
926 status
= (bt_component_class_sink_consume_method_status
) next_status
;
927 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
,
928 "Failed to get next message from upstream iterator.");
932 switch (next_status
) {
933 case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK
:
937 for (i
= 0; i
< msg_count
; i
++) {
938 const bt_message
*msg
= msgs
[i
];
942 switch (bt_message_get_type(msg
)) {
943 case BT_MESSAGE_TYPE_EVENT
:
944 status
= handle_event_msg(fs_sink
, msg
);
946 case BT_MESSAGE_TYPE_PACKET_BEGINNING
:
947 status
= handle_packet_beginning_msg(fs_sink
, msg
);
949 case BT_MESSAGE_TYPE_PACKET_END
:
950 status
= handle_packet_end_msg(fs_sink
, msg
);
952 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY
:
954 BT_CPPLOGD_STR_SPEC(fs_sink
->logger
,
955 "Ignoring message iterator inactivity message.");
957 case BT_MESSAGE_TYPE_STREAM_BEGINNING
:
958 status
= handle_stream_beginning_msg(fs_sink
, msg
);
960 case BT_MESSAGE_TYPE_STREAM_END
:
961 status
= handle_stream_end_msg(fs_sink
, msg
);
963 case BT_MESSAGE_TYPE_DISCARDED_EVENTS
:
964 status
= handle_discarded_events_msg(fs_sink
, msg
);
966 case BT_MESSAGE_TYPE_DISCARDED_PACKETS
:
967 status
= handle_discarded_packets_msg(fs_sink
, msg
);
973 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
975 if (status
!= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
) {
976 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
,
977 "Failed to handle message: "
978 "generated CTF traces could be incomplete: "
979 "output-dir-path=\"{}\"",
980 fs_sink
->output_dir_path
->str
);
987 case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN
:
988 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN
;
990 case BT_MESSAGE_ITERATOR_NEXT_STATUS_END
:
991 /* TODO: Finalize all traces (should already be done?) */
992 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END
;
1001 BT_ASSERT(status
!= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
);
1002 put_messages(msgs
, msg_count
);
1008 bt_component_class_sink_graph_is_configured_method_status
1009 ctf_fs_sink_graph_is_configured(bt_self_component_sink
*self_comp
)
1011 bt_component_class_sink_graph_is_configured_method_status status
;
1012 bt_message_iterator_create_from_sink_component_status msg_iter_status
;
1013 fs_sink_comp
*fs_sink
= (fs_sink_comp
*) bt_self_component_get_data(
1014 bt_self_component_sink_as_self_component(self_comp
));
1016 msg_iter_status
= bt_message_iterator_create_from_sink_component(
1017 self_comp
, bt_self_component_sink_borrow_input_port_by_name(self_comp
, in_port_name
),
1018 &fs_sink
->upstream_iter
);
1019 if (msg_iter_status
!= BT_MESSAGE_ITERATOR_CREATE_FROM_SINK_COMPONENT_STATUS_OK
) {
1020 status
= (bt_component_class_sink_graph_is_configured_method_status
) msg_iter_status
;
1021 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to create upstream iterator.");
1025 status
= BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK
;
1030 void ctf_fs_sink_finalize(bt_self_component_sink
*self_comp
)
1032 fs_sink_comp
*fs_sink
= (fs_sink_comp
*) bt_self_component_get_data(
1033 bt_self_component_sink_as_self_component(self_comp
));
1035 destroy_fs_sink_comp(fs_sink
);