2 * SPDX-License-Identifier: MIT
4 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
7 #define BT_COMP_LOG_SELF_COMP (fs_sink->self_comp)
8 #define BT_LOG_OUTPUT_LEVEL (fs_sink->log_level)
9 #define BT_LOG_TAG "PLUGIN/SINK.CTF.FS"
10 #include "logging/comp-logging.h"
12 #include <babeltrace2/babeltrace.h>
16 #include "common/assert.h"
17 #include "ctfser/ctfser.h"
18 #include "plugins/common/param-validation/param-validation.h"
21 #include "fs-sink-trace.h"
22 #include "fs-sink-stream.h"
23 #include "fs-sink-ctf-meta.h"
24 #include "translate-trace-ir-to-ctf-ir.h"
25 #include "translate-ctf-ir-to-tsdl.h"
28 const char * const in_port_name
= "in";
31 bt_component_class_initialize_method_status
ensure_output_dir_exists(
32 struct fs_sink_comp
*fs_sink
)
34 bt_component_class_initialize_method_status status
=
35 BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
;
38 ret
= g_mkdir_with_parents(fs_sink
->output_dir_path
->str
, 0755);
40 BT_COMP_LOGE_APPEND_CAUSE_ERRNO(fs_sink
->self_comp
,
41 "Cannot create directories for output directory",
42 ": output-dir-path=\"%s\"",
43 fs_sink
->output_dir_path
->str
);
44 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
52 static struct bt_param_validation_map_value_entry_descr fs_sink_params_descr
[] = {
53 { "path", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY
, { .type
= BT_VALUE_TYPE_STRING
} },
54 { "assume-single-trace", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
, { .type
= BT_VALUE_TYPE_BOOL
} },
55 { "ignore-discarded-events", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
, { .type
= BT_VALUE_TYPE_BOOL
} },
56 { "ignore-discarded-packets", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
, { .type
= BT_VALUE_TYPE_BOOL
} },
57 { "quiet", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
, { .type
= BT_VALUE_TYPE_BOOL
} },
58 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END
62 bt_component_class_initialize_method_status
63 configure_component(struct fs_sink_comp
*fs_sink
, const bt_value
*params
)
65 bt_component_class_initialize_method_status status
;
66 const bt_value
*value
;
67 enum bt_param_validation_status validation_status
;
68 gchar
*validation_error
;
70 validation_status
= bt_param_validation_validate(params
,
71 fs_sink_params_descr
, &validation_error
);
72 if (validation_status
== BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR
) {
73 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
74 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
75 "%s", validation_error
);
77 } else if (validation_status
== BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR
) {
78 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
82 value
= bt_value_map_borrow_entry_value_const(params
, "path");
83 g_string_assign(fs_sink
->output_dir_path
,
84 bt_value_string_get(value
));
86 value
= bt_value_map_borrow_entry_value_const(params
,
87 "assume-single-trace");
89 fs_sink
->assume_single_trace
= (bool) bt_value_bool_get(value
);
92 value
= bt_value_map_borrow_entry_value_const(params
,
93 "ignore-discarded-events");
95 fs_sink
->ignore_discarded_events
=
96 (bool) bt_value_bool_get(value
);
99 value
= bt_value_map_borrow_entry_value_const(params
,
100 "ignore-discarded-packets");
102 fs_sink
->ignore_discarded_packets
=
103 (bool) bt_value_bool_get(value
);
106 value
= bt_value_map_borrow_entry_value_const(params
,
109 fs_sink
->quiet
= (bool) bt_value_bool_get(value
);
112 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
;
115 g_free(validation_error
);
120 void destroy_fs_sink_comp(struct fs_sink_comp
*fs_sink
)
126 if (fs_sink
->output_dir_path
) {
127 g_string_free(fs_sink
->output_dir_path
, TRUE
);
128 fs_sink
->output_dir_path
= NULL
;
131 if (fs_sink
->traces
) {
132 g_hash_table_destroy(fs_sink
->traces
);
133 fs_sink
->traces
= NULL
;
136 BT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(
137 fs_sink
->upstream_iter
);
145 bt_component_class_initialize_method_status
ctf_fs_sink_init(
146 bt_self_component_sink
*self_comp_sink
,
147 bt_self_component_sink_configuration
*config
,
148 const bt_value
*params
,
149 void *init_method_data
)
151 bt_component_class_initialize_method_status status
;
152 bt_self_component_add_port_status add_port_status
;
153 struct fs_sink_comp
*fs_sink
= NULL
;
154 bt_self_component
*self_comp
=
155 bt_self_component_sink_as_self_component(self_comp_sink
);
156 bt_logging_level log_level
= bt_component_get_logging_level(
157 bt_self_component_as_component(self_comp
));
159 fs_sink
= g_new0(struct fs_sink_comp
, 1);
161 BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR
, log_level
, self_comp
,
162 "Failed to allocate one CTF FS sink structure.");
163 BT_CURRENT_THREAD_ERROR_APPEND_CAUSE_FROM_COMPONENT(
164 self_comp
, "Failed to allocate one CTF FS sink structure.");
165 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
169 fs_sink
->log_level
= log_level
;
170 fs_sink
->self_comp
= self_comp
;
171 fs_sink
->output_dir_path
= g_string_new(NULL
);
172 status
= configure_component(fs_sink
, params
);
173 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
174 /* configure_component() logs errors */
178 if (fs_sink
->assume_single_trace
&&
179 g_file_test(fs_sink
->output_dir_path
->str
,
180 G_FILE_TEST_EXISTS
)) {
181 BT_COMP_LOGE_APPEND_CAUSE(self_comp
,
182 "Single trace mode, but output path exists: output-path=\"%s\"",
183 fs_sink
->output_dir_path
->str
);
184 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
188 status
= ensure_output_dir_exists(fs_sink
);
189 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
190 /* ensure_output_dir_exists() logs errors */
194 fs_sink
->traces
= g_hash_table_new_full(g_direct_hash
, g_direct_equal
,
195 NULL
, (GDestroyNotify
) fs_sink_trace_destroy
);
196 if (!fs_sink
->traces
) {
197 BT_COMP_LOGE_APPEND_CAUSE(self_comp
, "Failed to allocate one GHashTable.");
198 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
202 add_port_status
= bt_self_component_sink_add_input_port(
203 self_comp_sink
, in_port_name
, NULL
, NULL
);
204 if (add_port_status
!= BT_SELF_COMPONENT_ADD_PORT_STATUS_OK
) {
205 status
= (int) add_port_status
;
206 BT_COMP_LOGE_APPEND_CAUSE(self_comp
, "Failed to add input port.");
210 bt_self_component_set_data(self_comp
, fs_sink
);
213 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
214 destroy_fs_sink_comp(fs_sink
);
221 struct fs_sink_stream
*borrow_stream(struct fs_sink_comp
*fs_sink
,
222 const bt_stream
*ir_stream
)
224 const bt_trace
*ir_trace
= bt_stream_borrow_trace_const(ir_stream
);
225 struct fs_sink_trace
*trace
;
226 struct fs_sink_stream
*stream
= NULL
;
228 trace
= g_hash_table_lookup(fs_sink
->traces
, ir_trace
);
229 if (G_UNLIKELY(!trace
)) {
230 if (fs_sink
->assume_single_trace
&&
231 g_hash_table_size(fs_sink
->traces
) > 0) {
232 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
233 "Single trace mode, but getting more than one trace: "
234 "stream-name=\"%s\"",
235 bt_stream_get_name(ir_stream
));
239 trace
= fs_sink_trace_create(fs_sink
, ir_trace
);
245 stream
= g_hash_table_lookup(trace
->streams
, ir_stream
);
246 if (G_UNLIKELY(!stream
)) {
247 stream
= fs_sink_stream_create(trace
, ir_stream
);
258 bt_component_class_sink_consume_method_status
handle_event_msg(
259 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
262 bt_component_class_sink_consume_method_status status
=
263 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
264 const bt_event
*ir_event
= bt_message_event_borrow_event_const(msg
);
265 const bt_stream
*ir_stream
= bt_event_borrow_stream_const(ir_event
);
266 struct fs_sink_stream
*stream
;
267 struct fs_sink_ctf_event_class
*ec
= NULL
;
268 const bt_clock_snapshot
*cs
= NULL
;
270 stream
= borrow_stream(fs_sink
, ir_stream
);
271 if (G_UNLIKELY(!stream
)) {
272 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
273 "Failed to borrow stream.");
274 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
278 ret
= try_translate_event_class_trace_ir_to_ctf_ir(fs_sink
,
279 stream
->sc
, bt_event_borrow_class_const(ir_event
), &ec
);
281 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
282 "Failed to translate event class to CTF IR.");
283 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
289 if (stream
->sc
->default_clock_class
) {
290 cs
= bt_message_event_borrow_default_clock_snapshot_const(
295 * If this event's stream does not support packets, then we
296 * lazily create artificial packets.
298 * The size of an artificial packet is arbitrarily at least
299 * 4 MiB (it usually is greater because we close it when
300 * comes the time to write a new event and the packet's content
301 * size is >= 4 MiB), except the last one which can be smaller.
303 if (G_UNLIKELY(!stream
->sc
->has_packets
)) {
304 if (stream
->packet_state
.is_open
&&
305 bt_ctfser_get_offset_in_current_packet_bits(&stream
->ctfser
) / 8 >=
308 * Stream's current packet is larger than 4 MiB:
309 * close it. A new packet will be opened just
312 ret
= fs_sink_stream_close_packet(stream
, NULL
);
314 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
315 "Failed to close packet.");
316 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
321 if (!stream
->packet_state
.is_open
) {
322 /* Stream's packet is not currently opened: open it */
323 ret
= fs_sink_stream_open_packet(stream
, NULL
, NULL
);
325 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
326 "Failed to open packet.");
327 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
333 BT_ASSERT_DBG(stream
->packet_state
.is_open
);
334 ret
= fs_sink_stream_write_event(stream
, cs
, ir_event
, ec
);
335 if (G_UNLIKELY(ret
)) {
336 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
337 "Failed to write event.");
338 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
347 bt_component_class_sink_consume_method_status
handle_packet_beginning_msg(
348 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
351 bt_component_class_sink_consume_method_status status
=
352 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
353 const bt_packet
*ir_packet
=
354 bt_message_packet_beginning_borrow_packet_const(msg
);
355 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
356 struct fs_sink_stream
*stream
;
357 const bt_clock_snapshot
*cs
= NULL
;
359 stream
= borrow_stream(fs_sink
, ir_stream
);
360 if (G_UNLIKELY(!stream
)) {
361 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
362 "Failed to borrow stream.");
363 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
367 if (stream
->sc
->packets_have_ts_begin
) {
368 cs
= bt_message_packet_beginning_borrow_default_clock_snapshot_const(
374 * If we previously received a discarded events message with
375 * a time range, make sure that its beginning time matches what's
376 * expected for CTF 1.8, that is:
378 * * Its beginning time is the previous packet's end
379 * time (or the current packet's beginning time if
380 * this is the first packet).
382 * We check this here instead of in handle_packet_end_msg()
383 * because we want to catch any incompatible message as early as
384 * possible to report the error.
386 * Validation of the discarded events message's end time is
387 * performed in handle_packet_end_msg().
389 if (stream
->discarded_events_state
.in_range
) {
390 uint64_t expected_cs
;
393 * `stream->discarded_events_state.in_range` is only set
394 * when the stream class's discarded events have a time
397 * It is required that the packet beginning and end
398 * messages for this stream class have times when
399 * discarded events have a time range.
401 BT_ASSERT(stream
->sc
->discarded_events_has_ts
);
402 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
403 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
405 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
406 /* We're opening the first packet */
407 expected_cs
= bt_clock_snapshot_get_value(cs
);
409 expected_cs
= stream
->prev_packet_state
.end_cs
;
412 if (stream
->discarded_events_state
.beginning_cs
!=
414 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
415 "Incompatible discarded events message: "
416 "unexpected beginning time: "
417 "beginning-cs-val=%" PRIu64
", "
418 "expected-beginning-cs-val=%" PRIu64
", "
419 "stream-id=%" PRIu64
", stream-name=\"%s\", "
420 "trace-name=\"%s\", path=\"%s/%s\"",
421 stream
->discarded_events_state
.beginning_cs
,
423 bt_stream_get_id(ir_stream
),
424 bt_stream_get_name(ir_stream
),
426 bt_stream_borrow_trace_const(ir_stream
)),
427 stream
->trace
->path
->str
, stream
->file_name
->str
);
428 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
434 * If we previously received a discarded packets message with a
435 * time range, make sure that its beginning and end times match
436 * what's expected for CTF 1.8, that is:
438 * * Its beginning time is the previous packet's end time.
440 * * Its end time is the current packet's beginning time.
442 if (stream
->discarded_packets_state
.in_range
) {
443 uint64_t expected_end_cs
;
446 * `stream->discarded_packets_state.in_range` is only
447 * set when the stream class's discarded packets have a
450 * It is required that the packet beginning and end
451 * messages for this stream class have times when
452 * discarded packets have a time range.
454 BT_ASSERT(stream
->sc
->discarded_packets_has_ts
);
455 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
456 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
459 * It is not supported to have a discarded packets
460 * message _before_ the first packet: we cannot validate
461 * that its beginning time is compatible with CTF 1.8 in
464 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
465 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
466 "Incompatible discarded packets message "
467 "occurring before the stream's first packet: "
468 "stream-id=%" PRIu64
", stream-name=\"%s\", "
469 "trace-name=\"%s\", path=\"%s/%s\"",
470 bt_stream_get_id(ir_stream
),
471 bt_stream_get_name(ir_stream
),
473 bt_stream_borrow_trace_const(ir_stream
)),
474 stream
->trace
->path
->str
, stream
->file_name
->str
);
475 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
479 if (stream
->discarded_packets_state
.beginning_cs
!=
480 stream
->prev_packet_state
.end_cs
) {
481 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
482 "Incompatible discarded packets message: "
483 "unexpected beginning time: "
484 "beginning-cs-val=%" PRIu64
", "
485 "expected-beginning-cs-val=%" PRIu64
", "
486 "stream-id=%" PRIu64
", stream-name=\"%s\", "
487 "trace-name=\"%s\", path=\"%s/%s\"",
488 stream
->discarded_packets_state
.beginning_cs
,
489 stream
->prev_packet_state
.end_cs
,
490 bt_stream_get_id(ir_stream
),
491 bt_stream_get_name(ir_stream
),
493 bt_stream_borrow_trace_const(ir_stream
)),
494 stream
->trace
->path
->str
, stream
->file_name
->str
);
495 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
499 expected_end_cs
= bt_clock_snapshot_get_value(cs
);
501 if (stream
->discarded_packets_state
.end_cs
!=
503 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
504 "Incompatible discarded packets message: "
505 "unexpected end time: "
506 "end-cs-val=%" PRIu64
", "
507 "expected-end-cs-val=%" PRIu64
", "
508 "stream-id=%" PRIu64
", stream-name=\"%s\", "
509 "trace-name=\"%s\", path=\"%s/%s\"",
510 stream
->discarded_packets_state
.end_cs
,
512 bt_stream_get_id(ir_stream
),
513 bt_stream_get_name(ir_stream
),
515 bt_stream_borrow_trace_const(ir_stream
)),
516 stream
->trace
->path
->str
, stream
->file_name
->str
);
517 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
523 * We're not in a discarded packets time range anymore since we
524 * require that the discarded packets time ranges go from one
525 * packet's end time to the next packet's beginning time, and
526 * we're handling a packet beginning message here.
528 stream
->discarded_packets_state
.in_range
= false;
530 ret
= fs_sink_stream_open_packet(stream
, cs
, ir_packet
);
532 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
533 "Failed to open packet.");
534 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
543 bt_component_class_sink_consume_method_status
handle_packet_end_msg(
544 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
547 bt_component_class_sink_consume_method_status status
=
548 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
549 const bt_packet
*ir_packet
=
550 bt_message_packet_end_borrow_packet_const(msg
);
551 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
552 struct fs_sink_stream
*stream
;
553 const bt_clock_snapshot
*cs
= NULL
;
555 stream
= borrow_stream(fs_sink
, ir_stream
);
556 if (G_UNLIKELY(!stream
)) {
557 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
558 "Failed to borrow stream.");
559 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
563 if (stream
->sc
->packets_have_ts_end
) {
564 cs
= bt_message_packet_end_borrow_default_clock_snapshot_const(
570 * If we previously received a discarded events message with
571 * a time range, make sure that its end time matches what's
572 * expected for CTF 1.8, that is:
574 * * Its end time is the current packet's end time.
576 * Validation of the discarded events message's beginning time
577 * is performed in handle_packet_beginning_msg().
579 if (stream
->discarded_events_state
.in_range
) {
580 uint64_t expected_cs
;
583 * `stream->discarded_events_state.in_range` is only set
584 * when the stream class's discarded events have a time
587 * It is required that the packet beginning and end
588 * messages for this stream class have times when
589 * discarded events have a time range.
591 BT_ASSERT(stream
->sc
->discarded_events_has_ts
);
592 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
593 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
595 expected_cs
= bt_clock_snapshot_get_value(cs
);
597 if (stream
->discarded_events_state
.end_cs
!= expected_cs
) {
598 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
599 "Incompatible discarded events message: "
600 "unexpected end time: "
601 "end-cs-val=%" PRIu64
", "
602 "expected-end-cs-val=%" PRIu64
", "
603 "stream-id=%" PRIu64
", stream-name=\"%s\", "
604 "trace-name=\"%s\", path=\"%s/%s\"",
605 stream
->discarded_events_state
.end_cs
,
607 bt_stream_get_id(ir_stream
),
608 bt_stream_get_name(ir_stream
),
610 bt_stream_borrow_trace_const(ir_stream
)),
611 stream
->trace
->path
->str
, stream
->file_name
->str
);
612 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
617 ret
= fs_sink_stream_close_packet(stream
, cs
);
619 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
620 "Failed to close packet.");
621 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
626 * We're not in a discarded events time range anymore since we
627 * require that the discarded events time ranges go from one
628 * packet's end time to the next packet's end time, and we're
629 * handling a packet end message here.
631 stream
->discarded_events_state
.in_range
= false;
638 bt_component_class_sink_consume_method_status
handle_stream_beginning_msg(
639 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
641 bt_component_class_sink_consume_method_status status
=
642 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
643 const bt_stream
*ir_stream
=
644 bt_message_stream_beginning_borrow_stream_const(msg
);
645 const bt_stream_class
*ir_sc
=
646 bt_stream_borrow_class_const(ir_stream
);
647 struct fs_sink_stream
*stream
;
648 bool packets_have_beginning_end_cs
=
649 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc
) &&
650 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc
);
653 * Not supported: discarded events or discarded packets support
654 * without packets support. Packets are the way to know where
655 * discarded events/packets occurred in CTF 1.8.
657 if (!bt_stream_class_supports_packets(ir_sc
)) {
658 BT_ASSERT(!bt_stream_class_supports_discarded_packets(ir_sc
));
660 if (!fs_sink
->ignore_discarded_events
&&
661 bt_stream_class_supports_discarded_events(ir_sc
)) {
662 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
663 "Unsupported stream: "
664 "stream does not support packets, "
665 "but supports discarded events: "
667 "stream-id=%" PRIu64
", "
668 "stream-name=\"%s\"",
669 ir_stream
, bt_stream_get_id(ir_stream
),
670 bt_stream_get_name(ir_stream
));
671 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
677 * Not supported: discarded events with default clock snapshots,
678 * but packet beginning/end without default clock snapshot.
680 if (!fs_sink
->ignore_discarded_events
&&
681 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc
) &&
682 !packets_have_beginning_end_cs
) {
683 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
684 "Unsupported stream: discarded events have "
685 "default clock snapshots, but packets have no "
686 "beginning and/or end default clock snapshots: "
688 "stream-id=%" PRIu64
", "
689 "stream-name=\"%s\"",
690 ir_stream
, bt_stream_get_id(ir_stream
),
691 bt_stream_get_name(ir_stream
));
692 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
697 * Not supported: discarded packets with default clock
698 * snapshots, but packet beginning/end without default clock
701 if (!fs_sink
->ignore_discarded_packets
&&
702 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc
) &&
703 !packets_have_beginning_end_cs
) {
704 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
705 "Unsupported stream: discarded packets have "
706 "default clock snapshots, but packets have no "
707 "beginning and/or end default clock snapshots: "
709 "stream-id=%" PRIu64
", "
710 "stream-name=\"%s\"",
711 ir_stream
, bt_stream_get_id(ir_stream
),
712 bt_stream_get_name(ir_stream
));
713 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
717 stream
= borrow_stream(fs_sink
, ir_stream
);
719 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
720 "Failed to borrow stream.");
721 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
725 BT_COMP_LOGI("Created new, empty stream file: "
726 "stream-id=%" PRIu64
", stream-name=\"%s\", "
727 "trace-name=\"%s\", path=\"%s/%s\"",
728 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
729 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
730 stream
->trace
->path
->str
, stream
->file_name
->str
);
737 bt_component_class_sink_consume_method_status
handle_stream_end_msg(
738 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
740 bt_component_class_sink_consume_method_status status
=
741 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
742 const bt_stream
*ir_stream
=
743 bt_message_stream_end_borrow_stream_const(msg
);
744 struct fs_sink_stream
*stream
;
746 stream
= borrow_stream(fs_sink
, ir_stream
);
748 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
749 "Failed to borrow stream.");
750 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
754 if (G_UNLIKELY(!stream
->sc
->has_packets
&&
755 stream
->packet_state
.is_open
)) {
756 /* Close stream's current artificial packet */
757 int ret
= fs_sink_stream_close_packet(stream
, NULL
);
760 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
761 "Failed to close packet.");
762 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
767 BT_COMP_LOGI("Closing stream file: "
768 "stream-id=%" PRIu64
", stream-name=\"%s\", "
769 "trace-name=\"%s\", path=\"%s/%s\"",
770 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
771 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
772 stream
->trace
->path
->str
, stream
->file_name
->str
);
775 * This destroys the stream object and frees all its resources,
776 * closing the stream file.
778 g_hash_table_remove(stream
->trace
->streams
, ir_stream
);
785 bt_component_class_sink_consume_method_status
handle_discarded_events_msg(
786 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
788 bt_component_class_sink_consume_method_status status
=
789 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
790 const bt_stream
*ir_stream
=
791 bt_message_discarded_events_borrow_stream_const(msg
);
792 struct fs_sink_stream
*stream
;
793 const bt_clock_snapshot
*cs
= NULL
;
794 bt_property_availability avail
;
797 stream
= borrow_stream(fs_sink
, ir_stream
);
799 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
800 "Failed to borrow stream.");
801 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
805 if (fs_sink
->ignore_discarded_events
) {
806 BT_COMP_LOGI("Ignoring discarded events message: "
807 "stream-id=%" PRIu64
", stream-name=\"%s\", "
808 "trace-name=\"%s\", path=\"%s/%s\"",
809 bt_stream_get_id(ir_stream
),
810 bt_stream_get_name(ir_stream
),
812 bt_stream_borrow_trace_const(ir_stream
)),
813 stream
->trace
->path
->str
, stream
->file_name
->str
);
817 if (stream
->discarded_events_state
.in_range
) {
818 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
819 "Unsupported contiguous discarded events message: "
820 "stream-id=%" PRIu64
", stream-name=\"%s\", "
821 "trace-name=\"%s\", path=\"%s/%s\"",
822 bt_stream_get_id(ir_stream
),
823 bt_stream_get_name(ir_stream
),
825 bt_stream_borrow_trace_const(ir_stream
)),
826 stream
->trace
->path
->str
, stream
->file_name
->str
);
827 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
832 * If we're currently in an opened packet (got a packet
833 * beginning message, but no packet end message yet), we do not
834 * support having a discarded events message with a time range
835 * because we require that the discarded events message's time
836 * range go from a packet's end time to the next packet's end
839 if (stream
->packet_state
.is_open
&&
840 stream
->sc
->discarded_events_has_ts
) {
841 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
842 "Unsupported discarded events message with "
843 "default clock snapshots occurring within a packet: "
844 "stream-id=%" PRIu64
", stream-name=\"%s\", "
845 "trace-name=\"%s\", path=\"%s/%s\"",
846 bt_stream_get_id(ir_stream
),
847 bt_stream_get_name(ir_stream
),
849 bt_stream_borrow_trace_const(ir_stream
)),
850 stream
->trace
->path
->str
, stream
->file_name
->str
);
851 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
855 if (stream
->sc
->discarded_events_has_ts
) {
857 * Make the stream's state be in the time range of a
858 * discarded events message since we have the message's
859 * time range (`stream->sc->discarded_events_has_ts`).
861 stream
->discarded_events_state
.in_range
= true;
864 * The clock snapshot values will be validated when
865 * handling the next packet beginning and end messages
866 * (next calls to handle_packet_beginning_msg() and
867 * handle_packet_end_msg()).
869 cs
= bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
872 stream
->discarded_events_state
.beginning_cs
=
873 bt_clock_snapshot_get_value(cs
);
874 cs
= bt_message_discarded_events_borrow_end_default_clock_snapshot_const(
877 stream
->discarded_events_state
.end_cs
= bt_clock_snapshot_get_value(cs
);
880 avail
= bt_message_discarded_events_get_count(msg
, &count
);
881 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
883 * There's no specific count of discarded events: set it
884 * to 1 so that we know that we at least discarded
890 stream
->packet_state
.discarded_events_counter
+= count
;
897 bt_component_class_sink_consume_method_status
handle_discarded_packets_msg(
898 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
900 bt_component_class_sink_consume_method_status status
=
901 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
902 const bt_stream
*ir_stream
=
903 bt_message_discarded_packets_borrow_stream_const(msg
);
904 struct fs_sink_stream
*stream
;
905 const bt_clock_snapshot
*cs
= NULL
;
906 bt_property_availability avail
;
909 stream
= borrow_stream(fs_sink
, ir_stream
);
911 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
912 "Failed to borrow stream.");
913 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
917 if (fs_sink
->ignore_discarded_packets
) {
918 BT_COMP_LOGI("Ignoring discarded packets message: "
919 "stream-id=%" PRIu64
", stream-name=\"%s\", "
920 "trace-name=\"%s\", path=\"%s/%s\"",
921 bt_stream_get_id(ir_stream
),
922 bt_stream_get_name(ir_stream
),
924 bt_stream_borrow_trace_const(ir_stream
)),
925 stream
->trace
->path
->str
, stream
->file_name
->str
);
929 if (stream
->discarded_packets_state
.in_range
) {
930 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
931 "Unsupported contiguous discarded packets message: "
932 "stream-id=%" PRIu64
", stream-name=\"%s\", "
933 "trace-name=\"%s\", path=\"%s/%s\"",
934 bt_stream_get_id(ir_stream
),
935 bt_stream_get_name(ir_stream
),
937 bt_stream_borrow_trace_const(ir_stream
)),
938 stream
->trace
->path
->str
, stream
->file_name
->str
);
939 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
944 * Discarded packets messages are guaranteed to occur between
947 BT_ASSERT(!stream
->packet_state
.is_open
);
949 if (stream
->sc
->discarded_packets_has_ts
) {
951 * Make the stream's state be in the time range of a
952 * discarded packets message since we have the message's
953 * time range (`stream->sc->discarded_packets_has_ts`).
955 stream
->discarded_packets_state
.in_range
= true;
958 * The clock snapshot values will be validated when
959 * handling the next packet beginning message (next call
960 * to handle_packet_beginning_msg()).
962 cs
= bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
965 stream
->discarded_packets_state
.beginning_cs
=
966 bt_clock_snapshot_get_value(cs
);
967 cs
= bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(
970 stream
->discarded_packets_state
.end_cs
=
971 bt_clock_snapshot_get_value(cs
);
974 avail
= bt_message_discarded_packets_get_count(msg
, &count
);
975 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
977 * There's no specific count of discarded packets: set
978 * it to 1 so that we know that we at least discarded
984 stream
->packet_state
.seq_num
+= count
;
991 void put_messages(bt_message_array_const msgs
, uint64_t count
)
995 for (i
= 0; i
< count
; i
++) {
996 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
1001 bt_component_class_sink_consume_method_status
ctf_fs_sink_consume(
1002 bt_self_component_sink
*self_comp
)
1004 bt_component_class_sink_consume_method_status status
=
1005 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
1006 struct fs_sink_comp
*fs_sink
;
1007 bt_message_iterator_next_status next_status
;
1008 uint64_t msg_count
= 0;
1009 bt_message_array_const msgs
;
1011 fs_sink
= bt_self_component_get_data(
1012 bt_self_component_sink_as_self_component(self_comp
));
1013 BT_ASSERT_DBG(fs_sink
);
1014 BT_ASSERT_DBG(fs_sink
->upstream_iter
);
1016 /* Consume messages */
1017 next_status
= bt_message_iterator_next(
1018 fs_sink
->upstream_iter
, &msgs
, &msg_count
);
1019 if (next_status
< 0) {
1020 status
= (int) next_status
;
1021 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
1022 "Failed to get next message from upstream iterator.");
1026 switch (next_status
) {
1027 case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK
:
1031 for (i
= 0; i
< msg_count
; i
++) {
1032 const bt_message
*msg
= msgs
[i
];
1036 switch (bt_message_get_type(msg
)) {
1037 case BT_MESSAGE_TYPE_EVENT
:
1038 status
= handle_event_msg(fs_sink
, msg
);
1040 case BT_MESSAGE_TYPE_PACKET_BEGINNING
:
1041 status
= handle_packet_beginning_msg(
1044 case BT_MESSAGE_TYPE_PACKET_END
:
1045 status
= handle_packet_end_msg(
1048 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY
:
1050 BT_COMP_LOGD_STR("Ignoring message iterator inactivity message.");
1052 case BT_MESSAGE_TYPE_STREAM_BEGINNING
:
1053 status
= handle_stream_beginning_msg(
1056 case BT_MESSAGE_TYPE_STREAM_END
:
1057 status
= handle_stream_end_msg(
1060 case BT_MESSAGE_TYPE_DISCARDED_EVENTS
:
1061 status
= handle_discarded_events_msg(
1064 case BT_MESSAGE_TYPE_DISCARDED_PACKETS
:
1065 status
= handle_discarded_packets_msg(
1072 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
1074 if (status
!= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
) {
1075 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
1076 "Failed to handle message: "
1077 "generated CTF traces could be incomplete: "
1078 "output-dir-path=\"%s\"",
1079 fs_sink
->output_dir_path
->str
);
1086 case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN
:
1087 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN
;
1089 case BT_MESSAGE_ITERATOR_NEXT_STATUS_END
:
1090 /* TODO: Finalize all traces (should already be done?) */
1091 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END
;
1100 BT_ASSERT(status
!= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
);
1101 put_messages(msgs
, msg_count
);
1108 bt_component_class_sink_graph_is_configured_method_status
1109 ctf_fs_sink_graph_is_configured(
1110 bt_self_component_sink
*self_comp
)
1112 bt_component_class_sink_graph_is_configured_method_status status
;
1113 bt_message_iterator_create_from_sink_component_status
1115 struct fs_sink_comp
*fs_sink
= bt_self_component_get_data(
1116 bt_self_component_sink_as_self_component(self_comp
));
1119 bt_message_iterator_create_from_sink_component(
1121 bt_self_component_sink_borrow_input_port_by_name(
1122 self_comp
, in_port_name
), &fs_sink
->upstream_iter
);
1123 if (msg_iter_status
!= BT_MESSAGE_ITERATOR_CREATE_FROM_SINK_COMPONENT_STATUS_OK
) {
1124 status
= (int) msg_iter_status
;
1125 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
1126 "Failed to create upstream iterator.");
1130 status
= BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK
;
1136 void ctf_fs_sink_finalize(bt_self_component_sink
*self_comp
)
1138 struct fs_sink_comp
*fs_sink
= bt_self_component_get_data(
1139 bt_self_component_sink_as_self_component(self_comp
));
1141 destroy_fs_sink_comp(fs_sink
);