/*
 * SPDX-License-Identifier: MIT
 *
 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
 */
11 #include <babeltrace2/babeltrace.h>
13 #define BT_COMP_LOG_SELF_COMP (fs_sink->self_comp)
14 #define BT_LOG_OUTPUT_LEVEL (fs_sink->log_level)
15 #define BT_LOG_TAG "PLUGIN/SINK.CTF.FS"
16 #include "logging/comp-logging.h"
18 #include "common/assert.h"
19 #include "ctfser/ctfser.h"
21 #include "plugins/common/param-validation/param-validation.h"
23 #include "fs-sink-ctf-meta.hpp"
24 #include "fs-sink-stream.hpp"
25 #include "fs-sink-trace.hpp"
26 #include "fs-sink.hpp"
27 #include "translate-ctf-ir-to-tsdl.hpp"
28 #include "translate-trace-ir-to-ctf-ir.hpp"
/*
 * Name of the input port which ctf_fs_sink_init() adds to the component
 * and which ctf_fs_sink_graph_is_configured() borrows by name.
 */
static const char * const in_port_name = "in";
32 static bt_component_class_initialize_method_status
33 ensure_output_dir_exists(struct fs_sink_comp
*fs_sink
)
35 bt_component_class_initialize_method_status status
=
36 BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
;
39 ret
= g_mkdir_with_parents(fs_sink
->output_dir_path
->str
, 0755);
41 BT_COMP_LOGE_APPEND_CAUSE_ERRNO(fs_sink
->self_comp
,
42 "Cannot create directories for output directory",
43 ": output-dir-path=\"%s\"", fs_sink
->output_dir_path
->str
);
44 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
52 static bt_param_validation_map_value_entry_descr fs_sink_params_descr
[] = {
53 {"path", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY
,
54 bt_param_validation_value_descr::makeString()},
55 {"assume-single-trace", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
56 bt_param_validation_value_descr::makeBool()},
57 {"ignore-discarded-events", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
58 bt_param_validation_value_descr::makeBool()},
59 {"ignore-discarded-packets", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
60 bt_param_validation_value_descr::makeBool()},
61 {"quiet", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
62 bt_param_validation_value_descr::makeBool()},
63 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END
};
65 static bt_component_class_initialize_method_status
configure_component(struct fs_sink_comp
*fs_sink
,
66 const bt_value
*params
)
68 bt_component_class_initialize_method_status status
;
69 const bt_value
*value
;
70 enum bt_param_validation_status validation_status
;
71 gchar
*validation_error
;
74 bt_param_validation_validate(params
, fs_sink_params_descr
, &validation_error
);
75 if (validation_status
== BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR
) {
76 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
77 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "%s", validation_error
);
79 } else if (validation_status
== BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR
) {
80 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
84 value
= bt_value_map_borrow_entry_value_const(params
, "path");
85 g_string_assign(fs_sink
->output_dir_path
, bt_value_string_get(value
));
87 value
= bt_value_map_borrow_entry_value_const(params
, "assume-single-trace");
89 fs_sink
->assume_single_trace
= (bool) bt_value_bool_get(value
);
92 value
= bt_value_map_borrow_entry_value_const(params
, "ignore-discarded-events");
94 fs_sink
->ignore_discarded_events
= (bool) bt_value_bool_get(value
);
97 value
= bt_value_map_borrow_entry_value_const(params
, "ignore-discarded-packets");
99 fs_sink
->ignore_discarded_packets
= (bool) bt_value_bool_get(value
);
102 value
= bt_value_map_borrow_entry_value_const(params
, "quiet");
104 fs_sink
->quiet
= (bool) bt_value_bool_get(value
);
107 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
;
110 g_free(validation_error
);
114 static void destroy_fs_sink_comp(struct fs_sink_comp
*fs_sink
)
120 if (fs_sink
->output_dir_path
) {
121 g_string_free(fs_sink
->output_dir_path
, TRUE
);
122 fs_sink
->output_dir_path
= NULL
;
125 if (fs_sink
->traces
) {
126 g_hash_table_destroy(fs_sink
->traces
);
127 fs_sink
->traces
= NULL
;
130 BT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(fs_sink
->upstream_iter
);
137 bt_component_class_initialize_method_status
ctf_fs_sink_init(bt_self_component_sink
*self_comp_sink
,
138 bt_self_component_sink_configuration
*,
139 const bt_value
*params
, void *)
141 bt_component_class_initialize_method_status status
;
142 bt_self_component_add_port_status add_port_status
;
143 struct fs_sink_comp
*fs_sink
= NULL
;
144 bt_self_component
*self_comp
= bt_self_component_sink_as_self_component(self_comp_sink
);
145 bt_logging_level log_level
=
146 bt_component_get_logging_level(bt_self_component_as_component(self_comp
));
148 fs_sink
= g_new0(struct fs_sink_comp
, 1);
150 BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR
, log_level
, self_comp
,
151 "Failed to allocate one CTF FS sink structure.");
152 BT_CURRENT_THREAD_ERROR_APPEND_CAUSE_FROM_COMPONENT(
153 self_comp
, "Failed to allocate one CTF FS sink structure.");
154 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
158 fs_sink
->log_level
= log_level
;
159 fs_sink
->self_comp
= self_comp
;
160 fs_sink
->output_dir_path
= g_string_new(NULL
);
161 status
= configure_component(fs_sink
, params
);
162 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
163 /* configure_component() logs errors */
167 if (fs_sink
->assume_single_trace
&&
168 g_file_test(fs_sink
->output_dir_path
->str
, G_FILE_TEST_EXISTS
)) {
169 BT_COMP_LOGE_APPEND_CAUSE(self_comp
,
170 "Single trace mode, but output path exists: output-path=\"%s\"",
171 fs_sink
->output_dir_path
->str
);
172 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
176 status
= ensure_output_dir_exists(fs_sink
);
177 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
178 /* ensure_output_dir_exists() logs errors */
182 fs_sink
->traces
= g_hash_table_new_full(g_direct_hash
, g_direct_equal
, NULL
,
183 (GDestroyNotify
) fs_sink_trace_destroy
);
184 if (!fs_sink
->traces
) {
185 BT_COMP_LOGE_APPEND_CAUSE(self_comp
, "Failed to allocate one GHashTable.");
186 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
191 bt_self_component_sink_add_input_port(self_comp_sink
, in_port_name
, NULL
, NULL
);
192 if (add_port_status
!= BT_SELF_COMPONENT_ADD_PORT_STATUS_OK
) {
193 status
= (bt_component_class_initialize_method_status
) add_port_status
;
194 BT_COMP_LOGE_APPEND_CAUSE(self_comp
, "Failed to add input port.");
198 bt_self_component_set_data(self_comp
, fs_sink
);
201 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
202 destroy_fs_sink_comp(fs_sink
);
208 static inline struct fs_sink_stream
*borrow_stream(struct fs_sink_comp
*fs_sink
,
209 const bt_stream
*ir_stream
)
211 const bt_trace
*ir_trace
= bt_stream_borrow_trace_const(ir_stream
);
212 struct fs_sink_trace
*trace
;
213 struct fs_sink_stream
*stream
= NULL
;
215 trace
= (fs_sink_trace
*) g_hash_table_lookup(fs_sink
->traces
, ir_trace
);
216 if (G_UNLIKELY(!trace
)) {
217 if (fs_sink
->assume_single_trace
&& g_hash_table_size(fs_sink
->traces
) > 0) {
218 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
219 "Single trace mode, but getting more than one trace: "
220 "stream-name=\"%s\"",
221 bt_stream_get_name(ir_stream
));
225 trace
= fs_sink_trace_create(fs_sink
, ir_trace
);
231 stream
= (fs_sink_stream
*) g_hash_table_lookup(trace
->streams
, ir_stream
);
232 if (G_UNLIKELY(!stream
)) {
233 stream
= fs_sink_stream_create(trace
, ir_stream
);
243 static inline bt_component_class_sink_consume_method_status
244 handle_event_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
247 bt_component_class_sink_consume_method_status status
=
248 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
249 const bt_event
*ir_event
= bt_message_event_borrow_event_const(msg
);
250 const bt_stream
*ir_stream
= bt_event_borrow_stream_const(ir_event
);
251 struct fs_sink_stream
*stream
;
252 struct fs_sink_ctf_event_class
*ec
= NULL
;
253 const bt_clock_snapshot
*cs
= NULL
;
255 stream
= borrow_stream(fs_sink
, ir_stream
);
256 if (G_UNLIKELY(!stream
)) {
257 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
258 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
262 ret
= try_translate_event_class_trace_ir_to_ctf_ir(fs_sink
, stream
->sc
,
263 bt_event_borrow_class_const(ir_event
), &ec
);
265 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to translate event class to CTF IR.");
266 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
272 if (stream
->sc
->default_clock_class
) {
273 cs
= bt_message_event_borrow_default_clock_snapshot_const(msg
);
277 * If this event's stream does not support packets, then we
278 * lazily create artificial packets.
280 * The size of an artificial packet is arbitrarily at least
281 * 4 MiB (it usually is greater because we close it when
282 * comes the time to write a new event and the packet's content
283 * size is >= 4 MiB), except the last one which can be smaller.
285 if (G_UNLIKELY(!stream
->sc
->has_packets
)) {
286 if (stream
->packet_state
.is_open
&&
287 bt_ctfser_get_offset_in_current_packet_bits(&stream
->ctfser
) / 8 >= 4 * 1024 * 1024) {
289 * Stream's current packet is larger than 4 MiB:
290 * close it. A new packet will be opened just
293 ret
= fs_sink_stream_close_packet(stream
, NULL
);
295 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to close packet.");
296 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
301 if (!stream
->packet_state
.is_open
) {
302 /* Stream's packet is not currently opened: open it */
303 ret
= fs_sink_stream_open_packet(stream
, NULL
, NULL
);
305 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to open packet.");
306 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
312 BT_ASSERT_DBG(stream
->packet_state
.is_open
);
313 ret
= fs_sink_stream_write_event(stream
, cs
, ir_event
, ec
);
314 if (G_UNLIKELY(ret
)) {
315 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to write event.");
316 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
324 static inline bt_component_class_sink_consume_method_status
325 handle_packet_beginning_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
328 bt_component_class_sink_consume_method_status status
=
329 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
330 const bt_packet
*ir_packet
= bt_message_packet_beginning_borrow_packet_const(msg
);
331 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
332 struct fs_sink_stream
*stream
;
333 const bt_clock_snapshot
*cs
= NULL
;
335 stream
= borrow_stream(fs_sink
, ir_stream
);
336 if (G_UNLIKELY(!stream
)) {
337 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
338 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
342 if (stream
->sc
->packets_have_ts_begin
) {
343 cs
= bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg
);
348 * If we previously received a discarded events message with
349 * a time range, make sure that its beginning time matches what's
350 * expected for CTF 1.8, that is:
352 * * Its beginning time is the previous packet's end
353 * time (or the current packet's beginning time if
354 * this is the first packet).
356 * We check this here instead of in handle_packet_end_msg()
357 * because we want to catch any incompatible message as early as
358 * possible to report the error.
360 * Validation of the discarded events message's end time is
361 * performed in handle_packet_end_msg().
363 if (stream
->discarded_events_state
.in_range
) {
364 uint64_t expected_cs
;
367 * `stream->discarded_events_state.in_range` is only set
368 * when the stream class's discarded events have a time
371 * It is required that the packet beginning and end
372 * messages for this stream class have times when
373 * discarded events have a time range.
375 BT_ASSERT(stream
->sc
->discarded_events_has_ts
);
376 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
377 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
379 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
380 /* We're opening the first packet */
381 expected_cs
= bt_clock_snapshot_get_value(cs
);
383 expected_cs
= stream
->prev_packet_state
.end_cs
;
386 if (stream
->discarded_events_state
.beginning_cs
!= expected_cs
) {
387 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
388 "Incompatible discarded events message: "
389 "unexpected beginning time: "
390 "beginning-cs-val=%" PRIu64
", "
391 "expected-beginning-cs-val=%" PRIu64
", "
392 "stream-id=%" PRIu64
", stream-name=\"%s\", "
393 "trace-name=\"%s\", path=\"%s/%s\"",
394 stream
->discarded_events_state
.beginning_cs
, expected_cs
,
395 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
396 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
397 stream
->trace
->path
->str
, stream
->file_name
->str
);
398 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
404 * If we previously received a discarded packets message with a
405 * time range, make sure that its beginning and end times match
406 * what's expected for CTF 1.8, that is:
408 * * Its beginning time is the previous packet's end time.
410 * * Its end time is the current packet's beginning time.
412 if (stream
->discarded_packets_state
.in_range
) {
413 uint64_t expected_end_cs
;
416 * `stream->discarded_packets_state.in_range` is only
417 * set when the stream class's discarded packets have a
420 * It is required that the packet beginning and end
421 * messages for this stream class have times when
422 * discarded packets have a time range.
424 BT_ASSERT(stream
->sc
->discarded_packets_has_ts
);
425 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
426 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
429 * It is not supported to have a discarded packets
430 * message _before_ the first packet: we cannot validate
431 * that its beginning time is compatible with CTF 1.8 in
434 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
435 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
436 "Incompatible discarded packets message "
437 "occurring before the stream's first packet: "
438 "stream-id=%" PRIu64
", stream-name=\"%s\", "
439 "trace-name=\"%s\", path=\"%s/%s\"",
440 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
441 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
442 stream
->trace
->path
->str
, stream
->file_name
->str
);
443 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
447 if (stream
->discarded_packets_state
.beginning_cs
!= stream
->prev_packet_state
.end_cs
) {
448 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
449 "Incompatible discarded packets message: "
450 "unexpected beginning time: "
451 "beginning-cs-val=%" PRIu64
", "
452 "expected-beginning-cs-val=%" PRIu64
", "
453 "stream-id=%" PRIu64
", stream-name=\"%s\", "
454 "trace-name=\"%s\", path=\"%s/%s\"",
455 stream
->discarded_packets_state
.beginning_cs
,
456 stream
->prev_packet_state
.end_cs
, bt_stream_get_id(ir_stream
),
457 bt_stream_get_name(ir_stream
),
458 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
459 stream
->trace
->path
->str
, stream
->file_name
->str
);
460 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
464 expected_end_cs
= bt_clock_snapshot_get_value(cs
);
466 if (stream
->discarded_packets_state
.end_cs
!= expected_end_cs
) {
467 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
468 "Incompatible discarded packets message: "
469 "unexpected end time: "
470 "end-cs-val=%" PRIu64
", "
471 "expected-end-cs-val=%" PRIu64
", "
472 "stream-id=%" PRIu64
", stream-name=\"%s\", "
473 "trace-name=\"%s\", path=\"%s/%s\"",
474 stream
->discarded_packets_state
.end_cs
, expected_end_cs
,
475 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
476 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
477 stream
->trace
->path
->str
, stream
->file_name
->str
);
478 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
484 * We're not in a discarded packets time range anymore since we
485 * require that the discarded packets time ranges go from one
486 * packet's end time to the next packet's beginning time, and
487 * we're handling a packet beginning message here.
489 stream
->discarded_packets_state
.in_range
= false;
491 ret
= fs_sink_stream_open_packet(stream
, cs
, ir_packet
);
493 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to open packet.");
494 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
502 static inline bt_component_class_sink_consume_method_status
503 handle_packet_end_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
506 bt_component_class_sink_consume_method_status status
=
507 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
508 const bt_packet
*ir_packet
= bt_message_packet_end_borrow_packet_const(msg
);
509 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
510 struct fs_sink_stream
*stream
;
511 const bt_clock_snapshot
*cs
= NULL
;
513 stream
= borrow_stream(fs_sink
, ir_stream
);
514 if (G_UNLIKELY(!stream
)) {
515 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
516 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
520 if (stream
->sc
->packets_have_ts_end
) {
521 cs
= bt_message_packet_end_borrow_default_clock_snapshot_const(msg
);
526 * If we previously received a discarded events message with
527 * a time range, make sure that its end time matches what's
528 * expected for CTF 1.8, that is:
530 * * Its end time is the current packet's end time.
532 * Validation of the discarded events message's beginning time
533 * is performed in handle_packet_beginning_msg().
535 if (stream
->discarded_events_state
.in_range
) {
536 uint64_t expected_cs
;
539 * `stream->discarded_events_state.in_range` is only set
540 * when the stream class's discarded events have a time
543 * It is required that the packet beginning and end
544 * messages for this stream class have times when
545 * discarded events have a time range.
547 BT_ASSERT(stream
->sc
->discarded_events_has_ts
);
548 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
549 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
551 expected_cs
= bt_clock_snapshot_get_value(cs
);
553 if (stream
->discarded_events_state
.end_cs
!= expected_cs
) {
554 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
555 "Incompatible discarded events message: "
556 "unexpected end time: "
557 "end-cs-val=%" PRIu64
", "
558 "expected-end-cs-val=%" PRIu64
", "
559 "stream-id=%" PRIu64
", stream-name=\"%s\", "
560 "trace-name=\"%s\", path=\"%s/%s\"",
561 stream
->discarded_events_state
.end_cs
, expected_cs
,
562 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
563 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
564 stream
->trace
->path
->str
, stream
->file_name
->str
);
565 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
570 ret
= fs_sink_stream_close_packet(stream
, cs
);
572 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to close packet.");
573 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
578 * We're not in a discarded events time range anymore since we
579 * require that the discarded events time ranges go from one
580 * packet's end time to the next packet's end time, and we're
581 * handling a packet end message here.
583 stream
->discarded_events_state
.in_range
= false;
589 static inline bt_component_class_sink_consume_method_status
590 handle_stream_beginning_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
592 bt_component_class_sink_consume_method_status status
=
593 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
594 const bt_stream
*ir_stream
= bt_message_stream_beginning_borrow_stream_const(msg
);
595 const bt_stream_class
*ir_sc
= bt_stream_borrow_class_const(ir_stream
);
596 struct fs_sink_stream
*stream
;
597 bool packets_have_beginning_end_cs
=
598 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc
) &&
599 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc
);
602 * Not supported: discarded events or discarded packets support
603 * without packets support. Packets are the way to know where
604 * discarded events/packets occurred in CTF 1.8.
606 if (!bt_stream_class_supports_packets(ir_sc
)) {
607 BT_ASSERT(!bt_stream_class_supports_discarded_packets(ir_sc
));
609 if (!fs_sink
->ignore_discarded_events
&& bt_stream_class_supports_discarded_events(ir_sc
)) {
610 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
611 "Unsupported stream: "
612 "stream does not support packets, "
613 "but supports discarded events: "
615 "stream-id=%" PRIu64
", "
616 "stream-name=\"%s\"",
617 ir_stream
, bt_stream_get_id(ir_stream
),
618 bt_stream_get_name(ir_stream
));
619 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
625 * Not supported: discarded events with default clock snapshots,
626 * but packet beginning/end without default clock snapshot.
628 if (!fs_sink
->ignore_discarded_events
&&
629 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc
) &&
630 !packets_have_beginning_end_cs
) {
631 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
632 "Unsupported stream: discarded events have "
633 "default clock snapshots, but packets have no "
634 "beginning and/or end default clock snapshots: "
636 "stream-id=%" PRIu64
", "
637 "stream-name=\"%s\"",
638 ir_stream
, bt_stream_get_id(ir_stream
),
639 bt_stream_get_name(ir_stream
));
640 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
645 * Not supported: discarded packets with default clock
646 * snapshots, but packet beginning/end without default clock
649 if (!fs_sink
->ignore_discarded_packets
&&
650 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc
) &&
651 !packets_have_beginning_end_cs
) {
652 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
653 "Unsupported stream: discarded packets have "
654 "default clock snapshots, but packets have no "
655 "beginning and/or end default clock snapshots: "
657 "stream-id=%" PRIu64
", "
658 "stream-name=\"%s\"",
659 ir_stream
, bt_stream_get_id(ir_stream
),
660 bt_stream_get_name(ir_stream
));
661 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
665 stream
= borrow_stream(fs_sink
, ir_stream
);
667 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
668 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
672 BT_COMP_LOGI("Created new, empty stream file: "
673 "stream-id=%" PRIu64
", stream-name=\"%s\", "
674 "trace-name=\"%s\", path=\"%s/%s\"",
675 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
676 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
677 stream
->trace
->path
->str
, stream
->file_name
->str
);
683 static inline bt_component_class_sink_consume_method_status
684 handle_stream_end_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
686 bt_component_class_sink_consume_method_status status
=
687 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
688 const bt_stream
*ir_stream
= bt_message_stream_end_borrow_stream_const(msg
);
689 struct fs_sink_stream
*stream
;
691 stream
= borrow_stream(fs_sink
, ir_stream
);
693 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
694 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
698 if (G_UNLIKELY(!stream
->sc
->has_packets
&& stream
->packet_state
.is_open
)) {
699 /* Close stream's current artificial packet */
700 int ret
= fs_sink_stream_close_packet(stream
, NULL
);
703 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to close packet.");
704 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
709 BT_COMP_LOGI("Closing stream file: "
710 "stream-id=%" PRIu64
", stream-name=\"%s\", "
711 "trace-name=\"%s\", path=\"%s/%s\"",
712 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
713 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
714 stream
->trace
->path
->str
, stream
->file_name
->str
);
717 * This destroys the stream object and frees all its resources,
718 * closing the stream file.
720 g_hash_table_remove(stream
->trace
->streams
, ir_stream
);
726 static inline bt_component_class_sink_consume_method_status
727 handle_discarded_events_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
729 bt_component_class_sink_consume_method_status status
=
730 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
731 const bt_stream
*ir_stream
= bt_message_discarded_events_borrow_stream_const(msg
);
732 struct fs_sink_stream
*stream
;
733 const bt_clock_snapshot
*cs
= NULL
;
734 bt_property_availability avail
;
737 stream
= borrow_stream(fs_sink
, ir_stream
);
739 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
740 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
744 if (fs_sink
->ignore_discarded_events
) {
745 BT_COMP_LOGI("Ignoring discarded events message: "
746 "stream-id=%" PRIu64
", stream-name=\"%s\", "
747 "trace-name=\"%s\", path=\"%s/%s\"",
748 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
749 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
750 stream
->trace
->path
->str
, stream
->file_name
->str
);
754 if (stream
->discarded_events_state
.in_range
) {
755 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
756 "Unsupported contiguous discarded events message: "
757 "stream-id=%" PRIu64
", stream-name=\"%s\", "
758 "trace-name=\"%s\", path=\"%s/%s\"",
759 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
760 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
761 stream
->trace
->path
->str
, stream
->file_name
->str
);
762 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
767 * If we're currently in an opened packet (got a packet
768 * beginning message, but no packet end message yet), we do not
769 * support having a discarded events message with a time range
770 * because we require that the discarded events message's time
771 * range go from a packet's end time to the next packet's end
774 if (stream
->packet_state
.is_open
&& stream
->sc
->discarded_events_has_ts
) {
775 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
776 "Unsupported discarded events message with "
777 "default clock snapshots occurring within a packet: "
778 "stream-id=%" PRIu64
", stream-name=\"%s\", "
779 "trace-name=\"%s\", path=\"%s/%s\"",
780 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
781 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
782 stream
->trace
->path
->str
, stream
->file_name
->str
);
783 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
787 if (stream
->sc
->discarded_events_has_ts
) {
789 * Make the stream's state be in the time range of a
790 * discarded events message since we have the message's
791 * time range (`stream->sc->discarded_events_has_ts`).
793 stream
->discarded_events_state
.in_range
= true;
796 * The clock snapshot values will be validated when
797 * handling the next packet beginning and end messages
798 * (next calls to handle_packet_beginning_msg() and
799 * handle_packet_end_msg()).
801 cs
= bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg
);
803 stream
->discarded_events_state
.beginning_cs
= bt_clock_snapshot_get_value(cs
);
804 cs
= bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg
);
806 stream
->discarded_events_state
.end_cs
= bt_clock_snapshot_get_value(cs
);
809 avail
= bt_message_discarded_events_get_count(msg
, &count
);
810 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
812 * There's no specific count of discarded events: set it
813 * to 1 so that we know that we at least discarded
819 stream
->packet_state
.discarded_events_counter
+= count
;
825 static inline bt_component_class_sink_consume_method_status
826 handle_discarded_packets_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
828 bt_component_class_sink_consume_method_status status
=
829 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
830 const bt_stream
*ir_stream
= bt_message_discarded_packets_borrow_stream_const(msg
);
831 struct fs_sink_stream
*stream
;
832 const bt_clock_snapshot
*cs
= NULL
;
833 bt_property_availability avail
;
836 stream
= borrow_stream(fs_sink
, ir_stream
);
838 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
839 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
843 if (fs_sink
->ignore_discarded_packets
) {
844 BT_COMP_LOGI("Ignoring discarded packets message: "
845 "stream-id=%" PRIu64
", stream-name=\"%s\", "
846 "trace-name=\"%s\", path=\"%s/%s\"",
847 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
848 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
849 stream
->trace
->path
->str
, stream
->file_name
->str
);
853 if (stream
->discarded_packets_state
.in_range
) {
854 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
855 "Unsupported contiguous discarded packets message: "
856 "stream-id=%" PRIu64
", stream-name=\"%s\", "
857 "trace-name=\"%s\", path=\"%s/%s\"",
858 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
859 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
860 stream
->trace
->path
->str
, stream
->file_name
->str
);
861 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
866 * Discarded packets messages are guaranteed to occur between
869 BT_ASSERT(!stream
->packet_state
.is_open
);
871 if (stream
->sc
->discarded_packets_has_ts
) {
873 * Make the stream's state be in the time range of a
874 * discarded packets message since we have the message's
875 * time range (`stream->sc->discarded_packets_has_ts`).
877 stream
->discarded_packets_state
.in_range
= true;
880 * The clock snapshot values will be validated when
881 * handling the next packet beginning message (next call
882 * to handle_packet_beginning_msg()).
884 cs
= bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg
);
886 stream
->discarded_packets_state
.beginning_cs
= bt_clock_snapshot_get_value(cs
);
887 cs
= bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg
);
889 stream
->discarded_packets_state
.end_cs
= bt_clock_snapshot_get_value(cs
);
892 avail
= bt_message_discarded_packets_get_count(msg
, &count
);
893 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
895 * There's no specific count of discarded packets: set
896 * it to 1 so that we know that we at least discarded
902 stream
->packet_state
.seq_num
+= count
;
908 static inline void put_messages(bt_message_array_const msgs
, uint64_t count
)
912 for (i
= 0; i
< count
; i
++) {
913 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
917 bt_component_class_sink_consume_method_status
ctf_fs_sink_consume(bt_self_component_sink
*self_comp
)
919 bt_component_class_sink_consume_method_status status
=
920 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
921 struct fs_sink_comp
*fs_sink
;
922 bt_message_iterator_next_status next_status
;
923 uint64_t msg_count
= 0;
924 bt_message_array_const msgs
;
926 fs_sink
= (fs_sink_comp
*) bt_self_component_get_data(
927 bt_self_component_sink_as_self_component(self_comp
));
928 BT_ASSERT_DBG(fs_sink
);
929 BT_ASSERT_DBG(fs_sink
->upstream_iter
);
931 /* Consume messages */
932 next_status
= bt_message_iterator_next(fs_sink
->upstream_iter
, &msgs
, &msg_count
);
933 if (next_status
< 0) {
934 status
= (bt_component_class_sink_consume_method_status
) next_status
;
935 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
936 "Failed to get next message from upstream iterator.");
940 switch (next_status
) {
941 case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK
:
945 for (i
= 0; i
< msg_count
; i
++) {
946 const bt_message
*msg
= msgs
[i
];
950 switch (bt_message_get_type(msg
)) {
951 case BT_MESSAGE_TYPE_EVENT
:
952 status
= handle_event_msg(fs_sink
, msg
);
954 case BT_MESSAGE_TYPE_PACKET_BEGINNING
:
955 status
= handle_packet_beginning_msg(fs_sink
, msg
);
957 case BT_MESSAGE_TYPE_PACKET_END
:
958 status
= handle_packet_end_msg(fs_sink
, msg
);
960 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY
:
962 BT_COMP_LOGD_STR("Ignoring message iterator inactivity message.");
964 case BT_MESSAGE_TYPE_STREAM_BEGINNING
:
965 status
= handle_stream_beginning_msg(fs_sink
, msg
);
967 case BT_MESSAGE_TYPE_STREAM_END
:
968 status
= handle_stream_end_msg(fs_sink
, msg
);
970 case BT_MESSAGE_TYPE_DISCARDED_EVENTS
:
971 status
= handle_discarded_events_msg(fs_sink
, msg
);
973 case BT_MESSAGE_TYPE_DISCARDED_PACKETS
:
974 status
= handle_discarded_packets_msg(fs_sink
, msg
);
980 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
982 if (status
!= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
) {
983 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
984 "Failed to handle message: "
985 "generated CTF traces could be incomplete: "
986 "output-dir-path=\"%s\"",
987 fs_sink
->output_dir_path
->str
);
994 case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN
:
995 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN
;
997 case BT_MESSAGE_ITERATOR_NEXT_STATUS_END
:
998 /* TODO: Finalize all traces (should already be done?) */
999 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END
;
1008 BT_ASSERT(status
!= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
);
1009 put_messages(msgs
, msg_count
);
1015 bt_component_class_sink_graph_is_configured_method_status
1016 ctf_fs_sink_graph_is_configured(bt_self_component_sink
*self_comp
)
1018 bt_component_class_sink_graph_is_configured_method_status status
;
1019 bt_message_iterator_create_from_sink_component_status msg_iter_status
;
1020 fs_sink_comp
*fs_sink
= (fs_sink_comp
*) bt_self_component_get_data(
1021 bt_self_component_sink_as_self_component(self_comp
));
1023 msg_iter_status
= bt_message_iterator_create_from_sink_component(
1024 self_comp
, bt_self_component_sink_borrow_input_port_by_name(self_comp
, in_port_name
),
1025 &fs_sink
->upstream_iter
);
1026 if (msg_iter_status
!= BT_MESSAGE_ITERATOR_CREATE_FROM_SINK_COMPONENT_STATUS_OK
) {
1027 status
= (bt_component_class_sink_graph_is_configured_method_status
) msg_iter_status
;
1028 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to create upstream iterator.");
1032 status
= BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK
;
1037 void ctf_fs_sink_finalize(bt_self_component_sink
*self_comp
)
1039 fs_sink_comp
*fs_sink
= (fs_sink_comp
*) bt_self_component_get_data(
1040 bt_self_component_sink_as_self_component(self_comp
));
1042 destroy_fs_sink_comp(fs_sink
);