2 * SPDX-License-Identifier: MIT
4 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
7 #define BT_COMP_LOG_SELF_COMP (fs_sink->self_comp)
8 #define BT_LOG_OUTPUT_LEVEL (fs_sink->log_level)
9 #define BT_LOG_TAG "PLUGIN/SINK.CTF.FS"
10 #include "logging/comp-logging.h"
12 #include <babeltrace2/babeltrace.h>
16 #include "common/assert.h"
17 #include "ctfser/ctfser.h"
18 #include "plugins/common/param-validation/param-validation.h"
21 #include "fs-sink-trace.h"
22 #include "fs-sink-stream.h"
23 #include "fs-sink-ctf-meta.h"
24 #include "translate-trace-ir-to-ctf-ir.h"
25 #include "translate-ctf-ir-to-tsdl.h"
/* Name of the input port which this sink component adds and later looks up. */
const char *const in_port_name = "in";
31 bt_component_class_initialize_method_status
ensure_output_dir_exists(
32 struct fs_sink_comp
*fs_sink
)
34 bt_component_class_initialize_method_status status
=
35 BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
;
38 ret
= g_mkdir_with_parents(fs_sink
->output_dir_path
->str
, 0755);
41 "Cannot create directories for output directory",
42 ": output-dir-path=\"%s\"",
43 fs_sink
->output_dir_path
->str
);
44 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
52 static struct bt_param_validation_map_value_entry_descr fs_sink_params_descr
[] = {
53 { "path", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY
, { .type
= BT_VALUE_TYPE_STRING
} },
54 { "assume-single-trace", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
, { .type
= BT_VALUE_TYPE_BOOL
} },
55 { "ignore-discarded-events", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
, { .type
= BT_VALUE_TYPE_BOOL
} },
56 { "ignore-discarded-packets", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
, { .type
= BT_VALUE_TYPE_BOOL
} },
57 { "quiet", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
, { .type
= BT_VALUE_TYPE_BOOL
} },
58 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END
62 bt_component_class_initialize_method_status
63 configure_component(struct fs_sink_comp
*fs_sink
, const bt_value
*params
)
65 bt_component_class_initialize_method_status status
;
66 const bt_value
*value
;
67 enum bt_param_validation_status validation_status
;
68 gchar
*validation_error
;
70 validation_status
= bt_param_validation_validate(params
,
71 fs_sink_params_descr
, &validation_error
);
72 if (validation_status
== BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR
) {
73 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
75 } else if (validation_status
== BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR
) {
76 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
80 value
= bt_value_map_borrow_entry_value_const(params
, "path");
81 g_string_assign(fs_sink
->output_dir_path
,
82 bt_value_string_get(value
));
84 value
= bt_value_map_borrow_entry_value_const(params
,
85 "assume-single-trace");
87 fs_sink
->assume_single_trace
= (bool) bt_value_bool_get(value
);
90 value
= bt_value_map_borrow_entry_value_const(params
,
91 "ignore-discarded-events");
93 fs_sink
->ignore_discarded_events
=
94 (bool) bt_value_bool_get(value
);
97 value
= bt_value_map_borrow_entry_value_const(params
,
98 "ignore-discarded-packets");
100 fs_sink
->ignore_discarded_packets
=
101 (bool) bt_value_bool_get(value
);
104 value
= bt_value_map_borrow_entry_value_const(params
,
107 fs_sink
->quiet
= (bool) bt_value_bool_get(value
);
110 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
;
113 g_free(validation_error
);
118 void destroy_fs_sink_comp(struct fs_sink_comp
*fs_sink
)
124 if (fs_sink
->output_dir_path
) {
125 g_string_free(fs_sink
->output_dir_path
, TRUE
);
126 fs_sink
->output_dir_path
= NULL
;
129 if (fs_sink
->traces
) {
130 g_hash_table_destroy(fs_sink
->traces
);
131 fs_sink
->traces
= NULL
;
134 BT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(
135 fs_sink
->upstream_iter
);
143 bt_component_class_initialize_method_status
ctf_fs_sink_init(
144 bt_self_component_sink
*self_comp_sink
,
145 bt_self_component_sink_configuration
*config
,
146 const bt_value
*params
,
147 void *init_method_data
)
149 bt_component_class_initialize_method_status status
=
150 BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
;
151 bt_self_component_add_port_status add_port_status
;
152 struct fs_sink_comp
*fs_sink
= NULL
;
153 bt_self_component
*self_comp
=
154 bt_self_component_sink_as_self_component(self_comp_sink
);
155 bt_logging_level log_level
= bt_component_get_logging_level(
156 bt_self_component_as_component(self_comp
));
158 fs_sink
= g_new0(struct fs_sink_comp
, 1);
160 BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR
, log_level
, self_comp
,
161 "Failed to allocate one CTF FS sink structure.");
162 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
166 fs_sink
->log_level
= log_level
;
167 fs_sink
->self_comp
= self_comp
;
168 fs_sink
->output_dir_path
= g_string_new(NULL
);
169 status
= configure_component(fs_sink
, params
);
170 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
171 /* configure_component() logs errors */
175 if (fs_sink
->assume_single_trace
&&
176 g_file_test(fs_sink
->output_dir_path
->str
,
177 G_FILE_TEST_EXISTS
)) {
178 BT_COMP_LOGE("Single trace mode, but output path exists: "
179 "output-path=\"%s\"", fs_sink
->output_dir_path
->str
);
180 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
184 status
= ensure_output_dir_exists(fs_sink
);
185 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
186 /* ensure_output_dir_exists() logs errors */
190 fs_sink
->traces
= g_hash_table_new_full(g_direct_hash
, g_direct_equal
,
191 NULL
, (GDestroyNotify
) fs_sink_trace_destroy
);
192 if (!fs_sink
->traces
) {
193 BT_COMP_LOGE_STR("Failed to allocate one GHashTable.");
194 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
198 add_port_status
= bt_self_component_sink_add_input_port(
199 self_comp_sink
, in_port_name
, NULL
, NULL
);
200 switch (add_port_status
) {
201 case BT_SELF_COMPONENT_ADD_PORT_STATUS_ERROR
:
202 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
204 case BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR
:
205 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
211 bt_self_component_set_data(self_comp
, fs_sink
);
214 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
215 destroy_fs_sink_comp(fs_sink
);
222 struct fs_sink_stream
*borrow_stream(struct fs_sink_comp
*fs_sink
,
223 const bt_stream
*ir_stream
)
225 const bt_trace
*ir_trace
= bt_stream_borrow_trace_const(ir_stream
);
226 struct fs_sink_trace
*trace
;
227 struct fs_sink_stream
*stream
= NULL
;
229 trace
= g_hash_table_lookup(fs_sink
->traces
, ir_trace
);
230 if (G_UNLIKELY(!trace
)) {
231 if (fs_sink
->assume_single_trace
&&
232 g_hash_table_size(fs_sink
->traces
) > 0) {
233 BT_COMP_LOGE("Single trace mode, but getting more than one trace: "
234 "stream-name=\"%s\"",
235 bt_stream_get_name(ir_stream
));
239 trace
= fs_sink_trace_create(fs_sink
, ir_trace
);
245 stream
= g_hash_table_lookup(trace
->streams
, ir_stream
);
246 if (G_UNLIKELY(!stream
)) {
247 stream
= fs_sink_stream_create(trace
, ir_stream
);
258 bt_component_class_sink_consume_method_status
handle_event_msg(
259 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
262 bt_component_class_sink_consume_method_status status
=
263 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
264 const bt_event
*ir_event
= bt_message_event_borrow_event_const(msg
);
265 const bt_stream
*ir_stream
= bt_event_borrow_stream_const(ir_event
);
266 struct fs_sink_stream
*stream
;
267 struct fs_sink_ctf_event_class
*ec
= NULL
;
268 const bt_clock_snapshot
*cs
= NULL
;
270 stream
= borrow_stream(fs_sink
, ir_stream
);
271 if (G_UNLIKELY(!stream
)) {
272 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
276 ret
= try_translate_event_class_trace_ir_to_ctf_ir(fs_sink
,
277 stream
->sc
, bt_event_borrow_class_const(ir_event
), &ec
);
279 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
285 if (stream
->sc
->default_clock_class
) {
286 cs
= bt_message_event_borrow_default_clock_snapshot_const(
291 * If this event's stream does not support packets, then we
292 * lazily create artificial packets.
294 * The size of an artificial packet is arbitrarily at least
295 * 4 MiB (it usually is greater because we close it when
296 * comes the time to write a new event and the packet's content
297 * size is >= 4 MiB), except the last one which can be smaller.
299 if (G_UNLIKELY(!stream
->sc
->has_packets
)) {
300 if (stream
->packet_state
.is_open
&&
301 bt_ctfser_get_offset_in_current_packet_bits(&stream
->ctfser
) / 8 >=
304 * Stream's current packet is larger than 4 MiB:
305 * close it. A new packet will be opened just
308 ret
= fs_sink_stream_close_packet(stream
, NULL
);
310 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
315 if (!stream
->packet_state
.is_open
) {
316 /* Stream's packet is not currently opened: open it */
317 ret
= fs_sink_stream_open_packet(stream
, NULL
, NULL
);
319 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
325 BT_ASSERT_DBG(stream
->packet_state
.is_open
);
326 ret
= fs_sink_stream_write_event(stream
, cs
, ir_event
, ec
);
327 if (G_UNLIKELY(ret
)) {
328 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
337 bt_component_class_sink_consume_method_status
handle_packet_beginning_msg(
338 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
341 bt_component_class_sink_consume_method_status status
=
342 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
343 const bt_packet
*ir_packet
=
344 bt_message_packet_beginning_borrow_packet_const(msg
);
345 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
346 struct fs_sink_stream
*stream
;
347 const bt_clock_snapshot
*cs
= NULL
;
349 stream
= borrow_stream(fs_sink
, ir_stream
);
350 if (G_UNLIKELY(!stream
)) {
351 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
355 if (stream
->sc
->packets_have_ts_begin
) {
356 cs
= bt_message_packet_beginning_borrow_default_clock_snapshot_const(
362 * If we previously received a discarded events message with
363 * a time range, make sure that its beginning time matches what's
364 * expected for CTF 1.8, that is:
366 * * Its beginning time is the previous packet's end
367 * time (or the current packet's beginning time if
368 * this is the first packet).
370 * We check this here instead of in handle_packet_end_msg()
371 * because we want to catch any incompatible message as early as
372 * possible to report the error.
374 * Validation of the discarded events message's end time is
375 * performed in handle_packet_end_msg().
377 if (stream
->discarded_events_state
.in_range
) {
378 uint64_t expected_cs
;
381 * `stream->discarded_events_state.in_range` is only set
382 * when the stream class's discarded events have a time
385 * It is required that the packet beginning and end
386 * messages for this stream class have times when
387 * discarded events have a time range.
389 BT_ASSERT(stream
->sc
->discarded_events_has_ts
);
390 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
391 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
393 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
394 /* We're opening the first packet */
395 expected_cs
= bt_clock_snapshot_get_value(cs
);
397 expected_cs
= stream
->prev_packet_state
.end_cs
;
400 if (stream
->discarded_events_state
.beginning_cs
!=
402 BT_COMP_LOGE("Incompatible discarded events message: "
403 "unexpected beginning time: "
404 "beginning-cs-val=%" PRIu64
", "
405 "expected-beginning-cs-val=%" PRIu64
", "
406 "stream-id=%" PRIu64
", stream-name=\"%s\", "
407 "trace-name=\"%s\", path=\"%s/%s\"",
408 stream
->discarded_events_state
.beginning_cs
,
410 bt_stream_get_id(ir_stream
),
411 bt_stream_get_name(ir_stream
),
413 bt_stream_borrow_trace_const(ir_stream
)),
414 stream
->trace
->path
->str
, stream
->file_name
->str
);
415 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
421 * If we previously received a discarded packets message with a
422 * time range, make sure that its beginning and end times match
423 * what's expected for CTF 1.8, that is:
425 * * Its beginning time is the previous packet's end time.
427 * * Its end time is the current packet's beginning time.
429 if (stream
->discarded_packets_state
.in_range
) {
430 uint64_t expected_end_cs
;
433 * `stream->discarded_packets_state.in_range` is only
434 * set when the stream class's discarded packets have a
437 * It is required that the packet beginning and end
438 * messages for this stream class have times when
439 * discarded packets have a time range.
441 BT_ASSERT(stream
->sc
->discarded_packets_has_ts
);
442 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
443 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
446 * It is not supported to have a discarded packets
447 * message _before_ the first packet: we cannot validate
448 * that its beginning time is compatible with CTF 1.8 in
451 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
452 BT_COMP_LOGE("Incompatible discarded packets message "
453 "occurring before the stream's first packet: "
454 "stream-id=%" PRIu64
", stream-name=\"%s\", "
455 "trace-name=\"%s\", path=\"%s/%s\"",
456 bt_stream_get_id(ir_stream
),
457 bt_stream_get_name(ir_stream
),
459 bt_stream_borrow_trace_const(ir_stream
)),
460 stream
->trace
->path
->str
, stream
->file_name
->str
);
461 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
465 if (stream
->discarded_packets_state
.beginning_cs
!=
466 stream
->prev_packet_state
.end_cs
) {
467 BT_COMP_LOGE("Incompatible discarded packets message: "
468 "unexpected beginning time: "
469 "beginning-cs-val=%" PRIu64
", "
470 "expected-beginning-cs-val=%" PRIu64
", "
471 "stream-id=%" PRIu64
", stream-name=\"%s\", "
472 "trace-name=\"%s\", path=\"%s/%s\"",
473 stream
->discarded_packets_state
.beginning_cs
,
474 stream
->prev_packet_state
.end_cs
,
475 bt_stream_get_id(ir_stream
),
476 bt_stream_get_name(ir_stream
),
478 bt_stream_borrow_trace_const(ir_stream
)),
479 stream
->trace
->path
->str
, stream
->file_name
->str
);
480 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
484 expected_end_cs
= bt_clock_snapshot_get_value(cs
);
486 if (stream
->discarded_packets_state
.end_cs
!=
488 BT_COMP_LOGE("Incompatible discarded packets message: "
489 "unexpected end time: "
490 "end-cs-val=%" PRIu64
", "
491 "expected-end-cs-val=%" PRIu64
", "
492 "stream-id=%" PRIu64
", stream-name=\"%s\", "
493 "trace-name=\"%s\", path=\"%s/%s\"",
494 stream
->discarded_packets_state
.end_cs
,
496 bt_stream_get_id(ir_stream
),
497 bt_stream_get_name(ir_stream
),
499 bt_stream_borrow_trace_const(ir_stream
)),
500 stream
->trace
->path
->str
, stream
->file_name
->str
);
501 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
507 * We're not in a discarded packets time range anymore since we
508 * require that the discarded packets time ranges go from one
509 * packet's end time to the next packet's beginning time, and
510 * we're handling a packet beginning message here.
512 stream
->discarded_packets_state
.in_range
= false;
514 ret
= fs_sink_stream_open_packet(stream
, cs
, ir_packet
);
516 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
525 bt_component_class_sink_consume_method_status
handle_packet_end_msg(
526 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
529 bt_component_class_sink_consume_method_status status
=
530 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
531 const bt_packet
*ir_packet
=
532 bt_message_packet_end_borrow_packet_const(msg
);
533 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
534 struct fs_sink_stream
*stream
;
535 const bt_clock_snapshot
*cs
= NULL
;
537 stream
= borrow_stream(fs_sink
, ir_stream
);
538 if (G_UNLIKELY(!stream
)) {
539 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
543 if (stream
->sc
->packets_have_ts_end
) {
544 cs
= bt_message_packet_end_borrow_default_clock_snapshot_const(
550 * If we previously received a discarded events message with
551 * a time range, make sure that its end time matches what's
552 * expected for CTF 1.8, that is:
554 * * Its end time is the current packet's end time.
556 * Validation of the discarded events message's beginning time
557 * is performed in handle_packet_beginning_msg().
559 if (stream
->discarded_events_state
.in_range
) {
560 uint64_t expected_cs
;
563 * `stream->discarded_events_state.in_range` is only set
564 * when the stream class's discarded events have a time
567 * It is required that the packet beginning and end
568 * messages for this stream class have times when
569 * discarded events have a time range.
571 BT_ASSERT(stream
->sc
->discarded_events_has_ts
);
572 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
573 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
575 expected_cs
= bt_clock_snapshot_get_value(cs
);
577 if (stream
->discarded_events_state
.end_cs
!= expected_cs
) {
578 BT_COMP_LOGE("Incompatible discarded events message: "
579 "unexpected end time: "
580 "end-cs-val=%" PRIu64
", "
581 "expected-end-cs-val=%" PRIu64
", "
582 "stream-id=%" PRIu64
", stream-name=\"%s\", "
583 "trace-name=\"%s\", path=\"%s/%s\"",
584 stream
->discarded_events_state
.end_cs
,
586 bt_stream_get_id(ir_stream
),
587 bt_stream_get_name(ir_stream
),
589 bt_stream_borrow_trace_const(ir_stream
)),
590 stream
->trace
->path
->str
, stream
->file_name
->str
);
591 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
596 ret
= fs_sink_stream_close_packet(stream
, cs
);
598 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
603 * We're not in a discarded events time range anymore since we
604 * require that the discarded events time ranges go from one
605 * packet's end time to the next packet's end time, and we're
606 * handling a packet end message here.
608 stream
->discarded_events_state
.in_range
= false;
615 bt_component_class_sink_consume_method_status
handle_stream_beginning_msg(
616 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
618 bt_component_class_sink_consume_method_status status
=
619 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
620 const bt_stream
*ir_stream
=
621 bt_message_stream_beginning_borrow_stream_const(msg
);
622 const bt_stream_class
*ir_sc
=
623 bt_stream_borrow_class_const(ir_stream
);
624 struct fs_sink_stream
*stream
;
625 bool packets_have_beginning_end_cs
=
626 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc
) &&
627 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc
);
630 * Not supported: discarded events or discarded packets support
631 * without packets support. Packets are the way to know where
632 * discarded events/packets occurred in CTF 1.8.
634 if (!bt_stream_class_supports_packets(ir_sc
)) {
635 BT_ASSERT(!bt_stream_class_supports_discarded_packets(ir_sc
));
637 if (!fs_sink
->ignore_discarded_events
&&
638 bt_stream_class_supports_discarded_events(ir_sc
)) {
639 BT_COMP_LOGE("Unsupported stream: "
640 "stream does not support packets, "
641 "but supports discarded events: "
643 "stream-id=%" PRIu64
", "
644 "stream-name=\"%s\"",
645 ir_stream
, bt_stream_get_id(ir_stream
),
646 bt_stream_get_name(ir_stream
));
647 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
653 * Not supported: discarded events with default clock snapshots,
654 * but packet beginning/end without default clock snapshot.
656 if (!fs_sink
->ignore_discarded_events
&&
657 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc
) &&
658 !packets_have_beginning_end_cs
) {
659 BT_COMP_LOGE("Unsupported stream: discarded events have "
660 "default clock snapshots, but packets have no "
661 "beginning and/or end default clock snapshots: "
663 "stream-id=%" PRIu64
", "
664 "stream-name=\"%s\"",
665 ir_stream
, bt_stream_get_id(ir_stream
),
666 bt_stream_get_name(ir_stream
));
667 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
672 * Not supported: discarded packets with default clock
673 * snapshots, but packet beginning/end without default clock
676 if (!fs_sink
->ignore_discarded_packets
&&
677 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc
) &&
678 !packets_have_beginning_end_cs
) {
679 BT_COMP_LOGE("Unsupported stream: discarded packets have "
680 "default clock snapshots, but packets have no "
681 "beginning and/or end default clock snapshots: "
683 "stream-id=%" PRIu64
", "
684 "stream-name=\"%s\"",
685 ir_stream
, bt_stream_get_id(ir_stream
),
686 bt_stream_get_name(ir_stream
));
687 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
691 stream
= borrow_stream(fs_sink
, ir_stream
);
693 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
697 BT_COMP_LOGI("Created new, empty stream file: "
698 "stream-id=%" PRIu64
", stream-name=\"%s\", "
699 "trace-name=\"%s\", path=\"%s/%s\"",
700 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
701 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
702 stream
->trace
->path
->str
, stream
->file_name
->str
);
709 bt_component_class_sink_consume_method_status
handle_stream_end_msg(
710 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
712 bt_component_class_sink_consume_method_status status
=
713 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
714 const bt_stream
*ir_stream
=
715 bt_message_stream_end_borrow_stream_const(msg
);
716 struct fs_sink_stream
*stream
;
718 stream
= borrow_stream(fs_sink
, ir_stream
);
720 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
724 if (G_UNLIKELY(!stream
->sc
->has_packets
&&
725 stream
->packet_state
.is_open
)) {
726 /* Close stream's current artificial packet */
727 int ret
= fs_sink_stream_close_packet(stream
, NULL
);
730 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
735 BT_COMP_LOGI("Closing stream file: "
736 "stream-id=%" PRIu64
", stream-name=\"%s\", "
737 "trace-name=\"%s\", path=\"%s/%s\"",
738 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
739 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
740 stream
->trace
->path
->str
, stream
->file_name
->str
);
743 * This destroys the stream object and frees all its resources,
744 * closing the stream file.
746 g_hash_table_remove(stream
->trace
->streams
, ir_stream
);
753 bt_component_class_sink_consume_method_status
handle_discarded_events_msg(
754 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
756 bt_component_class_sink_consume_method_status status
=
757 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
758 const bt_stream
*ir_stream
=
759 bt_message_discarded_events_borrow_stream_const(msg
);
760 struct fs_sink_stream
*stream
;
761 const bt_clock_snapshot
*cs
= NULL
;
762 bt_property_availability avail
;
765 stream
= borrow_stream(fs_sink
, ir_stream
);
767 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
771 if (fs_sink
->ignore_discarded_events
) {
772 BT_COMP_LOGI("Ignoring discarded events message: "
773 "stream-id=%" PRIu64
", stream-name=\"%s\", "
774 "trace-name=\"%s\", path=\"%s/%s\"",
775 bt_stream_get_id(ir_stream
),
776 bt_stream_get_name(ir_stream
),
778 bt_stream_borrow_trace_const(ir_stream
)),
779 stream
->trace
->path
->str
, stream
->file_name
->str
);
783 if (stream
->discarded_events_state
.in_range
) {
784 BT_COMP_LOGE("Unsupported contiguous discarded events message: "
785 "stream-id=%" PRIu64
", stream-name=\"%s\", "
786 "trace-name=\"%s\", path=\"%s/%s\"",
787 bt_stream_get_id(ir_stream
),
788 bt_stream_get_name(ir_stream
),
790 bt_stream_borrow_trace_const(ir_stream
)),
791 stream
->trace
->path
->str
, stream
->file_name
->str
);
792 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
797 * If we're currently in an opened packet (got a packet
798 * beginning message, but no packet end message yet), we do not
799 * support having a discarded events message with a time range
800 * because we require that the discarded events message's time
801 * range go from a packet's end time to the next packet's end
804 if (stream
->packet_state
.is_open
&&
805 stream
->sc
->discarded_events_has_ts
) {
806 BT_COMP_LOGE("Unsupported discarded events message with "
807 "default clock snapshots occurring within a packet: "
808 "stream-id=%" PRIu64
", stream-name=\"%s\", "
809 "trace-name=\"%s\", path=\"%s/%s\"",
810 bt_stream_get_id(ir_stream
),
811 bt_stream_get_name(ir_stream
),
813 bt_stream_borrow_trace_const(ir_stream
)),
814 stream
->trace
->path
->str
, stream
->file_name
->str
);
815 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
819 if (stream
->sc
->discarded_events_has_ts
) {
821 * Make the stream's state be in the time range of a
822 * discarded events message since we have the message's
823 * time range (`stream->sc->discarded_events_has_ts`).
825 stream
->discarded_events_state
.in_range
= true;
828 * The clock snapshot values will be validated when
829 * handling the next packet beginning and end messages
830 * (next calls to handle_packet_beginning_msg() and
831 * handle_packet_end_msg()).
833 cs
= bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
836 stream
->discarded_events_state
.beginning_cs
=
837 bt_clock_snapshot_get_value(cs
);
838 cs
= bt_message_discarded_events_borrow_end_default_clock_snapshot_const(
841 stream
->discarded_events_state
.end_cs
= bt_clock_snapshot_get_value(cs
);
844 avail
= bt_message_discarded_events_get_count(msg
, &count
);
845 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
847 * There's no specific count of discarded events: set it
848 * to 1 so that we know that we at least discarded
854 stream
->packet_state
.discarded_events_counter
+= count
;
861 bt_component_class_sink_consume_method_status
handle_discarded_packets_msg(
862 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
864 bt_component_class_sink_consume_method_status status
=
865 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
866 const bt_stream
*ir_stream
=
867 bt_message_discarded_packets_borrow_stream_const(msg
);
868 struct fs_sink_stream
*stream
;
869 const bt_clock_snapshot
*cs
= NULL
;
870 bt_property_availability avail
;
873 stream
= borrow_stream(fs_sink
, ir_stream
);
875 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
879 if (fs_sink
->ignore_discarded_packets
) {
880 BT_COMP_LOGI("Ignoring discarded packets message: "
881 "stream-id=%" PRIu64
", stream-name=\"%s\", "
882 "trace-name=\"%s\", path=\"%s/%s\"",
883 bt_stream_get_id(ir_stream
),
884 bt_stream_get_name(ir_stream
),
886 bt_stream_borrow_trace_const(ir_stream
)),
887 stream
->trace
->path
->str
, stream
->file_name
->str
);
891 if (stream
->discarded_packets_state
.in_range
) {
892 BT_COMP_LOGE("Unsupported contiguous discarded packets message: "
893 "stream-id=%" PRIu64
", stream-name=\"%s\", "
894 "trace-name=\"%s\", path=\"%s/%s\"",
895 bt_stream_get_id(ir_stream
),
896 bt_stream_get_name(ir_stream
),
898 bt_stream_borrow_trace_const(ir_stream
)),
899 stream
->trace
->path
->str
, stream
->file_name
->str
);
900 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
905 * Discarded packets messages are guaranteed to occur between
908 BT_ASSERT(!stream
->packet_state
.is_open
);
910 if (stream
->sc
->discarded_packets_has_ts
) {
912 * Make the stream's state be in the time range of a
913 * discarded packets message since we have the message's
914 * time range (`stream->sc->discarded_packets_has_ts`).
916 stream
->discarded_packets_state
.in_range
= true;
919 * The clock snapshot values will be validated when
920 * handling the next packet beginning message (next call
921 * to handle_packet_beginning_msg()).
923 cs
= bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
926 stream
->discarded_packets_state
.beginning_cs
=
927 bt_clock_snapshot_get_value(cs
);
928 cs
= bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(
931 stream
->discarded_packets_state
.end_cs
=
932 bt_clock_snapshot_get_value(cs
);
935 avail
= bt_message_discarded_packets_get_count(msg
, &count
);
936 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
938 * There's no specific count of discarded packets: set
939 * it to 1 so that we know that we at least discarded
945 stream
->packet_state
.seq_num
+= count
;
952 void put_messages(bt_message_array_const msgs
, uint64_t count
)
956 for (i
= 0; i
< count
; i
++) {
957 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
962 bt_component_class_sink_consume_method_status
ctf_fs_sink_consume(
963 bt_self_component_sink
*self_comp
)
965 bt_component_class_sink_consume_method_status status
=
966 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
967 struct fs_sink_comp
*fs_sink
;
968 bt_message_iterator_next_status next_status
;
969 uint64_t msg_count
= 0;
970 bt_message_array_const msgs
;
972 fs_sink
= bt_self_component_get_data(
973 bt_self_component_sink_as_self_component(self_comp
));
974 BT_ASSERT_DBG(fs_sink
);
975 BT_ASSERT_DBG(fs_sink
->upstream_iter
);
977 /* Consume messages */
978 next_status
= bt_message_iterator_next(
979 fs_sink
->upstream_iter
, &msgs
, &msg_count
);
980 if (next_status
< 0) {
981 status
= (int) next_status
;
985 switch (next_status
) {
986 case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK
:
990 for (i
= 0; i
< msg_count
; i
++) {
991 const bt_message
*msg
= msgs
[i
];
995 switch (bt_message_get_type(msg
)) {
996 case BT_MESSAGE_TYPE_EVENT
:
997 status
= handle_event_msg(fs_sink
, msg
);
999 case BT_MESSAGE_TYPE_PACKET_BEGINNING
:
1000 status
= handle_packet_beginning_msg(
1003 case BT_MESSAGE_TYPE_PACKET_END
:
1004 status
= handle_packet_end_msg(
1007 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY
:
1009 BT_COMP_LOGD_STR("Ignoring message iterator inactivity message.");
1011 case BT_MESSAGE_TYPE_STREAM_BEGINNING
:
1012 status
= handle_stream_beginning_msg(
1015 case BT_MESSAGE_TYPE_STREAM_END
:
1016 status
= handle_stream_end_msg(
1019 case BT_MESSAGE_TYPE_DISCARDED_EVENTS
:
1020 status
= handle_discarded_events_msg(
1023 case BT_MESSAGE_TYPE_DISCARDED_PACKETS
:
1024 status
= handle_discarded_packets_msg(
1031 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
1033 if (status
!= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
) {
1034 BT_COMP_LOGE("Failed to handle message: "
1035 "generated CTF traces could be incomplete: "
1036 "output-dir-path=\"%s\"",
1037 fs_sink
->output_dir_path
->str
);
1044 case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN
:
1045 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN
;
1047 case BT_MESSAGE_ITERATOR_NEXT_STATUS_END
:
1048 /* TODO: Finalize all traces (should already be done?) */
1049 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END
;
1058 BT_ASSERT(status
!= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
);
1059 put_messages(msgs
, msg_count
);
1066 bt_component_class_sink_graph_is_configured_method_status
1067 ctf_fs_sink_graph_is_configured(
1068 bt_self_component_sink
*self_comp
)
1070 bt_component_class_sink_graph_is_configured_method_status status
;
1071 bt_message_iterator_create_from_sink_component_status
1073 struct fs_sink_comp
*fs_sink
= bt_self_component_get_data(
1074 bt_self_component_sink_as_self_component(self_comp
));
1077 bt_message_iterator_create_from_sink_component(
1079 bt_self_component_sink_borrow_input_port_by_name(
1080 self_comp
, in_port_name
), &fs_sink
->upstream_iter
);
1081 if (msg_iter_status
!= BT_MESSAGE_ITERATOR_CREATE_FROM_SINK_COMPONENT_STATUS_OK
) {
1082 status
= (int) msg_iter_status
;
1086 status
= BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK
;
1092 void ctf_fs_sink_finalize(bt_self_component_sink
*self_comp
)
1094 struct fs_sink_comp
*fs_sink
= bt_self_component_get_data(
1095 bt_self_component_sink_as_self_component(self_comp
));
1097 destroy_fs_sink_comp(fs_sink
);