2 * SPDX-License-Identifier: MIT
4 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
7 #define BT_COMP_LOG_SELF_COMP (fs_sink->self_comp)
8 #define BT_LOG_OUTPUT_LEVEL (fs_sink->log_level)
9 #define BT_LOG_TAG "PLUGIN/SINK.CTF.FS"
10 #include "logging/comp-logging.h"
12 #include <babeltrace2/babeltrace.h>
16 #include "common/assert.h"
17 #include "ctfser/ctfser.h"
18 #include "plugins/common/param-validation/param-validation.h"
20 #include "fs-sink.hpp"
21 #include "fs-sink-trace.hpp"
22 #include "fs-sink-stream.hpp"
23 #include "fs-sink-ctf-meta.hpp"
24 #include "translate-trace-ir-to-ctf-ir.hpp"
25 #include "translate-ctf-ir-to-tsdl.hpp"
27 static const char * const in_port_name
= "in";
29 static bt_component_class_initialize_method_status
30 ensure_output_dir_exists(struct fs_sink_comp
*fs_sink
)
32 bt_component_class_initialize_method_status status
=
33 BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
;
36 ret
= g_mkdir_with_parents(fs_sink
->output_dir_path
->str
, 0755);
38 BT_COMP_LOGE_APPEND_CAUSE_ERRNO(fs_sink
->self_comp
,
39 "Cannot create directories for output directory",
40 ": output-dir-path=\"%s\"", fs_sink
->output_dir_path
->str
);
41 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
49 static bt_param_validation_map_value_entry_descr fs_sink_params_descr
[] = {
50 {"path", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY
,
51 bt_param_validation_value_descr::makeString()},
52 {"assume-single-trace", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
53 bt_param_validation_value_descr::makeBool()},
54 {"ignore-discarded-events", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
55 bt_param_validation_value_descr::makeBool()},
56 {"ignore-discarded-packets", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
57 bt_param_validation_value_descr::makeBool()},
58 {"quiet", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
59 bt_param_validation_value_descr::makeBool()},
60 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END
};
62 static bt_component_class_initialize_method_status
configure_component(struct fs_sink_comp
*fs_sink
,
63 const bt_value
*params
)
65 bt_component_class_initialize_method_status status
;
66 const bt_value
*value
;
67 enum bt_param_validation_status validation_status
;
68 gchar
*validation_error
;
71 bt_param_validation_validate(params
, fs_sink_params_descr
, &validation_error
);
72 if (validation_status
== BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR
) {
73 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
74 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "%s", validation_error
);
76 } else if (validation_status
== BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR
) {
77 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
81 value
= bt_value_map_borrow_entry_value_const(params
, "path");
82 g_string_assign(fs_sink
->output_dir_path
, bt_value_string_get(value
));
84 value
= bt_value_map_borrow_entry_value_const(params
, "assume-single-trace");
86 fs_sink
->assume_single_trace
= (bool) bt_value_bool_get(value
);
89 value
= bt_value_map_borrow_entry_value_const(params
, "ignore-discarded-events");
91 fs_sink
->ignore_discarded_events
= (bool) bt_value_bool_get(value
);
94 value
= bt_value_map_borrow_entry_value_const(params
, "ignore-discarded-packets");
96 fs_sink
->ignore_discarded_packets
= (bool) bt_value_bool_get(value
);
99 value
= bt_value_map_borrow_entry_value_const(params
, "quiet");
101 fs_sink
->quiet
= (bool) bt_value_bool_get(value
);
104 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
;
107 g_free(validation_error
);
111 static void destroy_fs_sink_comp(struct fs_sink_comp
*fs_sink
)
117 if (fs_sink
->output_dir_path
) {
118 g_string_free(fs_sink
->output_dir_path
, TRUE
);
119 fs_sink
->output_dir_path
= NULL
;
122 if (fs_sink
->traces
) {
123 g_hash_table_destroy(fs_sink
->traces
);
124 fs_sink
->traces
= NULL
;
127 BT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(fs_sink
->upstream_iter
);
134 bt_component_class_initialize_method_status
ctf_fs_sink_init(bt_self_component_sink
*self_comp_sink
,
135 bt_self_component_sink_configuration
*,
136 const bt_value
*params
, void *)
138 bt_component_class_initialize_method_status status
;
139 bt_self_component_add_port_status add_port_status
;
140 struct fs_sink_comp
*fs_sink
= NULL
;
141 bt_self_component
*self_comp
= bt_self_component_sink_as_self_component(self_comp_sink
);
142 bt_logging_level log_level
=
143 bt_component_get_logging_level(bt_self_component_as_component(self_comp
));
145 fs_sink
= g_new0(struct fs_sink_comp
, 1);
147 BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR
, log_level
, self_comp
,
148 "Failed to allocate one CTF FS sink structure.");
149 BT_CURRENT_THREAD_ERROR_APPEND_CAUSE_FROM_COMPONENT(
150 self_comp
, "Failed to allocate one CTF FS sink structure.");
151 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
155 fs_sink
->log_level
= log_level
;
156 fs_sink
->self_comp
= self_comp
;
157 fs_sink
->output_dir_path
= g_string_new(NULL
);
158 status
= configure_component(fs_sink
, params
);
159 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
160 /* configure_component() logs errors */
164 if (fs_sink
->assume_single_trace
&&
165 g_file_test(fs_sink
->output_dir_path
->str
, G_FILE_TEST_EXISTS
)) {
166 BT_COMP_LOGE_APPEND_CAUSE(self_comp
,
167 "Single trace mode, but output path exists: output-path=\"%s\"",
168 fs_sink
->output_dir_path
->str
);
169 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
173 status
= ensure_output_dir_exists(fs_sink
);
174 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
175 /* ensure_output_dir_exists() logs errors */
179 fs_sink
->traces
= g_hash_table_new_full(g_direct_hash
, g_direct_equal
, NULL
,
180 (GDestroyNotify
) fs_sink_trace_destroy
);
181 if (!fs_sink
->traces
) {
182 BT_COMP_LOGE_APPEND_CAUSE(self_comp
, "Failed to allocate one GHashTable.");
183 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
188 bt_self_component_sink_add_input_port(self_comp_sink
, in_port_name
, NULL
, NULL
);
189 if (add_port_status
!= BT_SELF_COMPONENT_ADD_PORT_STATUS_OK
) {
190 status
= (bt_component_class_initialize_method_status
) add_port_status
;
191 BT_COMP_LOGE_APPEND_CAUSE(self_comp
, "Failed to add input port.");
195 bt_self_component_set_data(self_comp
, fs_sink
);
198 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
199 destroy_fs_sink_comp(fs_sink
);
205 static inline struct fs_sink_stream
*borrow_stream(struct fs_sink_comp
*fs_sink
,
206 const bt_stream
*ir_stream
)
208 const bt_trace
*ir_trace
= bt_stream_borrow_trace_const(ir_stream
);
209 struct fs_sink_trace
*trace
;
210 struct fs_sink_stream
*stream
= NULL
;
212 trace
= (fs_sink_trace
*) g_hash_table_lookup(fs_sink
->traces
, ir_trace
);
213 if (G_UNLIKELY(!trace
)) {
214 if (fs_sink
->assume_single_trace
&& g_hash_table_size(fs_sink
->traces
) > 0) {
215 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
216 "Single trace mode, but getting more than one trace: "
217 "stream-name=\"%s\"",
218 bt_stream_get_name(ir_stream
));
222 trace
= fs_sink_trace_create(fs_sink
, ir_trace
);
228 stream
= (fs_sink_stream
*) g_hash_table_lookup(trace
->streams
, ir_stream
);
229 if (G_UNLIKELY(!stream
)) {
230 stream
= fs_sink_stream_create(trace
, ir_stream
);
240 static inline bt_component_class_sink_consume_method_status
241 handle_event_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
244 bt_component_class_sink_consume_method_status status
=
245 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
246 const bt_event
*ir_event
= bt_message_event_borrow_event_const(msg
);
247 const bt_stream
*ir_stream
= bt_event_borrow_stream_const(ir_event
);
248 struct fs_sink_stream
*stream
;
249 struct fs_sink_ctf_event_class
*ec
= NULL
;
250 const bt_clock_snapshot
*cs
= NULL
;
252 stream
= borrow_stream(fs_sink
, ir_stream
);
253 if (G_UNLIKELY(!stream
)) {
254 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
255 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
259 ret
= try_translate_event_class_trace_ir_to_ctf_ir(fs_sink
, stream
->sc
,
260 bt_event_borrow_class_const(ir_event
), &ec
);
262 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to translate event class to CTF IR.");
263 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
269 if (stream
->sc
->default_clock_class
) {
270 cs
= bt_message_event_borrow_default_clock_snapshot_const(msg
);
274 * If this event's stream does not support packets, then we
275 * lazily create artificial packets.
277 * The size of an artificial packet is arbitrarily at least
278 * 4 MiB (it usually is greater because we close it when
279 * comes the time to write a new event and the packet's content
280 * size is >= 4 MiB), except the last one which can be smaller.
282 if (G_UNLIKELY(!stream
->sc
->has_packets
)) {
283 if (stream
->packet_state
.is_open
&&
284 bt_ctfser_get_offset_in_current_packet_bits(&stream
->ctfser
) / 8 >= 4 * 1024 * 1024) {
286 * Stream's current packet is larger than 4 MiB:
287 * close it. A new packet will be opened just
290 ret
= fs_sink_stream_close_packet(stream
, NULL
);
292 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to close packet.");
293 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
298 if (!stream
->packet_state
.is_open
) {
299 /* Stream's packet is not currently opened: open it */
300 ret
= fs_sink_stream_open_packet(stream
, NULL
, NULL
);
302 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to open packet.");
303 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
309 BT_ASSERT_DBG(stream
->packet_state
.is_open
);
310 ret
= fs_sink_stream_write_event(stream
, cs
, ir_event
, ec
);
311 if (G_UNLIKELY(ret
)) {
312 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to write event.");
313 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
321 static inline bt_component_class_sink_consume_method_status
322 handle_packet_beginning_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
325 bt_component_class_sink_consume_method_status status
=
326 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
327 const bt_packet
*ir_packet
= bt_message_packet_beginning_borrow_packet_const(msg
);
328 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
329 struct fs_sink_stream
*stream
;
330 const bt_clock_snapshot
*cs
= NULL
;
332 stream
= borrow_stream(fs_sink
, ir_stream
);
333 if (G_UNLIKELY(!stream
)) {
334 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
335 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
339 if (stream
->sc
->packets_have_ts_begin
) {
340 cs
= bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg
);
345 * If we previously received a discarded events message with
346 * a time range, make sure that its beginning time matches what's
347 * expected for CTF 1.8, that is:
349 * * Its beginning time is the previous packet's end
350 * time (or the current packet's beginning time if
351 * this is the first packet).
353 * We check this here instead of in handle_packet_end_msg()
354 * because we want to catch any incompatible message as early as
355 * possible to report the error.
357 * Validation of the discarded events message's end time is
358 * performed in handle_packet_end_msg().
360 if (stream
->discarded_events_state
.in_range
) {
361 uint64_t expected_cs
;
364 * `stream->discarded_events_state.in_range` is only set
365 * when the stream class's discarded events have a time
368 * It is required that the packet beginning and end
369 * messages for this stream class have times when
370 * discarded events have a time range.
372 BT_ASSERT(stream
->sc
->discarded_events_has_ts
);
373 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
374 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
376 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
377 /* We're opening the first packet */
378 expected_cs
= bt_clock_snapshot_get_value(cs
);
380 expected_cs
= stream
->prev_packet_state
.end_cs
;
383 if (stream
->discarded_events_state
.beginning_cs
!= expected_cs
) {
384 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
385 "Incompatible discarded events message: "
386 "unexpected beginning time: "
387 "beginning-cs-val=%" PRIu64
", "
388 "expected-beginning-cs-val=%" PRIu64
", "
389 "stream-id=%" PRIu64
", stream-name=\"%s\", "
390 "trace-name=\"%s\", path=\"%s/%s\"",
391 stream
->discarded_events_state
.beginning_cs
, expected_cs
,
392 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
393 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
394 stream
->trace
->path
->str
, stream
->file_name
->str
);
395 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
401 * If we previously received a discarded packets message with a
402 * time range, make sure that its beginning and end times match
403 * what's expected for CTF 1.8, that is:
405 * * Its beginning time is the previous packet's end time.
407 * * Its end time is the current packet's beginning time.
409 if (stream
->discarded_packets_state
.in_range
) {
410 uint64_t expected_end_cs
;
413 * `stream->discarded_packets_state.in_range` is only
414 * set when the stream class's discarded packets have a
417 * It is required that the packet beginning and end
418 * messages for this stream class have times when
419 * discarded packets have a time range.
421 BT_ASSERT(stream
->sc
->discarded_packets_has_ts
);
422 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
423 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
426 * It is not supported to have a discarded packets
427 * message _before_ the first packet: we cannot validate
428 * that its beginning time is compatible with CTF 1.8 in
431 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
432 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
433 "Incompatible discarded packets message "
434 "occurring before the stream's first packet: "
435 "stream-id=%" PRIu64
", stream-name=\"%s\", "
436 "trace-name=\"%s\", path=\"%s/%s\"",
437 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
438 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
439 stream
->trace
->path
->str
, stream
->file_name
->str
);
440 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
444 if (stream
->discarded_packets_state
.beginning_cs
!= stream
->prev_packet_state
.end_cs
) {
445 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
446 "Incompatible discarded packets message: "
447 "unexpected beginning time: "
448 "beginning-cs-val=%" PRIu64
", "
449 "expected-beginning-cs-val=%" PRIu64
", "
450 "stream-id=%" PRIu64
", stream-name=\"%s\", "
451 "trace-name=\"%s\", path=\"%s/%s\"",
452 stream
->discarded_packets_state
.beginning_cs
,
453 stream
->prev_packet_state
.end_cs
, bt_stream_get_id(ir_stream
),
454 bt_stream_get_name(ir_stream
),
455 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
456 stream
->trace
->path
->str
, stream
->file_name
->str
);
457 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
461 expected_end_cs
= bt_clock_snapshot_get_value(cs
);
463 if (stream
->discarded_packets_state
.end_cs
!= expected_end_cs
) {
464 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
465 "Incompatible discarded packets message: "
466 "unexpected end time: "
467 "end-cs-val=%" PRIu64
", "
468 "expected-end-cs-val=%" PRIu64
", "
469 "stream-id=%" PRIu64
", stream-name=\"%s\", "
470 "trace-name=\"%s\", path=\"%s/%s\"",
471 stream
->discarded_packets_state
.end_cs
, expected_end_cs
,
472 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
473 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
474 stream
->trace
->path
->str
, stream
->file_name
->str
);
475 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
481 * We're not in a discarded packets time range anymore since we
482 * require that the discarded packets time ranges go from one
483 * packet's end time to the next packet's beginning time, and
484 * we're handling a packet beginning message here.
486 stream
->discarded_packets_state
.in_range
= false;
488 ret
= fs_sink_stream_open_packet(stream
, cs
, ir_packet
);
490 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to open packet.");
491 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
499 static inline bt_component_class_sink_consume_method_status
500 handle_packet_end_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
503 bt_component_class_sink_consume_method_status status
=
504 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
505 const bt_packet
*ir_packet
= bt_message_packet_end_borrow_packet_const(msg
);
506 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
507 struct fs_sink_stream
*stream
;
508 const bt_clock_snapshot
*cs
= NULL
;
510 stream
= borrow_stream(fs_sink
, ir_stream
);
511 if (G_UNLIKELY(!stream
)) {
512 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
513 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
517 if (stream
->sc
->packets_have_ts_end
) {
518 cs
= bt_message_packet_end_borrow_default_clock_snapshot_const(msg
);
523 * If we previously received a discarded events message with
524 * a time range, make sure that its end time matches what's
525 * expected for CTF 1.8, that is:
527 * * Its end time is the current packet's end time.
529 * Validation of the discarded events message's beginning time
530 * is performed in handle_packet_beginning_msg().
532 if (stream
->discarded_events_state
.in_range
) {
533 uint64_t expected_cs
;
536 * `stream->discarded_events_state.in_range` is only set
537 * when the stream class's discarded events have a time
540 * It is required that the packet beginning and end
541 * messages for this stream class have times when
542 * discarded events have a time range.
544 BT_ASSERT(stream
->sc
->discarded_events_has_ts
);
545 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
546 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
548 expected_cs
= bt_clock_snapshot_get_value(cs
);
550 if (stream
->discarded_events_state
.end_cs
!= expected_cs
) {
551 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
552 "Incompatible discarded events message: "
553 "unexpected end time: "
554 "end-cs-val=%" PRIu64
", "
555 "expected-end-cs-val=%" PRIu64
", "
556 "stream-id=%" PRIu64
", stream-name=\"%s\", "
557 "trace-name=\"%s\", path=\"%s/%s\"",
558 stream
->discarded_events_state
.end_cs
, expected_cs
,
559 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
560 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
561 stream
->trace
->path
->str
, stream
->file_name
->str
);
562 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
567 ret
= fs_sink_stream_close_packet(stream
, cs
);
569 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to close packet.");
570 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
575 * We're not in a discarded events time range anymore since we
576 * require that the discarded events time ranges go from one
577 * packet's end time to the next packet's end time, and we're
578 * handling a packet end message here.
580 stream
->discarded_events_state
.in_range
= false;
586 static inline bt_component_class_sink_consume_method_status
587 handle_stream_beginning_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
589 bt_component_class_sink_consume_method_status status
=
590 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
591 const bt_stream
*ir_stream
= bt_message_stream_beginning_borrow_stream_const(msg
);
592 const bt_stream_class
*ir_sc
= bt_stream_borrow_class_const(ir_stream
);
593 struct fs_sink_stream
*stream
;
594 bool packets_have_beginning_end_cs
=
595 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc
) &&
596 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc
);
599 * Not supported: discarded events or discarded packets support
600 * without packets support. Packets are the way to know where
601 * discarded events/packets occurred in CTF 1.8.
603 if (!bt_stream_class_supports_packets(ir_sc
)) {
604 BT_ASSERT(!bt_stream_class_supports_discarded_packets(ir_sc
));
606 if (!fs_sink
->ignore_discarded_events
&& bt_stream_class_supports_discarded_events(ir_sc
)) {
607 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
608 "Unsupported stream: "
609 "stream does not support packets, "
610 "but supports discarded events: "
612 "stream-id=%" PRIu64
", "
613 "stream-name=\"%s\"",
614 ir_stream
, bt_stream_get_id(ir_stream
),
615 bt_stream_get_name(ir_stream
));
616 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
622 * Not supported: discarded events with default clock snapshots,
623 * but packet beginning/end without default clock snapshot.
625 if (!fs_sink
->ignore_discarded_events
&&
626 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc
) &&
627 !packets_have_beginning_end_cs
) {
628 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
629 "Unsupported stream: discarded events have "
630 "default clock snapshots, but packets have no "
631 "beginning and/or end default clock snapshots: "
633 "stream-id=%" PRIu64
", "
634 "stream-name=\"%s\"",
635 ir_stream
, bt_stream_get_id(ir_stream
),
636 bt_stream_get_name(ir_stream
));
637 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
642 * Not supported: discarded packets with default clock
643 * snapshots, but packet beginning/end without default clock
646 if (!fs_sink
->ignore_discarded_packets
&&
647 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc
) &&
648 !packets_have_beginning_end_cs
) {
649 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
650 "Unsupported stream: discarded packets have "
651 "default clock snapshots, but packets have no "
652 "beginning and/or end default clock snapshots: "
654 "stream-id=%" PRIu64
", "
655 "stream-name=\"%s\"",
656 ir_stream
, bt_stream_get_id(ir_stream
),
657 bt_stream_get_name(ir_stream
));
658 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
662 stream
= borrow_stream(fs_sink
, ir_stream
);
664 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
665 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
669 BT_COMP_LOGI("Created new, empty stream file: "
670 "stream-id=%" PRIu64
", stream-name=\"%s\", "
671 "trace-name=\"%s\", path=\"%s/%s\"",
672 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
673 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
674 stream
->trace
->path
->str
, stream
->file_name
->str
);
680 static inline bt_component_class_sink_consume_method_status
681 handle_stream_end_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
683 bt_component_class_sink_consume_method_status status
=
684 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
685 const bt_stream
*ir_stream
= bt_message_stream_end_borrow_stream_const(msg
);
686 struct fs_sink_stream
*stream
;
688 stream
= borrow_stream(fs_sink
, ir_stream
);
690 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
691 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
695 if (G_UNLIKELY(!stream
->sc
->has_packets
&& stream
->packet_state
.is_open
)) {
696 /* Close stream's current artificial packet */
697 int ret
= fs_sink_stream_close_packet(stream
, NULL
);
700 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to close packet.");
701 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
706 BT_COMP_LOGI("Closing stream file: "
707 "stream-id=%" PRIu64
", stream-name=\"%s\", "
708 "trace-name=\"%s\", path=\"%s/%s\"",
709 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
710 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
711 stream
->trace
->path
->str
, stream
->file_name
->str
);
714 * This destroys the stream object and frees all its resources,
715 * closing the stream file.
717 g_hash_table_remove(stream
->trace
->streams
, ir_stream
);
723 static inline bt_component_class_sink_consume_method_status
724 handle_discarded_events_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
726 bt_component_class_sink_consume_method_status status
=
727 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
728 const bt_stream
*ir_stream
= bt_message_discarded_events_borrow_stream_const(msg
);
729 struct fs_sink_stream
*stream
;
730 const bt_clock_snapshot
*cs
= NULL
;
731 bt_property_availability avail
;
734 stream
= borrow_stream(fs_sink
, ir_stream
);
736 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
737 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
741 if (fs_sink
->ignore_discarded_events
) {
742 BT_COMP_LOGI("Ignoring discarded events message: "
743 "stream-id=%" PRIu64
", stream-name=\"%s\", "
744 "trace-name=\"%s\", path=\"%s/%s\"",
745 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
746 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
747 stream
->trace
->path
->str
, stream
->file_name
->str
);
751 if (stream
->discarded_events_state
.in_range
) {
752 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
753 "Unsupported contiguous discarded events message: "
754 "stream-id=%" PRIu64
", stream-name=\"%s\", "
755 "trace-name=\"%s\", path=\"%s/%s\"",
756 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
757 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
758 stream
->trace
->path
->str
, stream
->file_name
->str
);
759 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
764 * If we're currently in an opened packet (got a packet
765 * beginning message, but no packet end message yet), we do not
766 * support having a discarded events message with a time range
767 * because we require that the discarded events message's time
768 * range go from a packet's end time to the next packet's end
771 if (stream
->packet_state
.is_open
&& stream
->sc
->discarded_events_has_ts
) {
772 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
773 "Unsupported discarded events message with "
774 "default clock snapshots occurring within a packet: "
775 "stream-id=%" PRIu64
", stream-name=\"%s\", "
776 "trace-name=\"%s\", path=\"%s/%s\"",
777 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
778 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
779 stream
->trace
->path
->str
, stream
->file_name
->str
);
780 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
784 if (stream
->sc
->discarded_events_has_ts
) {
786 * Make the stream's state be in the time range of a
787 * discarded events message since we have the message's
788 * time range (`stream->sc->discarded_events_has_ts`).
790 stream
->discarded_events_state
.in_range
= true;
793 * The clock snapshot values will be validated when
794 * handling the next packet beginning and end messages
795 * (next calls to handle_packet_beginning_msg() and
796 * handle_packet_end_msg()).
798 cs
= bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg
);
800 stream
->discarded_events_state
.beginning_cs
= bt_clock_snapshot_get_value(cs
);
801 cs
= bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg
);
803 stream
->discarded_events_state
.end_cs
= bt_clock_snapshot_get_value(cs
);
806 avail
= bt_message_discarded_events_get_count(msg
, &count
);
807 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
809 * There's no specific count of discarded events: set it
810 * to 1 so that we know that we at least discarded
816 stream
->packet_state
.discarded_events_counter
+= count
;
822 static inline bt_component_class_sink_consume_method_status
823 handle_discarded_packets_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
825 bt_component_class_sink_consume_method_status status
=
826 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
827 const bt_stream
*ir_stream
= bt_message_discarded_packets_borrow_stream_const(msg
);
828 struct fs_sink_stream
*stream
;
829 const bt_clock_snapshot
*cs
= NULL
;
830 bt_property_availability avail
;
833 stream
= borrow_stream(fs_sink
, ir_stream
);
835 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
836 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
840 if (fs_sink
->ignore_discarded_packets
) {
841 BT_COMP_LOGI("Ignoring discarded packets message: "
842 "stream-id=%" PRIu64
", stream-name=\"%s\", "
843 "trace-name=\"%s\", path=\"%s/%s\"",
844 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
845 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
846 stream
->trace
->path
->str
, stream
->file_name
->str
);
850 if (stream
->discarded_packets_state
.in_range
) {
851 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
852 "Unsupported contiguous discarded packets message: "
853 "stream-id=%" PRIu64
", stream-name=\"%s\", "
854 "trace-name=\"%s\", path=\"%s/%s\"",
855 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
856 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
857 stream
->trace
->path
->str
, stream
->file_name
->str
);
858 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
863 * Discarded packets messages are guaranteed to occur between
866 BT_ASSERT(!stream
->packet_state
.is_open
);
868 if (stream
->sc
->discarded_packets_has_ts
) {
870 * Make the stream's state be in the time range of a
871 * discarded packets message since we have the message's
872 * time range (`stream->sc->discarded_packets_has_ts`).
874 stream
->discarded_packets_state
.in_range
= true;
877 * The clock snapshot values will be validated when
878 * handling the next packet beginning message (next call
879 * to handle_packet_beginning_msg()).
881 cs
= bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg
);
883 stream
->discarded_packets_state
.beginning_cs
= bt_clock_snapshot_get_value(cs
);
884 cs
= bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg
);
886 stream
->discarded_packets_state
.end_cs
= bt_clock_snapshot_get_value(cs
);
889 avail
= bt_message_discarded_packets_get_count(msg
, &count
);
890 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
892 * There's no specific count of discarded packets: set
893 * it to 1 so that we know that we at least discarded
899 stream
->packet_state
.seq_num
+= count
;
905 static inline void put_messages(bt_message_array_const msgs
, uint64_t count
)
909 for (i
= 0; i
< count
; i
++) {
910 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
914 bt_component_class_sink_consume_method_status
ctf_fs_sink_consume(bt_self_component_sink
*self_comp
)
916 bt_component_class_sink_consume_method_status status
=
917 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
918 struct fs_sink_comp
*fs_sink
;
919 bt_message_iterator_next_status next_status
;
920 uint64_t msg_count
= 0;
921 bt_message_array_const msgs
;
923 fs_sink
= (fs_sink_comp
*) bt_self_component_get_data(
924 bt_self_component_sink_as_self_component(self_comp
));
925 BT_ASSERT_DBG(fs_sink
);
926 BT_ASSERT_DBG(fs_sink
->upstream_iter
);
928 /* Consume messages */
929 next_status
= bt_message_iterator_next(fs_sink
->upstream_iter
, &msgs
, &msg_count
);
930 if (next_status
< 0) {
931 status
= (bt_component_class_sink_consume_method_status
) next_status
;
932 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
933 "Failed to get next message from upstream iterator.");
937 switch (next_status
) {
938 case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK
:
942 for (i
= 0; i
< msg_count
; i
++) {
943 const bt_message
*msg
= msgs
[i
];
947 switch (bt_message_get_type(msg
)) {
948 case BT_MESSAGE_TYPE_EVENT
:
949 status
= handle_event_msg(fs_sink
, msg
);
951 case BT_MESSAGE_TYPE_PACKET_BEGINNING
:
952 status
= handle_packet_beginning_msg(fs_sink
, msg
);
954 case BT_MESSAGE_TYPE_PACKET_END
:
955 status
= handle_packet_end_msg(fs_sink
, msg
);
957 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY
:
959 BT_COMP_LOGD_STR("Ignoring message iterator inactivity message.");
961 case BT_MESSAGE_TYPE_STREAM_BEGINNING
:
962 status
= handle_stream_beginning_msg(fs_sink
, msg
);
964 case BT_MESSAGE_TYPE_STREAM_END
:
965 status
= handle_stream_end_msg(fs_sink
, msg
);
967 case BT_MESSAGE_TYPE_DISCARDED_EVENTS
:
968 status
= handle_discarded_events_msg(fs_sink
, msg
);
970 case BT_MESSAGE_TYPE_DISCARDED_PACKETS
:
971 status
= handle_discarded_packets_msg(fs_sink
, msg
);
977 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
979 if (status
!= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
) {
980 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
981 "Failed to handle message: "
982 "generated CTF traces could be incomplete: "
983 "output-dir-path=\"%s\"",
984 fs_sink
->output_dir_path
->str
);
991 case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN
:
992 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN
;
994 case BT_MESSAGE_ITERATOR_NEXT_STATUS_END
:
995 /* TODO: Finalize all traces (should already be done?) */
996 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END
;
1005 BT_ASSERT(status
!= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
);
1006 put_messages(msgs
, msg_count
);
1012 bt_component_class_sink_graph_is_configured_method_status
1013 ctf_fs_sink_graph_is_configured(bt_self_component_sink
*self_comp
)
1015 bt_component_class_sink_graph_is_configured_method_status status
;
1016 bt_message_iterator_create_from_sink_component_status msg_iter_status
;
1017 fs_sink_comp
*fs_sink
= (fs_sink_comp
*) bt_self_component_get_data(
1018 bt_self_component_sink_as_self_component(self_comp
));
1020 msg_iter_status
= bt_message_iterator_create_from_sink_component(
1021 self_comp
, bt_self_component_sink_borrow_input_port_by_name(self_comp
, in_port_name
),
1022 &fs_sink
->upstream_iter
);
1023 if (msg_iter_status
!= BT_MESSAGE_ITERATOR_CREATE_FROM_SINK_COMPONENT_STATUS_OK
) {
1024 status
= (bt_component_class_sink_graph_is_configured_method_status
) msg_iter_status
;
1025 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to create upstream iterator.");
1029 status
= BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK
;
1034 void ctf_fs_sink_finalize(bt_self_component_sink
*self_comp
)
1036 fs_sink_comp
*fs_sink
= (fs_sink_comp
*) bt_self_component_get_data(
1037 bt_self_component_sink_as_self_component(self_comp
));
1039 destroy_fs_sink_comp(fs_sink
);