2 * SPDX-License-Identifier: MIT
4 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
10 #include <babeltrace2/babeltrace.h>
12 #include "common/assert.h"
13 #include "cpp-common/vendor/fmt/format.h"
14 #include "ctfser/ctfser.h"
16 #include "plugins/common/param-validation/param-validation.h"
18 #include "fs-sink-ctf-meta.hpp"
19 #include "fs-sink-stream.hpp"
20 #include "fs-sink-trace.hpp"
21 #include "fs-sink.hpp"
22 #include "translate-trace-ir-to-ctf-ir.hpp"
/* Name of the input port added by ctf_fs_sink_init() and looked up when the graph is configured. */
static const char * const in_port_name = "in";
26 static bt_component_class_initialize_method_status
27 ensure_output_dir_exists(struct fs_sink_comp
*fs_sink
)
29 bt_component_class_initialize_method_status status
=
30 BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
;
33 ret
= g_mkdir_with_parents(fs_sink
->output_dir_path
->str
, 0755);
35 BT_CPPLOGE_ERRNO_APPEND_CAUSE_SPEC(
36 fs_sink
->logger
, "Cannot create directories for output directory",
37 ": output-dir-path=\"{}\"", fs_sink
->output_dir_path
->str
);
38 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
46 static bt_param_validation_map_value_entry_descr fs_sink_params_descr
[] = {
47 {"path", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY
,
48 bt_param_validation_value_descr::makeString()},
49 {"assume-single-trace", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
50 bt_param_validation_value_descr::makeBool()},
51 {"ignore-discarded-events", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
52 bt_param_validation_value_descr::makeBool()},
53 {"ignore-discarded-packets", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
54 bt_param_validation_value_descr::makeBool()},
55 {"quiet", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
56 bt_param_validation_value_descr::makeBool()},
57 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END
};
59 static bt_component_class_initialize_method_status
configure_component(struct fs_sink_comp
*fs_sink
,
60 const bt_value
*params
)
62 bt_component_class_initialize_method_status status
;
63 const bt_value
*value
;
64 enum bt_param_validation_status validation_status
;
65 gchar
*validation_error
;
68 bt_param_validation_validate(params
, fs_sink_params_descr
, &validation_error
);
69 if (validation_status
== BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR
) {
70 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
71 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "{}", validation_error
);
73 } else if (validation_status
== BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR
) {
74 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
78 value
= bt_value_map_borrow_entry_value_const(params
, "path");
79 g_string_assign(fs_sink
->output_dir_path
, bt_value_string_get(value
));
81 value
= bt_value_map_borrow_entry_value_const(params
, "assume-single-trace");
83 fs_sink
->assume_single_trace
= (bool) bt_value_bool_get(value
);
86 value
= bt_value_map_borrow_entry_value_const(params
, "ignore-discarded-events");
88 fs_sink
->ignore_discarded_events
= (bool) bt_value_bool_get(value
);
91 value
= bt_value_map_borrow_entry_value_const(params
, "ignore-discarded-packets");
93 fs_sink
->ignore_discarded_packets
= (bool) bt_value_bool_get(value
);
96 value
= bt_value_map_borrow_entry_value_const(params
, "quiet");
98 fs_sink
->quiet
= (bool) bt_value_bool_get(value
);
101 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
;
104 g_free(validation_error
);
108 static void destroy_fs_sink_comp(struct fs_sink_comp
*fs_sink
)
114 if (fs_sink
->output_dir_path
) {
115 g_string_free(fs_sink
->output_dir_path
, TRUE
);
116 fs_sink
->output_dir_path
= NULL
;
119 if (fs_sink
->traces
) {
120 g_hash_table_destroy(fs_sink
->traces
);
121 fs_sink
->traces
= NULL
;
124 BT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(fs_sink
->upstream_iter
);
131 bt_component_class_initialize_method_status
ctf_fs_sink_init(bt_self_component_sink
*self_comp_sink
,
132 bt_self_component_sink_configuration
*,
133 const bt_value
*params
, void *)
136 bt_component_class_initialize_method_status status
;
137 bt_self_component_add_port_status add_port_status
;
138 struct fs_sink_comp
*fs_sink
= NULL
;
139 bt_self_component
*self_comp
= bt_self_component_sink_as_self_component(self_comp_sink
);
141 fs_sink
= new fs_sink_comp
{bt2::SelfSinkComponent
{self_comp_sink
}};
142 fs_sink
->output_dir_path
= g_string_new(NULL
);
143 status
= configure_component(fs_sink
, params
);
144 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
145 /* configure_component() logs errors */
149 if (fs_sink
->assume_single_trace
&&
150 g_file_test(fs_sink
->output_dir_path
->str
, G_FILE_TEST_EXISTS
)) {
151 BT_CPPLOGE_APPEND_CAUSE_SPEC(
152 fs_sink
->logger
, "Single trace mode, but output path exists: output-path=\"{}\"",
153 fs_sink
->output_dir_path
->str
);
154 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
158 status
= ensure_output_dir_exists(fs_sink
);
159 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
160 /* ensure_output_dir_exists() logs errors */
164 fs_sink
->traces
= g_hash_table_new_full(g_direct_hash
, g_direct_equal
, NULL
,
165 (GDestroyNotify
) fs_sink_trace_destroy
);
166 if (!fs_sink
->traces
) {
167 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to allocate one GHashTable.");
168 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
173 bt_self_component_sink_add_input_port(self_comp_sink
, in_port_name
, NULL
, NULL
);
174 if (add_port_status
!= BT_SELF_COMPONENT_ADD_PORT_STATUS_OK
) {
175 status
= (bt_component_class_initialize_method_status
) add_port_status
;
176 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to add input port.");
180 bt_self_component_set_data(self_comp
, fs_sink
);
183 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
184 destroy_fs_sink_comp(fs_sink
);
188 } catch (const std::bad_alloc
&) {
189 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
190 } catch (const bt2::Error
&) {
191 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
195 static inline struct fs_sink_stream
*borrow_stream(struct fs_sink_comp
*fs_sink
,
196 const bt_stream
*ir_stream
)
198 const bt_trace
*ir_trace
= bt_stream_borrow_trace_const(ir_stream
);
199 struct fs_sink_trace
*trace
;
200 struct fs_sink_stream
*stream
= NULL
;
202 trace
= (fs_sink_trace
*) g_hash_table_lookup(fs_sink
->traces
, ir_trace
);
203 if (G_UNLIKELY(!trace
)) {
204 if (fs_sink
->assume_single_trace
&& g_hash_table_size(fs_sink
->traces
) > 0) {
205 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
,
206 "Single trace mode, but getting more than one trace: "
207 "stream-name=\"{}\"",
208 bt2c::maybeNull(bt_stream_get_name(ir_stream
)));
212 trace
= fs_sink_trace_create(fs_sink
, ir_trace
);
218 stream
= (fs_sink_stream
*) g_hash_table_lookup(trace
->streams
, ir_stream
);
219 if (G_UNLIKELY(!stream
)) {
220 stream
= fs_sink_stream_create(trace
, ir_stream
);
230 static inline bt_component_class_sink_consume_method_status
231 handle_event_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
235 bt_component_class_sink_consume_method_status status
=
236 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
237 const bt_event
*ir_event
= bt_message_event_borrow_event_const(msg
);
238 const bt_stream
*ir_stream
= bt_event_borrow_stream_const(ir_event
);
239 struct fs_sink_stream
*stream
;
240 struct fs_sink_ctf_event_class
*ec
= NULL
;
241 const bt_clock_snapshot
*cs
= NULL
;
243 stream
= borrow_stream(fs_sink
, ir_stream
);
244 if (G_UNLIKELY(!stream
)) {
245 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to borrow stream.");
246 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
250 ret
= try_translate_event_class_trace_ir_to_ctf_ir(
251 fs_sink
, stream
->sc
, bt_event_borrow_class_const(ir_event
), &ec
);
253 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
,
254 "Failed to translate event class to CTF IR.");
255 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
261 if (stream
->sc
->default_clock_class
) {
262 cs
= bt_message_event_borrow_default_clock_snapshot_const(msg
);
266 * If this event's stream does not support packets, then we
267 * lazily create artificial packets.
269 * The size of an artificial packet is arbitrarily at least
270 * 4 MiB (it usually is greater because we close it when
271 * comes the time to write a new event and the packet's content
272 * size is >= 4 MiB), except the last one which can be smaller.
274 if (G_UNLIKELY(!stream
->sc
->has_packets
)) {
275 if (stream
->packet_state
.is_open
&&
276 bt_ctfser_get_offset_in_current_packet_bits(&stream
->ctfser
) / 8 >=
279 * Stream's current packet is larger than 4 MiB:
280 * close it. A new packet will be opened just
283 ret
= fs_sink_stream_close_packet(stream
, NULL
);
285 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to close packet.");
286 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
291 if (!stream
->packet_state
.is_open
) {
292 /* Stream's packet is not currently opened: open it */
293 ret
= fs_sink_stream_open_packet(stream
, NULL
, NULL
);
295 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to open packet.");
296 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
302 BT_ASSERT_DBG(stream
->packet_state
.is_open
);
303 ret
= fs_sink_stream_write_event(stream
, cs
, ir_event
, ec
);
304 if (G_UNLIKELY(ret
)) {
305 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to write event.");
306 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
312 } catch (const std::bad_alloc
&) {
313 return BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_MEMORY_ERROR
;
314 } catch (const bt2::Error
&) {
315 return BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
319 static inline bt_component_class_sink_consume_method_status
320 handle_packet_beginning_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
323 bt_component_class_sink_consume_method_status status
=
324 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
325 const bt_packet
*ir_packet
= bt_message_packet_beginning_borrow_packet_const(msg
);
326 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
327 struct fs_sink_stream
*stream
;
328 const bt_clock_snapshot
*cs
= NULL
;
330 stream
= borrow_stream(fs_sink
, ir_stream
);
331 if (G_UNLIKELY(!stream
)) {
332 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to borrow stream.");
333 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
337 if (stream
->sc
->packets_have_ts_begin
) {
338 cs
= bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg
);
343 * If we previously received a discarded events message with
344 * a time range, make sure that its beginning time matches what's
345 * expected for CTF 1.8, that is:
347 * * Its beginning time is the previous packet's end
348 * time (or the current packet's beginning time if
349 * this is the first packet).
351 * We check this here instead of in handle_packet_end_msg()
352 * because we want to catch any incompatible message as early as
353 * possible to report the error.
355 * Validation of the discarded events message's end time is
356 * performed in handle_packet_end_msg().
358 if (stream
->discarded_events_state
.in_range
) {
359 uint64_t expected_cs
;
362 * `stream->discarded_events_state.in_range` is only set
363 * when the stream class's discarded events have a time
366 * It is required that the packet beginning and end
367 * messages for this stream class have times when
368 * discarded events have a time range.
370 BT_ASSERT(stream
->sc
->discarded_events_has_ts
);
371 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
372 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
374 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
375 /* We're opening the first packet */
376 expected_cs
= bt_clock_snapshot_get_value(cs
);
378 expected_cs
= stream
->prev_packet_state
.end_cs
;
381 if (stream
->discarded_events_state
.beginning_cs
!= expected_cs
) {
382 BT_CPPLOGE_APPEND_CAUSE_SPEC(
384 "Incompatible discarded events message: "
385 "unexpected beginning time: "
386 "beginning-cs-val={}, "
387 "expected-beginning-cs-val={}, "
388 "stream-id={}, stream-name=\"{}\", "
389 "trace-name=\"{}\", path=\"{}/{}\"",
390 stream
->discarded_events_state
.beginning_cs
, expected_cs
,
391 bt_stream_get_id(ir_stream
), bt2c::maybeNull(bt_stream_get_name(ir_stream
)),
392 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
))),
393 stream
->trace
->path
->str
, stream
->file_name
->str
);
394 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
400 * If we previously received a discarded packets message with a
401 * time range, make sure that its beginning and end times match
402 * what's expected for CTF 1.8, that is:
404 * * Its beginning time is the previous packet's end time.
406 * * Its end time is the current packet's beginning time.
408 if (stream
->discarded_packets_state
.in_range
) {
409 uint64_t expected_end_cs
;
412 * `stream->discarded_packets_state.in_range` is only
413 * set when the stream class's discarded packets have a
416 * It is required that the packet beginning and end
417 * messages for this stream class have times when
418 * discarded packets have a time range.
420 BT_ASSERT(stream
->sc
->discarded_packets_has_ts
);
421 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
422 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
425 * It is not supported to have a discarded packets
426 * message _before_ the first packet: we cannot validate
427 * that its beginning time is compatible with CTF 1.8 in
430 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
431 BT_CPPLOGE_APPEND_CAUSE_SPEC(
433 "Incompatible discarded packets message "
434 "occurring before the stream's first packet: "
435 "stream-id={}, stream-name=\"{}\", "
436 "trace-name=\"{}\", path=\"{}/{}\"",
437 bt_stream_get_id(ir_stream
), bt2c::maybeNull(bt_stream_get_name(ir_stream
)),
438 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
))),
439 stream
->trace
->path
->str
, stream
->file_name
->str
);
440 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
444 if (stream
->discarded_packets_state
.beginning_cs
!= stream
->prev_packet_state
.end_cs
) {
445 BT_CPPLOGE_APPEND_CAUSE_SPEC(
447 "Incompatible discarded packets message: "
448 "unexpected beginning time: "
449 "beginning-cs-val={}, "
450 "expected-beginning-cs-val={}, "
451 "stream-id={}, stream-name=\"{}\", "
452 "trace-name=\"{}\", path=\"{}/{}\"",
453 stream
->discarded_packets_state
.beginning_cs
, stream
->prev_packet_state
.end_cs
,
454 bt_stream_get_id(ir_stream
), bt2c::maybeNull(bt_stream_get_name(ir_stream
)),
455 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
))),
456 stream
->trace
->path
->str
, stream
->file_name
->str
);
457 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
461 expected_end_cs
= bt_clock_snapshot_get_value(cs
);
463 if (stream
->discarded_packets_state
.end_cs
!= expected_end_cs
) {
464 BT_CPPLOGE_APPEND_CAUSE_SPEC(
466 "Incompatible discarded packets message: "
467 "unexpected end time: "
469 "expected-end-cs-val={}, "
470 "stream-id={}, stream-name=\"{}\", "
471 "trace-name=\"{}\", path=\"{}/{}\"",
472 stream
->discarded_packets_state
.end_cs
, expected_end_cs
,
473 bt_stream_get_id(ir_stream
), bt2c::maybeNull(bt_stream_get_name(ir_stream
)),
474 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
))),
475 stream
->trace
->path
->str
, stream
->file_name
->str
);
476 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
482 * We're not in a discarded packets time range anymore since we
483 * require that the discarded packets time ranges go from one
484 * packet's end time to the next packet's beginning time, and
485 * we're handling a packet beginning message here.
487 stream
->discarded_packets_state
.in_range
= false;
489 ret
= fs_sink_stream_open_packet(stream
, cs
, ir_packet
);
491 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to open packet.");
492 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
500 static inline bt_component_class_sink_consume_method_status
501 handle_packet_end_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
504 bt_component_class_sink_consume_method_status status
=
505 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
506 const bt_packet
*ir_packet
= bt_message_packet_end_borrow_packet_const(msg
);
507 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
508 struct fs_sink_stream
*stream
;
509 const bt_clock_snapshot
*cs
= NULL
;
511 stream
= borrow_stream(fs_sink
, ir_stream
);
512 if (G_UNLIKELY(!stream
)) {
513 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to borrow stream.");
514 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
518 if (stream
->sc
->packets_have_ts_end
) {
519 cs
= bt_message_packet_end_borrow_default_clock_snapshot_const(msg
);
524 * If we previously received a discarded events message with
525 * a time range, make sure that its end time matches what's
526 * expected for CTF 1.8, that is:
528 * * Its end time is the current packet's end time.
530 * Validation of the discarded events message's beginning time
531 * is performed in handle_packet_beginning_msg().
533 if (stream
->discarded_events_state
.in_range
) {
534 uint64_t expected_cs
;
537 * `stream->discarded_events_state.in_range` is only set
538 * when the stream class's discarded events have a time
541 * It is required that the packet beginning and end
542 * messages for this stream class have times when
543 * discarded events have a time range.
545 BT_ASSERT(stream
->sc
->discarded_events_has_ts
);
546 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
547 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
549 expected_cs
= bt_clock_snapshot_get_value(cs
);
551 if (stream
->discarded_events_state
.end_cs
!= expected_cs
) {
552 BT_CPPLOGE_APPEND_CAUSE_SPEC(
554 "Incompatible discarded events message: "
555 "unexpected end time: "
557 "expected-end-cs-val={}, "
558 "stream-id={}, stream-name=\"{}\", "
559 "trace-name=\"{}\", path=\"{}/{}\"",
560 stream
->discarded_events_state
.end_cs
, expected_cs
, bt_stream_get_id(ir_stream
),
561 bt2c::maybeNull(bt_stream_get_name(ir_stream
)),
562 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
))),
563 stream
->trace
->path
->str
, stream
->file_name
->str
);
564 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
569 ret
= fs_sink_stream_close_packet(stream
, cs
);
571 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to close packet.");
572 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
577 * We're not in a discarded events time range anymore since we
578 * require that the discarded events time ranges go from one
579 * packet's end time to the next packet's end time, and we're
580 * handling a packet end message here.
582 stream
->discarded_events_state
.in_range
= false;
588 static inline bt_component_class_sink_consume_method_status
589 handle_stream_beginning_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
591 bt_component_class_sink_consume_method_status status
=
592 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
593 const bt_stream
*ir_stream
= bt_message_stream_beginning_borrow_stream_const(msg
);
594 const bt_stream_class
*ir_sc
= bt_stream_borrow_class_const(ir_stream
);
595 struct fs_sink_stream
*stream
;
596 bool packets_have_beginning_end_cs
=
597 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc
) &&
598 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc
);
601 * Not supported: discarded events or discarded packets support
602 * without packets support. Packets are the way to know where
603 * discarded events/packets occurred in CTF 1.8.
605 if (!bt_stream_class_supports_packets(ir_sc
)) {
606 BT_ASSERT(!bt_stream_class_supports_discarded_packets(ir_sc
));
608 if (!fs_sink
->ignore_discarded_events
&& bt_stream_class_supports_discarded_events(ir_sc
)) {
609 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
,
610 "Unsupported stream: "
611 "stream does not support packets, "
612 "but supports discarded events: "
615 "stream-name=\"{}\"",
616 fmt::ptr(ir_stream
), bt_stream_get_id(ir_stream
),
617 bt2c::maybeNull(bt_stream_get_name(ir_stream
)));
618 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
624 * Not supported: discarded events with default clock snapshots,
625 * but packet beginning/end without default clock snapshot.
627 if (!fs_sink
->ignore_discarded_events
&&
628 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc
) &&
629 !packets_have_beginning_end_cs
) {
630 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
,
631 "Unsupported stream: discarded events have "
632 "default clock snapshots, but packets have no "
633 "beginning and/or end default clock snapshots: "
636 "stream-name=\"{}\"",
637 fmt::ptr(ir_stream
), bt_stream_get_id(ir_stream
),
638 bt2c::maybeNull(bt_stream_get_name(ir_stream
)));
639 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
644 * Not supported: discarded packets with default clock
645 * snapshots, but packet beginning/end without default clock
648 if (!fs_sink
->ignore_discarded_packets
&&
649 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc
) &&
650 !packets_have_beginning_end_cs
) {
651 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
,
652 "Unsupported stream: discarded packets have "
653 "default clock snapshots, but packets have no "
654 "beginning and/or end default clock snapshots: "
657 "stream-name=\"{}\"",
658 fmt::ptr(ir_stream
), bt_stream_get_id(ir_stream
),
659 bt2c::maybeNull(bt_stream_get_name(ir_stream
)));
660 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
664 stream
= borrow_stream(fs_sink
, ir_stream
);
666 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to borrow stream.");
667 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
671 BT_CPPLOGI_SPEC(fs_sink
->logger
,
672 "Created new, empty stream file: "
673 "stream-id={}, stream-name=\"{}\", "
674 "trace-name=\"{}\", path=\"{}/{}\"",
675 bt_stream_get_id(ir_stream
), bt2c::maybeNull(bt_stream_get_name(ir_stream
)),
676 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
))),
677 stream
->trace
->path
->str
, stream
->file_name
->str
);
683 static inline bt_component_class_sink_consume_method_status
684 handle_stream_end_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
686 bt_component_class_sink_consume_method_status status
=
687 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
688 const bt_stream
*ir_stream
= bt_message_stream_end_borrow_stream_const(msg
);
689 struct fs_sink_stream
*stream
;
691 stream
= borrow_stream(fs_sink
, ir_stream
);
693 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to borrow stream.");
694 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
698 if (G_UNLIKELY(!stream
->sc
->has_packets
&& stream
->packet_state
.is_open
)) {
699 /* Close stream's current artificial packet */
700 int ret
= fs_sink_stream_close_packet(stream
, NULL
);
703 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to close packet.");
704 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
709 BT_CPPLOGI_SPEC(fs_sink
->logger
,
710 "Closing stream file: "
711 "stream-id={}, stream-name=\"{}\", "
712 "trace-name=\"{}\", path=\"{}/{}\"",
713 bt_stream_get_id(ir_stream
), bt2c::maybeNull(bt_stream_get_name(ir_stream
)),
714 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
))),
715 stream
->trace
->path
->str
, stream
->file_name
->str
);
718 * This destroys the stream object and frees all its resources,
719 * closing the stream file.
721 g_hash_table_remove(stream
->trace
->streams
, ir_stream
);
727 static inline bt_component_class_sink_consume_method_status
728 handle_discarded_events_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
730 bt_component_class_sink_consume_method_status status
=
731 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
732 const bt_stream
*ir_stream
= bt_message_discarded_events_borrow_stream_const(msg
);
733 struct fs_sink_stream
*stream
;
734 const bt_clock_snapshot
*cs
= NULL
;
735 bt_property_availability avail
;
738 stream
= borrow_stream(fs_sink
, ir_stream
);
740 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to borrow stream.");
741 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
745 if (fs_sink
->ignore_discarded_events
) {
746 BT_CPPLOGI_SPEC(fs_sink
->logger
,
747 "Ignoring discarded events message: "
748 "stream-id={}, stream-name=\"{}\", "
749 "trace-name=\"{}\", path=\"{}/{}\"",
750 bt_stream_get_id(ir_stream
), bt2c::maybeNull(bt_stream_get_name(ir_stream
)),
751 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
))),
752 stream
->trace
->path
->str
, stream
->file_name
->str
);
756 if (stream
->discarded_events_state
.in_range
) {
757 BT_CPPLOGE_APPEND_CAUSE_SPEC(
759 "Unsupported contiguous discarded events message: "
760 "stream-id={}, stream-name=\"{}\", "
761 "trace-name=\"{}\", path=\"{}/{}\"",
762 bt_stream_get_id(ir_stream
), bt2c::maybeNull(bt_stream_get_name(ir_stream
)),
763 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
))),
764 stream
->trace
->path
->str
, stream
->file_name
->str
);
765 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
770 * If we're currently in an opened packet (got a packet
771 * beginning message, but no packet end message yet), we do not
772 * support having a discarded events message with a time range
773 * because we require that the discarded events message's time
774 * range go from a packet's end time to the next packet's end
777 if (stream
->packet_state
.is_open
&& stream
->sc
->discarded_events_has_ts
) {
778 BT_CPPLOGE_APPEND_CAUSE_SPEC(
780 "Unsupported discarded events message with "
781 "default clock snapshots occurring within a packet: "
782 "stream-id={}, stream-name=\"{}\", "
783 "trace-name=\"{}\", path=\"{}/{}\"",
784 bt_stream_get_id(ir_stream
), bt2c::maybeNull(bt_stream_get_name(ir_stream
)),
785 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
))),
786 stream
->trace
->path
->str
, stream
->file_name
->str
);
787 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
791 if (stream
->sc
->discarded_events_has_ts
) {
793 * Make the stream's state be in the time range of a
794 * discarded events message since we have the message's
795 * time range (`stream->sc->discarded_events_has_ts`).
797 stream
->discarded_events_state
.in_range
= true;
800 * The clock snapshot values will be validated when
801 * handling the next packet beginning and end messages
802 * (next calls to handle_packet_beginning_msg() and
803 * handle_packet_end_msg()).
805 cs
= bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg
);
807 stream
->discarded_events_state
.beginning_cs
= bt_clock_snapshot_get_value(cs
);
808 cs
= bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg
);
810 stream
->discarded_events_state
.end_cs
= bt_clock_snapshot_get_value(cs
);
813 avail
= bt_message_discarded_events_get_count(msg
, &count
);
814 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
816 * There's no specific count of discarded events: set it
817 * to 1 so that we know that we at least discarded
823 stream
->packet_state
.discarded_events_counter
+= count
;
829 static inline bt_component_class_sink_consume_method_status
830 handle_discarded_packets_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
832 bt_component_class_sink_consume_method_status status
=
833 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
834 const bt_stream
*ir_stream
= bt_message_discarded_packets_borrow_stream_const(msg
);
835 struct fs_sink_stream
*stream
;
836 const bt_clock_snapshot
*cs
= NULL
;
837 bt_property_availability avail
;
840 stream
= borrow_stream(fs_sink
, ir_stream
);
842 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to borrow stream.");
843 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
847 if (fs_sink
->ignore_discarded_packets
) {
848 BT_CPPLOGI_SPEC(fs_sink
->logger
,
849 "Ignoring discarded packets message: "
850 "stream-id={}, stream-name=\"{}\", "
851 "trace-name=\"{}\", path=\"{}/{}\"",
852 bt_stream_get_id(ir_stream
), bt2c::maybeNull(bt_stream_get_name(ir_stream
)),
853 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
))),
854 stream
->trace
->path
->str
, stream
->file_name
->str
);
858 if (stream
->discarded_packets_state
.in_range
) {
859 BT_CPPLOGE_APPEND_CAUSE_SPEC(
861 "Unsupported contiguous discarded packets message: "
862 "stream-id={}, stream-name=\"{}\", "
863 "trace-name=\"{}\", path=\"{}/{}\"",
864 bt_stream_get_id(ir_stream
), bt2c::maybeNull(bt_stream_get_name(ir_stream
)),
865 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
))),
866 stream
->trace
->path
->str
, stream
->file_name
->str
);
867 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
872 * Discarded packets messages are guaranteed to occur between
875 BT_ASSERT(!stream
->packet_state
.is_open
);
877 if (stream
->sc
->discarded_packets_has_ts
) {
879 * Make the stream's state be in the time range of a
880 * discarded packets message since we have the message's
881 * time range (`stream->sc->discarded_packets_has_ts`).
883 stream
->discarded_packets_state
.in_range
= true;
886 * The clock snapshot values will be validated when
887 * handling the next packet beginning message (next call
888 * to handle_packet_beginning_msg()).
890 cs
= bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg
);
892 stream
->discarded_packets_state
.beginning_cs
= bt_clock_snapshot_get_value(cs
);
893 cs
= bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg
);
895 stream
->discarded_packets_state
.end_cs
= bt_clock_snapshot_get_value(cs
);
898 avail
= bt_message_discarded_packets_get_count(msg
, &count
);
899 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
901 * There's no specific count of discarded packets: set
902 * it to 1 so that we know that we at least discarded
908 stream
->packet_state
.seq_num
+= count
;
914 static inline void put_messages(bt_message_array_const msgs
, uint64_t count
)
918 for (i
= 0; i
< count
; i
++) {
919 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
923 bt_component_class_sink_consume_method_status
ctf_fs_sink_consume(bt_self_component_sink
*self_comp
)
925 bt_component_class_sink_consume_method_status status
=
926 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
927 struct fs_sink_comp
*fs_sink
;
928 bt_message_iterator_next_status next_status
;
929 uint64_t msg_count
= 0;
930 bt_message_array_const msgs
;
932 fs_sink
= (fs_sink_comp
*) bt_self_component_get_data(
933 bt_self_component_sink_as_self_component(self_comp
));
934 BT_ASSERT_DBG(fs_sink
);
935 BT_ASSERT_DBG(fs_sink
->upstream_iter
);
937 /* Consume messages */
938 next_status
= bt_message_iterator_next(fs_sink
->upstream_iter
, &msgs
, &msg_count
);
939 if (next_status
< 0) {
940 status
= (bt_component_class_sink_consume_method_status
) next_status
;
941 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
,
942 "Failed to get next message from upstream iterator.");
946 switch (next_status
) {
947 case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK
:
951 for (i
= 0; i
< msg_count
; i
++) {
952 const bt_message
*msg
= msgs
[i
];
956 switch (bt_message_get_type(msg
)) {
957 case BT_MESSAGE_TYPE_EVENT
:
958 status
= handle_event_msg(fs_sink
, msg
);
960 case BT_MESSAGE_TYPE_PACKET_BEGINNING
:
961 status
= handle_packet_beginning_msg(fs_sink
, msg
);
963 case BT_MESSAGE_TYPE_PACKET_END
:
964 status
= handle_packet_end_msg(fs_sink
, msg
);
966 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY
:
968 BT_CPPLOGD_STR_SPEC(fs_sink
->logger
,
969 "Ignoring message iterator inactivity message.");
971 case BT_MESSAGE_TYPE_STREAM_BEGINNING
:
972 status
= handle_stream_beginning_msg(fs_sink
, msg
);
974 case BT_MESSAGE_TYPE_STREAM_END
:
975 status
= handle_stream_end_msg(fs_sink
, msg
);
977 case BT_MESSAGE_TYPE_DISCARDED_EVENTS
:
978 status
= handle_discarded_events_msg(fs_sink
, msg
);
980 case BT_MESSAGE_TYPE_DISCARDED_PACKETS
:
981 status
= handle_discarded_packets_msg(fs_sink
, msg
);
987 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
989 if (status
!= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
) {
990 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
,
991 "Failed to handle message: "
992 "generated CTF traces could be incomplete: "
993 "output-dir-path=\"{}\"",
994 fs_sink
->output_dir_path
->str
);
1001 case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN
:
1002 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN
;
1004 case BT_MESSAGE_ITERATOR_NEXT_STATUS_END
:
1005 /* TODO: Finalize all traces (should already be done?) */
1006 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END
;
1015 BT_ASSERT(status
!= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
);
1016 put_messages(msgs
, msg_count
);
1022 bt_component_class_sink_graph_is_configured_method_status
1023 ctf_fs_sink_graph_is_configured(bt_self_component_sink
*self_comp
)
1026 bt_component_class_sink_graph_is_configured_method_status status
;
1027 bt_message_iterator_create_from_sink_component_status msg_iter_status
;
1028 fs_sink_comp
*fs_sink
= (fs_sink_comp
*) bt_self_component_get_data(
1029 bt_self_component_sink_as_self_component(self_comp
));
1031 msg_iter_status
= bt_message_iterator_create_from_sink_component(
1032 self_comp
, bt_self_component_sink_borrow_input_port_by_name(self_comp
, in_port_name
),
1033 &fs_sink
->upstream_iter
);
1034 if (msg_iter_status
!= BT_MESSAGE_ITERATOR_CREATE_FROM_SINK_COMPONENT_STATUS_OK
) {
1035 status
= (bt_component_class_sink_graph_is_configured_method_status
) msg_iter_status
;
1036 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink
->logger
, "Failed to create upstream iterator.");
1040 status
= BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK
;
1043 } catch (const std::bad_alloc
&) {
1044 return BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_MEMORY_ERROR
;
1045 } catch (const bt2c::Error
&) {
1046 return BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_ERROR
;
1050 void ctf_fs_sink_finalize(bt_self_component_sink
*self_comp
)
1052 fs_sink_comp
*fs_sink
= (fs_sink_comp
*) bt_self_component_get_data(
1053 bt_self_component_sink_as_self_component(self_comp
));
1055 destroy_fs_sink_comp(fs_sink
);