2 * SPDX-License-Identifier: MIT
4 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
7 #define BT_COMP_LOG_SELF_COMP (fs_sink->self_comp)
8 #define BT_LOG_OUTPUT_LEVEL (fs_sink->log_level)
9 #define BT_LOG_TAG "PLUGIN/SINK.CTF.FS"
10 #include "logging/comp-logging.h"
12 #include <babeltrace2/babeltrace.h>
16 #include "common/assert.h"
17 #include "ctfser/ctfser.h"
18 #include "plugins/common/param-validation/param-validation.h"
20 #include "fs-sink.hpp"
21 #include "fs-sink-trace.hpp"
22 #include "fs-sink-stream.hpp"
23 #include "fs-sink-ctf-meta.hpp"
24 #include "translate-trace-ir-to-ctf-ir.hpp"
25 #include "translate-ctf-ir-to-tsdl.hpp"
27 static const char * const in_port_name
= "in";
29 static bt_component_class_initialize_method_status
30 ensure_output_dir_exists(struct fs_sink_comp
*fs_sink
)
32 bt_component_class_initialize_method_status status
=
33 BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
;
36 ret
= g_mkdir_with_parents(fs_sink
->output_dir_path
->str
, 0755);
38 BT_COMP_LOGE_APPEND_CAUSE_ERRNO(fs_sink
->self_comp
,
39 "Cannot create directories for output directory",
40 ": output-dir-path=\"%s\"", fs_sink
->output_dir_path
->str
);
41 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
49 static bt_param_validation_map_value_entry_descr fs_sink_params_descr
[] = {
51 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY
,
52 {bt_param_validation_value_descr::string_t
}},
53 {"assume-single-trace",
54 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
55 {bt_param_validation_value_descr::bool_t
}},
56 {"ignore-discarded-events",
57 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
58 {bt_param_validation_value_descr::bool_t
}},
59 {"ignore-discarded-packets",
60 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
61 {bt_param_validation_value_descr::bool_t
}},
63 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
64 {bt_param_validation_value_descr::bool_t
}},
65 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END
};
67 static bt_component_class_initialize_method_status
configure_component(struct fs_sink_comp
*fs_sink
,
68 const bt_value
*params
)
70 bt_component_class_initialize_method_status status
;
71 const bt_value
*value
;
72 enum bt_param_validation_status validation_status
;
73 gchar
*validation_error
;
76 bt_param_validation_validate(params
, fs_sink_params_descr
, &validation_error
);
77 if (validation_status
== BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR
) {
78 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
79 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "%s", validation_error
);
81 } else if (validation_status
== BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR
) {
82 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
86 value
= bt_value_map_borrow_entry_value_const(params
, "path");
87 g_string_assign(fs_sink
->output_dir_path
, bt_value_string_get(value
));
89 value
= bt_value_map_borrow_entry_value_const(params
, "assume-single-trace");
91 fs_sink
->assume_single_trace
= (bool) bt_value_bool_get(value
);
94 value
= bt_value_map_borrow_entry_value_const(params
, "ignore-discarded-events");
96 fs_sink
->ignore_discarded_events
= (bool) bt_value_bool_get(value
);
99 value
= bt_value_map_borrow_entry_value_const(params
, "ignore-discarded-packets");
101 fs_sink
->ignore_discarded_packets
= (bool) bt_value_bool_get(value
);
104 value
= bt_value_map_borrow_entry_value_const(params
, "quiet");
106 fs_sink
->quiet
= (bool) bt_value_bool_get(value
);
109 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
;
112 g_free(validation_error
);
116 static void destroy_fs_sink_comp(struct fs_sink_comp
*fs_sink
)
122 if (fs_sink
->output_dir_path
) {
123 g_string_free(fs_sink
->output_dir_path
, TRUE
);
124 fs_sink
->output_dir_path
= NULL
;
127 if (fs_sink
->traces
) {
128 g_hash_table_destroy(fs_sink
->traces
);
129 fs_sink
->traces
= NULL
;
132 BT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(fs_sink
->upstream_iter
);
140 bt_component_class_initialize_method_status
141 ctf_fs_sink_init(bt_self_component_sink
*self_comp_sink
,
142 bt_self_component_sink_configuration
*config
, const bt_value
*params
,
143 void *init_method_data
)
145 bt_component_class_initialize_method_status status
;
146 bt_self_component_add_port_status add_port_status
;
147 struct fs_sink_comp
*fs_sink
= NULL
;
148 bt_self_component
*self_comp
= bt_self_component_sink_as_self_component(self_comp_sink
);
149 bt_logging_level log_level
=
150 bt_component_get_logging_level(bt_self_component_as_component(self_comp
));
152 fs_sink
= g_new0(struct fs_sink_comp
, 1);
154 BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR
, log_level
, self_comp
,
155 "Failed to allocate one CTF FS sink structure.");
156 BT_CURRENT_THREAD_ERROR_APPEND_CAUSE_FROM_COMPONENT(
157 self_comp
, "Failed to allocate one CTF FS sink structure.");
158 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
162 fs_sink
->log_level
= log_level
;
163 fs_sink
->self_comp
= self_comp
;
164 fs_sink
->output_dir_path
= g_string_new(NULL
);
165 status
= configure_component(fs_sink
, params
);
166 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
167 /* configure_component() logs errors */
171 if (fs_sink
->assume_single_trace
&&
172 g_file_test(fs_sink
->output_dir_path
->str
, G_FILE_TEST_EXISTS
)) {
173 BT_COMP_LOGE_APPEND_CAUSE(self_comp
,
174 "Single trace mode, but output path exists: output-path=\"%s\"",
175 fs_sink
->output_dir_path
->str
);
176 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
180 status
= ensure_output_dir_exists(fs_sink
);
181 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
182 /* ensure_output_dir_exists() logs errors */
186 fs_sink
->traces
= g_hash_table_new_full(g_direct_hash
, g_direct_equal
, NULL
,
187 (GDestroyNotify
) fs_sink_trace_destroy
);
188 if (!fs_sink
->traces
) {
189 BT_COMP_LOGE_APPEND_CAUSE(self_comp
, "Failed to allocate one GHashTable.");
190 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
195 bt_self_component_sink_add_input_port(self_comp_sink
, in_port_name
, NULL
, NULL
);
196 if (add_port_status
!= BT_SELF_COMPONENT_ADD_PORT_STATUS_OK
) {
197 status
= (bt_component_class_initialize_method_status
) add_port_status
;
198 BT_COMP_LOGE_APPEND_CAUSE(self_comp
, "Failed to add input port.");
202 bt_self_component_set_data(self_comp
, fs_sink
);
205 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
206 destroy_fs_sink_comp(fs_sink
);
212 static inline struct fs_sink_stream
*borrow_stream(struct fs_sink_comp
*fs_sink
,
213 const bt_stream
*ir_stream
)
215 const bt_trace
*ir_trace
= bt_stream_borrow_trace_const(ir_stream
);
216 struct fs_sink_trace
*trace
;
217 struct fs_sink_stream
*stream
= NULL
;
219 trace
= (fs_sink_trace
*) g_hash_table_lookup(fs_sink
->traces
, ir_trace
);
220 if (G_UNLIKELY(!trace
)) {
221 if (fs_sink
->assume_single_trace
&& g_hash_table_size(fs_sink
->traces
) > 0) {
222 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
223 "Single trace mode, but getting more than one trace: "
224 "stream-name=\"%s\"",
225 bt_stream_get_name(ir_stream
));
229 trace
= fs_sink_trace_create(fs_sink
, ir_trace
);
235 stream
= (fs_sink_stream
*) g_hash_table_lookup(trace
->streams
, ir_stream
);
236 if (G_UNLIKELY(!stream
)) {
237 stream
= fs_sink_stream_create(trace
, ir_stream
);
247 static inline bt_component_class_sink_consume_method_status
248 handle_event_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
251 bt_component_class_sink_consume_method_status status
=
252 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
253 const bt_event
*ir_event
= bt_message_event_borrow_event_const(msg
);
254 const bt_stream
*ir_stream
= bt_event_borrow_stream_const(ir_event
);
255 struct fs_sink_stream
*stream
;
256 struct fs_sink_ctf_event_class
*ec
= NULL
;
257 const bt_clock_snapshot
*cs
= NULL
;
259 stream
= borrow_stream(fs_sink
, ir_stream
);
260 if (G_UNLIKELY(!stream
)) {
261 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
262 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
266 ret
= try_translate_event_class_trace_ir_to_ctf_ir(fs_sink
, stream
->sc
,
267 bt_event_borrow_class_const(ir_event
), &ec
);
269 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to translate event class to CTF IR.");
270 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
276 if (stream
->sc
->default_clock_class
) {
277 cs
= bt_message_event_borrow_default_clock_snapshot_const(msg
);
281 * If this event's stream does not support packets, then we
282 * lazily create artificial packets.
284 * The size of an artificial packet is arbitrarily at least
285 * 4 MiB (it usually is greater because we close it when
286 * comes the time to write a new event and the packet's content
287 * size is >= 4 MiB), except the last one which can be smaller.
289 if (G_UNLIKELY(!stream
->sc
->has_packets
)) {
290 if (stream
->packet_state
.is_open
&&
291 bt_ctfser_get_offset_in_current_packet_bits(&stream
->ctfser
) / 8 >= 4 * 1024 * 1024) {
293 * Stream's current packet is larger than 4 MiB:
294 * close it. A new packet will be opened just
297 ret
= fs_sink_stream_close_packet(stream
, NULL
);
299 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to close packet.");
300 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
305 if (!stream
->packet_state
.is_open
) {
306 /* Stream's packet is not currently opened: open it */
307 ret
= fs_sink_stream_open_packet(stream
, NULL
, NULL
);
309 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to open packet.");
310 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
316 BT_ASSERT_DBG(stream
->packet_state
.is_open
);
317 ret
= fs_sink_stream_write_event(stream
, cs
, ir_event
, ec
);
318 if (G_UNLIKELY(ret
)) {
319 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to write event.");
320 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
328 static inline bt_component_class_sink_consume_method_status
329 handle_packet_beginning_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
332 bt_component_class_sink_consume_method_status status
=
333 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
334 const bt_packet
*ir_packet
= bt_message_packet_beginning_borrow_packet_const(msg
);
335 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
336 struct fs_sink_stream
*stream
;
337 const bt_clock_snapshot
*cs
= NULL
;
339 stream
= borrow_stream(fs_sink
, ir_stream
);
340 if (G_UNLIKELY(!stream
)) {
341 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
342 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
346 if (stream
->sc
->packets_have_ts_begin
) {
347 cs
= bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg
);
352 * If we previously received a discarded events message with
353 * a time range, make sure that its beginning time matches what's
354 * expected for CTF 1.8, that is:
356 * * Its beginning time is the previous packet's end
357 * time (or the current packet's beginning time if
358 * this is the first packet).
360 * We check this here instead of in handle_packet_end_msg()
361 * because we want to catch any incompatible message as early as
362 * possible to report the error.
364 * Validation of the discarded events message's end time is
365 * performed in handle_packet_end_msg().
367 if (stream
->discarded_events_state
.in_range
) {
368 uint64_t expected_cs
;
371 * `stream->discarded_events_state.in_range` is only set
372 * when the stream class's discarded events have a time
375 * It is required that the packet beginning and end
376 * messages for this stream class have times when
377 * discarded events have a time range.
379 BT_ASSERT(stream
->sc
->discarded_events_has_ts
);
380 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
381 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
383 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
384 /* We're opening the first packet */
385 expected_cs
= bt_clock_snapshot_get_value(cs
);
387 expected_cs
= stream
->prev_packet_state
.end_cs
;
390 if (stream
->discarded_events_state
.beginning_cs
!= expected_cs
) {
391 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
392 "Incompatible discarded events message: "
393 "unexpected beginning time: "
394 "beginning-cs-val=%" PRIu64
", "
395 "expected-beginning-cs-val=%" PRIu64
", "
396 "stream-id=%" PRIu64
", stream-name=\"%s\", "
397 "trace-name=\"%s\", path=\"%s/%s\"",
398 stream
->discarded_events_state
.beginning_cs
, expected_cs
,
399 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
400 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
401 stream
->trace
->path
->str
, stream
->file_name
->str
);
402 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
408 * If we previously received a discarded packets message with a
409 * time range, make sure that its beginning and end times match
410 * what's expected for CTF 1.8, that is:
412 * * Its beginning time is the previous packet's end time.
414 * * Its end time is the current packet's beginning time.
416 if (stream
->discarded_packets_state
.in_range
) {
417 uint64_t expected_end_cs
;
420 * `stream->discarded_packets_state.in_range` is only
421 * set when the stream class's discarded packets have a
424 * It is required that the packet beginning and end
425 * messages for this stream class have times when
426 * discarded packets have a time range.
428 BT_ASSERT(stream
->sc
->discarded_packets_has_ts
);
429 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
430 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
433 * It is not supported to have a discarded packets
434 * message _before_ the first packet: we cannot validate
435 * that its beginning time is compatible with CTF 1.8 in
438 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
439 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
440 "Incompatible discarded packets message "
441 "occurring before the stream's first packet: "
442 "stream-id=%" PRIu64
", stream-name=\"%s\", "
443 "trace-name=\"%s\", path=\"%s/%s\"",
444 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
445 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
446 stream
->trace
->path
->str
, stream
->file_name
->str
);
447 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
451 if (stream
->discarded_packets_state
.beginning_cs
!= stream
->prev_packet_state
.end_cs
) {
452 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
453 "Incompatible discarded packets message: "
454 "unexpected beginning time: "
455 "beginning-cs-val=%" PRIu64
", "
456 "expected-beginning-cs-val=%" PRIu64
", "
457 "stream-id=%" PRIu64
", stream-name=\"%s\", "
458 "trace-name=\"%s\", path=\"%s/%s\"",
459 stream
->discarded_packets_state
.beginning_cs
,
460 stream
->prev_packet_state
.end_cs
, bt_stream_get_id(ir_stream
),
461 bt_stream_get_name(ir_stream
),
462 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
463 stream
->trace
->path
->str
, stream
->file_name
->str
);
464 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
468 expected_end_cs
= bt_clock_snapshot_get_value(cs
);
470 if (stream
->discarded_packets_state
.end_cs
!= expected_end_cs
) {
471 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
472 "Incompatible discarded packets message: "
473 "unexpected end time: "
474 "end-cs-val=%" PRIu64
", "
475 "expected-end-cs-val=%" PRIu64
", "
476 "stream-id=%" PRIu64
", stream-name=\"%s\", "
477 "trace-name=\"%s\", path=\"%s/%s\"",
478 stream
->discarded_packets_state
.end_cs
, expected_end_cs
,
479 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
480 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
481 stream
->trace
->path
->str
, stream
->file_name
->str
);
482 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
488 * We're not in a discarded packets time range anymore since we
489 * require that the discarded packets time ranges go from one
490 * packet's end time to the next packet's beginning time, and
491 * we're handling a packet beginning message here.
493 stream
->discarded_packets_state
.in_range
= false;
495 ret
= fs_sink_stream_open_packet(stream
, cs
, ir_packet
);
497 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to open packet.");
498 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
506 static inline bt_component_class_sink_consume_method_status
507 handle_packet_end_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
510 bt_component_class_sink_consume_method_status status
=
511 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
512 const bt_packet
*ir_packet
= bt_message_packet_end_borrow_packet_const(msg
);
513 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
514 struct fs_sink_stream
*stream
;
515 const bt_clock_snapshot
*cs
= NULL
;
517 stream
= borrow_stream(fs_sink
, ir_stream
);
518 if (G_UNLIKELY(!stream
)) {
519 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
520 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
524 if (stream
->sc
->packets_have_ts_end
) {
525 cs
= bt_message_packet_end_borrow_default_clock_snapshot_const(msg
);
530 * If we previously received a discarded events message with
531 * a time range, make sure that its end time matches what's
532 * expected for CTF 1.8, that is:
534 * * Its end time is the current packet's end time.
536 * Validation of the discarded events message's beginning time
537 * is performed in handle_packet_beginning_msg().
539 if (stream
->discarded_events_state
.in_range
) {
540 uint64_t expected_cs
;
543 * `stream->discarded_events_state.in_range` is only set
544 * when the stream class's discarded events have a time
547 * It is required that the packet beginning and end
548 * messages for this stream class have times when
549 * discarded events have a time range.
551 BT_ASSERT(stream
->sc
->discarded_events_has_ts
);
552 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
553 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
555 expected_cs
= bt_clock_snapshot_get_value(cs
);
557 if (stream
->discarded_events_state
.end_cs
!= expected_cs
) {
558 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
559 "Incompatible discarded events message: "
560 "unexpected end time: "
561 "end-cs-val=%" PRIu64
", "
562 "expected-end-cs-val=%" PRIu64
", "
563 "stream-id=%" PRIu64
", stream-name=\"%s\", "
564 "trace-name=\"%s\", path=\"%s/%s\"",
565 stream
->discarded_events_state
.end_cs
, expected_cs
,
566 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
567 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
568 stream
->trace
->path
->str
, stream
->file_name
->str
);
569 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
574 ret
= fs_sink_stream_close_packet(stream
, cs
);
576 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to close packet.");
577 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
582 * We're not in a discarded events time range anymore since we
583 * require that the discarded events time ranges go from one
584 * packet's end time to the next packet's end time, and we're
585 * handling a packet end message here.
587 stream
->discarded_events_state
.in_range
= false;
593 static inline bt_component_class_sink_consume_method_status
594 handle_stream_beginning_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
596 bt_component_class_sink_consume_method_status status
=
597 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
598 const bt_stream
*ir_stream
= bt_message_stream_beginning_borrow_stream_const(msg
);
599 const bt_stream_class
*ir_sc
= bt_stream_borrow_class_const(ir_stream
);
600 struct fs_sink_stream
*stream
;
601 bool packets_have_beginning_end_cs
=
602 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc
) &&
603 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc
);
606 * Not supported: discarded events or discarded packets support
607 * without packets support. Packets are the way to know where
608 * discarded events/packets occurred in CTF 1.8.
610 if (!bt_stream_class_supports_packets(ir_sc
)) {
611 BT_ASSERT(!bt_stream_class_supports_discarded_packets(ir_sc
));
613 if (!fs_sink
->ignore_discarded_events
&& bt_stream_class_supports_discarded_events(ir_sc
)) {
614 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
615 "Unsupported stream: "
616 "stream does not support packets, "
617 "but supports discarded events: "
619 "stream-id=%" PRIu64
", "
620 "stream-name=\"%s\"",
621 ir_stream
, bt_stream_get_id(ir_stream
),
622 bt_stream_get_name(ir_stream
));
623 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
629 * Not supported: discarded events with default clock snapshots,
630 * but packet beginning/end without default clock snapshot.
632 if (!fs_sink
->ignore_discarded_events
&&
633 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc
) &&
634 !packets_have_beginning_end_cs
) {
635 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
636 "Unsupported stream: discarded events have "
637 "default clock snapshots, but packets have no "
638 "beginning and/or end default clock snapshots: "
640 "stream-id=%" PRIu64
", "
641 "stream-name=\"%s\"",
642 ir_stream
, bt_stream_get_id(ir_stream
),
643 bt_stream_get_name(ir_stream
));
644 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
649 * Not supported: discarded packets with default clock
650 * snapshots, but packet beginning/end without default clock
653 if (!fs_sink
->ignore_discarded_packets
&&
654 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc
) &&
655 !packets_have_beginning_end_cs
) {
656 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
657 "Unsupported stream: discarded packets have "
658 "default clock snapshots, but packets have no "
659 "beginning and/or end default clock snapshots: "
661 "stream-id=%" PRIu64
", "
662 "stream-name=\"%s\"",
663 ir_stream
, bt_stream_get_id(ir_stream
),
664 bt_stream_get_name(ir_stream
));
665 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
669 stream
= borrow_stream(fs_sink
, ir_stream
);
671 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
672 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
676 BT_COMP_LOGI("Created new, empty stream file: "
677 "stream-id=%" PRIu64
", stream-name=\"%s\", "
678 "trace-name=\"%s\", path=\"%s/%s\"",
679 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
680 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
681 stream
->trace
->path
->str
, stream
->file_name
->str
);
687 static inline bt_component_class_sink_consume_method_status
688 handle_stream_end_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
690 bt_component_class_sink_consume_method_status status
=
691 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
692 const bt_stream
*ir_stream
= bt_message_stream_end_borrow_stream_const(msg
);
693 struct fs_sink_stream
*stream
;
695 stream
= borrow_stream(fs_sink
, ir_stream
);
697 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
698 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
702 if (G_UNLIKELY(!stream
->sc
->has_packets
&& stream
->packet_state
.is_open
)) {
703 /* Close stream's current artificial packet */
704 int ret
= fs_sink_stream_close_packet(stream
, NULL
);
707 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to close packet.");
708 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
713 BT_COMP_LOGI("Closing stream file: "
714 "stream-id=%" PRIu64
", stream-name=\"%s\", "
715 "trace-name=\"%s\", path=\"%s/%s\"",
716 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
717 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
718 stream
->trace
->path
->str
, stream
->file_name
->str
);
721 * This destroys the stream object and frees all its resources,
722 * closing the stream file.
724 g_hash_table_remove(stream
->trace
->streams
, ir_stream
);
730 static inline bt_component_class_sink_consume_method_status
731 handle_discarded_events_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
733 bt_component_class_sink_consume_method_status status
=
734 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
735 const bt_stream
*ir_stream
= bt_message_discarded_events_borrow_stream_const(msg
);
736 struct fs_sink_stream
*stream
;
737 const bt_clock_snapshot
*cs
= NULL
;
738 bt_property_availability avail
;
741 stream
= borrow_stream(fs_sink
, ir_stream
);
743 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
744 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
748 if (fs_sink
->ignore_discarded_events
) {
749 BT_COMP_LOGI("Ignoring discarded events message: "
750 "stream-id=%" PRIu64
", stream-name=\"%s\", "
751 "trace-name=\"%s\", path=\"%s/%s\"",
752 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
753 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
754 stream
->trace
->path
->str
, stream
->file_name
->str
);
758 if (stream
->discarded_events_state
.in_range
) {
759 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
760 "Unsupported contiguous discarded events message: "
761 "stream-id=%" PRIu64
", stream-name=\"%s\", "
762 "trace-name=\"%s\", path=\"%s/%s\"",
763 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
764 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
765 stream
->trace
->path
->str
, stream
->file_name
->str
);
766 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
771 * If we're currently in an opened packet (got a packet
772 * beginning message, but no packet end message yet), we do not
773 * support having a discarded events message with a time range
774 * because we require that the discarded events message's time
775 * range go from a packet's end time to the next packet's end
778 if (stream
->packet_state
.is_open
&& stream
->sc
->discarded_events_has_ts
) {
779 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
780 "Unsupported discarded events message with "
781 "default clock snapshots occurring within a packet: "
782 "stream-id=%" PRIu64
", stream-name=\"%s\", "
783 "trace-name=\"%s\", path=\"%s/%s\"",
784 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
785 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
786 stream
->trace
->path
->str
, stream
->file_name
->str
);
787 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
791 if (stream
->sc
->discarded_events_has_ts
) {
793 * Make the stream's state be in the time range of a
794 * discarded events message since we have the message's
795 * time range (`stream->sc->discarded_events_has_ts`).
797 stream
->discarded_events_state
.in_range
= true;
800 * The clock snapshot values will be validated when
801 * handling the next packet beginning and end messages
802 * (next calls to handle_packet_beginning_msg() and
803 * handle_packet_end_msg()).
805 cs
= bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg
);
807 stream
->discarded_events_state
.beginning_cs
= bt_clock_snapshot_get_value(cs
);
808 cs
= bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg
);
810 stream
->discarded_events_state
.end_cs
= bt_clock_snapshot_get_value(cs
);
813 avail
= bt_message_discarded_events_get_count(msg
, &count
);
814 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
816 * There's no specific count of discarded events: set it
817 * to 1 so that we know that we at least discarded
823 stream
->packet_state
.discarded_events_counter
+= count
;
829 static inline bt_component_class_sink_consume_method_status
830 handle_discarded_packets_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
832 bt_component_class_sink_consume_method_status status
=
833 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
834 const bt_stream
*ir_stream
= bt_message_discarded_packets_borrow_stream_const(msg
);
835 struct fs_sink_stream
*stream
;
836 const bt_clock_snapshot
*cs
= NULL
;
837 bt_property_availability avail
;
840 stream
= borrow_stream(fs_sink
, ir_stream
);
842 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
843 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
847 if (fs_sink
->ignore_discarded_packets
) {
848 BT_COMP_LOGI("Ignoring discarded packets message: "
849 "stream-id=%" PRIu64
", stream-name=\"%s\", "
850 "trace-name=\"%s\", path=\"%s/%s\"",
851 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
852 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
853 stream
->trace
->path
->str
, stream
->file_name
->str
);
857 if (stream
->discarded_packets_state
.in_range
) {
858 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
859 "Unsupported contiguous discarded packets message: "
860 "stream-id=%" PRIu64
", stream-name=\"%s\", "
861 "trace-name=\"%s\", path=\"%s/%s\"",
862 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
863 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
864 stream
->trace
->path
->str
, stream
->file_name
->str
);
865 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
870 * Discarded packets messages are guaranteed to occur between
873 BT_ASSERT(!stream
->packet_state
.is_open
);
875 if (stream
->sc
->discarded_packets_has_ts
) {
877 * Make the stream's state be in the time range of a
878 * discarded packets message since we have the message's
879 * time range (`stream->sc->discarded_packets_has_ts`).
881 stream
->discarded_packets_state
.in_range
= true;
884 * The clock snapshot values will be validated when
885 * handling the next packet beginning message (next call
886 * to handle_packet_beginning_msg()).
888 cs
= bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg
);
890 stream
->discarded_packets_state
.beginning_cs
= bt_clock_snapshot_get_value(cs
);
891 cs
= bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg
);
893 stream
->discarded_packets_state
.end_cs
= bt_clock_snapshot_get_value(cs
);
896 avail
= bt_message_discarded_packets_get_count(msg
, &count
);
897 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
899 * There's no specific count of discarded packets: set
900 * it to 1 so that we know that we at least discarded
906 stream
->packet_state
.seq_num
+= count
;
912 static inline void put_messages(bt_message_array_const msgs
, uint64_t count
)
916 for (i
= 0; i
< count
; i
++) {
917 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
922 bt_component_class_sink_consume_method_status
ctf_fs_sink_consume(bt_self_component_sink
*self_comp
)
924 bt_component_class_sink_consume_method_status status
=
925 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
926 struct fs_sink_comp
*fs_sink
;
927 bt_message_iterator_next_status next_status
;
928 uint64_t msg_count
= 0;
929 bt_message_array_const msgs
;
931 fs_sink
= (fs_sink_comp
*) bt_self_component_get_data(
932 bt_self_component_sink_as_self_component(self_comp
));
933 BT_ASSERT_DBG(fs_sink
);
934 BT_ASSERT_DBG(fs_sink
->upstream_iter
);
936 /* Consume messages */
937 next_status
= bt_message_iterator_next(fs_sink
->upstream_iter
, &msgs
, &msg_count
);
938 if (next_status
< 0) {
939 status
= (bt_component_class_sink_consume_method_status
) next_status
;
940 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
941 "Failed to get next message from upstream iterator.");
945 switch (next_status
) {
946 case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK
:
950 for (i
= 0; i
< msg_count
; i
++) {
951 const bt_message
*msg
= msgs
[i
];
955 switch (bt_message_get_type(msg
)) {
956 case BT_MESSAGE_TYPE_EVENT
:
957 status
= handle_event_msg(fs_sink
, msg
);
959 case BT_MESSAGE_TYPE_PACKET_BEGINNING
:
960 status
= handle_packet_beginning_msg(fs_sink
, msg
);
962 case BT_MESSAGE_TYPE_PACKET_END
:
963 status
= handle_packet_end_msg(fs_sink
, msg
);
965 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY
:
967 BT_COMP_LOGD_STR("Ignoring message iterator inactivity message.");
969 case BT_MESSAGE_TYPE_STREAM_BEGINNING
:
970 status
= handle_stream_beginning_msg(fs_sink
, msg
);
972 case BT_MESSAGE_TYPE_STREAM_END
:
973 status
= handle_stream_end_msg(fs_sink
, msg
);
975 case BT_MESSAGE_TYPE_DISCARDED_EVENTS
:
976 status
= handle_discarded_events_msg(fs_sink
, msg
);
978 case BT_MESSAGE_TYPE_DISCARDED_PACKETS
:
979 status
= handle_discarded_packets_msg(fs_sink
, msg
);
985 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
987 if (status
!= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
) {
988 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
989 "Failed to handle message: "
990 "generated CTF traces could be incomplete: "
991 "output-dir-path=\"%s\"",
992 fs_sink
->output_dir_path
->str
);
999 case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN
:
1000 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN
;
1002 case BT_MESSAGE_ITERATOR_NEXT_STATUS_END
:
1003 /* TODO: Finalize all traces (should already be done?) */
1004 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END
;
1013 BT_ASSERT(status
!= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
);
1014 put_messages(msgs
, msg_count
);
1021 bt_component_class_sink_graph_is_configured_method_status
1022 ctf_fs_sink_graph_is_configured(bt_self_component_sink
*self_comp
)
1024 bt_component_class_sink_graph_is_configured_method_status status
;
1025 bt_message_iterator_create_from_sink_component_status msg_iter_status
;
1026 fs_sink_comp
*fs_sink
= (fs_sink_comp
*) bt_self_component_get_data(
1027 bt_self_component_sink_as_self_component(self_comp
));
1029 msg_iter_status
= bt_message_iterator_create_from_sink_component(
1030 self_comp
, bt_self_component_sink_borrow_input_port_by_name(self_comp
, in_port_name
),
1031 &fs_sink
->upstream_iter
);
1032 if (msg_iter_status
!= BT_MESSAGE_ITERATOR_CREATE_FROM_SINK_COMPONENT_STATUS_OK
) {
1033 status
= (bt_component_class_sink_graph_is_configured_method_status
) msg_iter_status
;
1034 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to create upstream iterator.");
1038 status
= BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK
;
1044 void ctf_fs_sink_finalize(bt_self_component_sink
*self_comp
)
1046 fs_sink_comp
*fs_sink
= (fs_sink_comp
*) bt_self_component_get_data(
1047 bt_self_component_sink_as_self_component(self_comp
));
1049 destroy_fs_sink_comp(fs_sink
);