2 * SPDX-License-Identifier: MIT
4 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
10 #include <babeltrace2/babeltrace.h>
12 #define BT_COMP_LOG_SELF_COMP (fs_sink->self_comp)
13 #define BT_LOG_OUTPUT_LEVEL (fs_sink->log_level)
14 #define BT_LOG_TAG "PLUGIN/SINK.CTF.FS"
15 #include "logging/comp-logging.h"
17 #include "common/assert.h"
18 #include "ctfser/ctfser.h"
20 #include "plugins/common/param-validation/param-validation.h"
22 #include "fs-sink-ctf-meta.hpp"
23 #include "fs-sink-stream.hpp"
24 #include "fs-sink-trace.hpp"
25 #include "fs-sink.hpp"
26 #include "translate-ctf-ir-to-tsdl.hpp"
27 #include "translate-trace-ir-to-ctf-ir.hpp"
29 static const char * const in_port_name
= "in";
31 static bt_component_class_initialize_method_status
32 ensure_output_dir_exists(struct fs_sink_comp
*fs_sink
)
34 bt_component_class_initialize_method_status status
=
35 BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
;
38 ret
= g_mkdir_with_parents(fs_sink
->output_dir_path
->str
, 0755);
40 BT_COMP_LOGE_APPEND_CAUSE_ERRNO(fs_sink
->self_comp
,
41 "Cannot create directories for output directory",
42 ": output-dir-path=\"%s\"", fs_sink
->output_dir_path
->str
);
43 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
51 static bt_param_validation_map_value_entry_descr fs_sink_params_descr
[] = {
52 {"path", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY
,
53 bt_param_validation_value_descr::makeString()},
54 {"assume-single-trace", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
55 bt_param_validation_value_descr::makeBool()},
56 {"ignore-discarded-events", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
57 bt_param_validation_value_descr::makeBool()},
58 {"ignore-discarded-packets", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
59 bt_param_validation_value_descr::makeBool()},
60 {"quiet", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
61 bt_param_validation_value_descr::makeBool()},
62 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END
};
64 static bt_component_class_initialize_method_status
configure_component(struct fs_sink_comp
*fs_sink
,
65 const bt_value
*params
)
67 bt_component_class_initialize_method_status status
;
68 const bt_value
*value
;
69 enum bt_param_validation_status validation_status
;
70 gchar
*validation_error
;
73 bt_param_validation_validate(params
, fs_sink_params_descr
, &validation_error
);
74 if (validation_status
== BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR
) {
75 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
76 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "%s", validation_error
);
78 } else if (validation_status
== BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR
) {
79 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
83 value
= bt_value_map_borrow_entry_value_const(params
, "path");
84 g_string_assign(fs_sink
->output_dir_path
, bt_value_string_get(value
));
86 value
= bt_value_map_borrow_entry_value_const(params
, "assume-single-trace");
88 fs_sink
->assume_single_trace
= (bool) bt_value_bool_get(value
);
91 value
= bt_value_map_borrow_entry_value_const(params
, "ignore-discarded-events");
93 fs_sink
->ignore_discarded_events
= (bool) bt_value_bool_get(value
);
96 value
= bt_value_map_borrow_entry_value_const(params
, "ignore-discarded-packets");
98 fs_sink
->ignore_discarded_packets
= (bool) bt_value_bool_get(value
);
101 value
= bt_value_map_borrow_entry_value_const(params
, "quiet");
103 fs_sink
->quiet
= (bool) bt_value_bool_get(value
);
106 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
;
109 g_free(validation_error
);
113 static void destroy_fs_sink_comp(struct fs_sink_comp
*fs_sink
)
119 if (fs_sink
->output_dir_path
) {
120 g_string_free(fs_sink
->output_dir_path
, TRUE
);
121 fs_sink
->output_dir_path
= NULL
;
124 if (fs_sink
->traces
) {
125 g_hash_table_destroy(fs_sink
->traces
);
126 fs_sink
->traces
= NULL
;
129 BT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(fs_sink
->upstream_iter
);
136 bt_component_class_initialize_method_status
ctf_fs_sink_init(bt_self_component_sink
*self_comp_sink
,
137 bt_self_component_sink_configuration
*,
138 const bt_value
*params
, void *)
140 bt_component_class_initialize_method_status status
;
141 bt_self_component_add_port_status add_port_status
;
142 struct fs_sink_comp
*fs_sink
= NULL
;
143 bt_self_component
*self_comp
= bt_self_component_sink_as_self_component(self_comp_sink
);
144 bt_logging_level log_level
=
145 bt_component_get_logging_level(bt_self_component_as_component(self_comp
));
147 fs_sink
= g_new0(struct fs_sink_comp
, 1);
149 BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR
, log_level
, self_comp
,
150 "Failed to allocate one CTF FS sink structure.");
151 BT_CURRENT_THREAD_ERROR_APPEND_CAUSE_FROM_COMPONENT(
152 self_comp
, "Failed to allocate one CTF FS sink structure.");
153 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
157 fs_sink
->log_level
= log_level
;
158 fs_sink
->self_comp
= self_comp
;
159 fs_sink
->output_dir_path
= g_string_new(NULL
);
160 status
= configure_component(fs_sink
, params
);
161 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
162 /* configure_component() logs errors */
166 if (fs_sink
->assume_single_trace
&&
167 g_file_test(fs_sink
->output_dir_path
->str
, G_FILE_TEST_EXISTS
)) {
168 BT_COMP_LOGE_APPEND_CAUSE(self_comp
,
169 "Single trace mode, but output path exists: output-path=\"%s\"",
170 fs_sink
->output_dir_path
->str
);
171 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
175 status
= ensure_output_dir_exists(fs_sink
);
176 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
177 /* ensure_output_dir_exists() logs errors */
181 fs_sink
->traces
= g_hash_table_new_full(g_direct_hash
, g_direct_equal
, NULL
,
182 (GDestroyNotify
) fs_sink_trace_destroy
);
183 if (!fs_sink
->traces
) {
184 BT_COMP_LOGE_APPEND_CAUSE(self_comp
, "Failed to allocate one GHashTable.");
185 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
190 bt_self_component_sink_add_input_port(self_comp_sink
, in_port_name
, NULL
, NULL
);
191 if (add_port_status
!= BT_SELF_COMPONENT_ADD_PORT_STATUS_OK
) {
192 status
= (bt_component_class_initialize_method_status
) add_port_status
;
193 BT_COMP_LOGE_APPEND_CAUSE(self_comp
, "Failed to add input port.");
197 bt_self_component_set_data(self_comp
, fs_sink
);
200 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
201 destroy_fs_sink_comp(fs_sink
);
207 static inline struct fs_sink_stream
*borrow_stream(struct fs_sink_comp
*fs_sink
,
208 const bt_stream
*ir_stream
)
210 const bt_trace
*ir_trace
= bt_stream_borrow_trace_const(ir_stream
);
211 struct fs_sink_trace
*trace
;
212 struct fs_sink_stream
*stream
= NULL
;
214 trace
= (fs_sink_trace
*) g_hash_table_lookup(fs_sink
->traces
, ir_trace
);
215 if (G_UNLIKELY(!trace
)) {
216 if (fs_sink
->assume_single_trace
&& g_hash_table_size(fs_sink
->traces
) > 0) {
217 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
218 "Single trace mode, but getting more than one trace: "
219 "stream-name=\"%s\"",
220 bt_stream_get_name(ir_stream
));
224 trace
= fs_sink_trace_create(fs_sink
, ir_trace
);
230 stream
= (fs_sink_stream
*) g_hash_table_lookup(trace
->streams
, ir_stream
);
231 if (G_UNLIKELY(!stream
)) {
232 stream
= fs_sink_stream_create(trace
, ir_stream
);
242 static inline bt_component_class_sink_consume_method_status
243 handle_event_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
246 bt_component_class_sink_consume_method_status status
=
247 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
248 const bt_event
*ir_event
= bt_message_event_borrow_event_const(msg
);
249 const bt_stream
*ir_stream
= bt_event_borrow_stream_const(ir_event
);
250 struct fs_sink_stream
*stream
;
251 struct fs_sink_ctf_event_class
*ec
= NULL
;
252 const bt_clock_snapshot
*cs
= NULL
;
254 stream
= borrow_stream(fs_sink
, ir_stream
);
255 if (G_UNLIKELY(!stream
)) {
256 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
257 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
261 ret
= try_translate_event_class_trace_ir_to_ctf_ir(fs_sink
, stream
->sc
,
262 bt_event_borrow_class_const(ir_event
), &ec
);
264 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to translate event class to CTF IR.");
265 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
271 if (stream
->sc
->default_clock_class
) {
272 cs
= bt_message_event_borrow_default_clock_snapshot_const(msg
);
276 * If this event's stream does not support packets, then we
277 * lazily create artificial packets.
279 * The size of an artificial packet is arbitrarily at least
280 * 4 MiB (it usually is greater because we close it when
281 * comes the time to write a new event and the packet's content
282 * size is >= 4 MiB), except the last one which can be smaller.
284 if (G_UNLIKELY(!stream
->sc
->has_packets
)) {
285 if (stream
->packet_state
.is_open
&&
286 bt_ctfser_get_offset_in_current_packet_bits(&stream
->ctfser
) / 8 >= 4 * 1024 * 1024) {
288 * Stream's current packet is larger than 4 MiB:
289 * close it. A new packet will be opened just
292 ret
= fs_sink_stream_close_packet(stream
, NULL
);
294 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to close packet.");
295 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
300 if (!stream
->packet_state
.is_open
) {
301 /* Stream's packet is not currently opened: open it */
302 ret
= fs_sink_stream_open_packet(stream
, NULL
, NULL
);
304 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to open packet.");
305 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
311 BT_ASSERT_DBG(stream
->packet_state
.is_open
);
312 ret
= fs_sink_stream_write_event(stream
, cs
, ir_event
, ec
);
313 if (G_UNLIKELY(ret
)) {
314 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to write event.");
315 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
323 static inline bt_component_class_sink_consume_method_status
324 handle_packet_beginning_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
327 bt_component_class_sink_consume_method_status status
=
328 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
329 const bt_packet
*ir_packet
= bt_message_packet_beginning_borrow_packet_const(msg
);
330 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
331 struct fs_sink_stream
*stream
;
332 const bt_clock_snapshot
*cs
= NULL
;
334 stream
= borrow_stream(fs_sink
, ir_stream
);
335 if (G_UNLIKELY(!stream
)) {
336 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
337 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
341 if (stream
->sc
->packets_have_ts_begin
) {
342 cs
= bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg
);
347 * If we previously received a discarded events message with
348 * a time range, make sure that its beginning time matches what's
349 * expected for CTF 1.8, that is:
351 * * Its beginning time is the previous packet's end
352 * time (or the current packet's beginning time if
353 * this is the first packet).
355 * We check this here instead of in handle_packet_end_msg()
356 * because we want to catch any incompatible message as early as
357 * possible to report the error.
359 * Validation of the discarded events message's end time is
360 * performed in handle_packet_end_msg().
362 if (stream
->discarded_events_state
.in_range
) {
363 uint64_t expected_cs
;
366 * `stream->discarded_events_state.in_range` is only set
367 * when the stream class's discarded events have a time
370 * It is required that the packet beginning and end
371 * messages for this stream class have times when
372 * discarded events have a time range.
374 BT_ASSERT(stream
->sc
->discarded_events_has_ts
);
375 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
376 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
378 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
379 /* We're opening the first packet */
380 expected_cs
= bt_clock_snapshot_get_value(cs
);
382 expected_cs
= stream
->prev_packet_state
.end_cs
;
385 if (stream
->discarded_events_state
.beginning_cs
!= expected_cs
) {
386 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
387 "Incompatible discarded events message: "
388 "unexpected beginning time: "
389 "beginning-cs-val=%" PRIu64
", "
390 "expected-beginning-cs-val=%" PRIu64
", "
391 "stream-id=%" PRIu64
", stream-name=\"%s\", "
392 "trace-name=\"%s\", path=\"%s/%s\"",
393 stream
->discarded_events_state
.beginning_cs
, expected_cs
,
394 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
395 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
396 stream
->trace
->path
->str
, stream
->file_name
->str
);
397 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
403 * If we previously received a discarded packets message with a
404 * time range, make sure that its beginning and end times match
405 * what's expected for CTF 1.8, that is:
407 * * Its beginning time is the previous packet's end time.
409 * * Its end time is the current packet's beginning time.
411 if (stream
->discarded_packets_state
.in_range
) {
412 uint64_t expected_end_cs
;
415 * `stream->discarded_packets_state.in_range` is only
416 * set when the stream class's discarded packets have a
419 * It is required that the packet beginning and end
420 * messages for this stream class have times when
421 * discarded packets have a time range.
423 BT_ASSERT(stream
->sc
->discarded_packets_has_ts
);
424 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
425 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
428 * It is not supported to have a discarded packets
429 * message _before_ the first packet: we cannot validate
430 * that its beginning time is compatible with CTF 1.8 in
433 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
434 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
435 "Incompatible discarded packets message "
436 "occurring before the stream's first packet: "
437 "stream-id=%" PRIu64
", stream-name=\"%s\", "
438 "trace-name=\"%s\", path=\"%s/%s\"",
439 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
440 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
441 stream
->trace
->path
->str
, stream
->file_name
->str
);
442 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
446 if (stream
->discarded_packets_state
.beginning_cs
!= stream
->prev_packet_state
.end_cs
) {
447 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
448 "Incompatible discarded packets message: "
449 "unexpected beginning time: "
450 "beginning-cs-val=%" PRIu64
", "
451 "expected-beginning-cs-val=%" PRIu64
", "
452 "stream-id=%" PRIu64
", stream-name=\"%s\", "
453 "trace-name=\"%s\", path=\"%s/%s\"",
454 stream
->discarded_packets_state
.beginning_cs
,
455 stream
->prev_packet_state
.end_cs
, bt_stream_get_id(ir_stream
),
456 bt_stream_get_name(ir_stream
),
457 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
458 stream
->trace
->path
->str
, stream
->file_name
->str
);
459 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
463 expected_end_cs
= bt_clock_snapshot_get_value(cs
);
465 if (stream
->discarded_packets_state
.end_cs
!= expected_end_cs
) {
466 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
467 "Incompatible discarded packets message: "
468 "unexpected end time: "
469 "end-cs-val=%" PRIu64
", "
470 "expected-end-cs-val=%" PRIu64
", "
471 "stream-id=%" PRIu64
", stream-name=\"%s\", "
472 "trace-name=\"%s\", path=\"%s/%s\"",
473 stream
->discarded_packets_state
.end_cs
, expected_end_cs
,
474 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
475 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
476 stream
->trace
->path
->str
, stream
->file_name
->str
);
477 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
483 * We're not in a discarded packets time range anymore since we
484 * require that the discarded packets time ranges go from one
485 * packet's end time to the next packet's beginning time, and
486 * we're handling a packet beginning message here.
488 stream
->discarded_packets_state
.in_range
= false;
490 ret
= fs_sink_stream_open_packet(stream
, cs
, ir_packet
);
492 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to open packet.");
493 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
501 static inline bt_component_class_sink_consume_method_status
502 handle_packet_end_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
505 bt_component_class_sink_consume_method_status status
=
506 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
507 const bt_packet
*ir_packet
= bt_message_packet_end_borrow_packet_const(msg
);
508 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
509 struct fs_sink_stream
*stream
;
510 const bt_clock_snapshot
*cs
= NULL
;
512 stream
= borrow_stream(fs_sink
, ir_stream
);
513 if (G_UNLIKELY(!stream
)) {
514 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
515 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
519 if (stream
->sc
->packets_have_ts_end
) {
520 cs
= bt_message_packet_end_borrow_default_clock_snapshot_const(msg
);
525 * If we previously received a discarded events message with
526 * a time range, make sure that its end time matches what's
527 * expected for CTF 1.8, that is:
529 * * Its end time is the current packet's end time.
531 * Validation of the discarded events message's beginning time
532 * is performed in handle_packet_beginning_msg().
534 if (stream
->discarded_events_state
.in_range
) {
535 uint64_t expected_cs
;
538 * `stream->discarded_events_state.in_range` is only set
539 * when the stream class's discarded events have a time
542 * It is required that the packet beginning and end
543 * messages for this stream class have times when
544 * discarded events have a time range.
546 BT_ASSERT(stream
->sc
->discarded_events_has_ts
);
547 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
548 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
550 expected_cs
= bt_clock_snapshot_get_value(cs
);
552 if (stream
->discarded_events_state
.end_cs
!= expected_cs
) {
553 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
554 "Incompatible discarded events message: "
555 "unexpected end time: "
556 "end-cs-val=%" PRIu64
", "
557 "expected-end-cs-val=%" PRIu64
", "
558 "stream-id=%" PRIu64
", stream-name=\"%s\", "
559 "trace-name=\"%s\", path=\"%s/%s\"",
560 stream
->discarded_events_state
.end_cs
, expected_cs
,
561 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
562 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
563 stream
->trace
->path
->str
, stream
->file_name
->str
);
564 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
569 ret
= fs_sink_stream_close_packet(stream
, cs
);
571 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to close packet.");
572 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
577 * We're not in a discarded events time range anymore since we
578 * require that the discarded events time ranges go from one
579 * packet's end time to the next packet's end time, and we're
580 * handling a packet end message here.
582 stream
->discarded_events_state
.in_range
= false;
588 static inline bt_component_class_sink_consume_method_status
589 handle_stream_beginning_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
591 bt_component_class_sink_consume_method_status status
=
592 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
593 const bt_stream
*ir_stream
= bt_message_stream_beginning_borrow_stream_const(msg
);
594 const bt_stream_class
*ir_sc
= bt_stream_borrow_class_const(ir_stream
);
595 struct fs_sink_stream
*stream
;
596 bool packets_have_beginning_end_cs
=
597 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc
) &&
598 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc
);
601 * Not supported: discarded events or discarded packets support
602 * without packets support. Packets are the way to know where
603 * discarded events/packets occurred in CTF 1.8.
605 if (!bt_stream_class_supports_packets(ir_sc
)) {
606 BT_ASSERT(!bt_stream_class_supports_discarded_packets(ir_sc
));
608 if (!fs_sink
->ignore_discarded_events
&& bt_stream_class_supports_discarded_events(ir_sc
)) {
609 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
610 "Unsupported stream: "
611 "stream does not support packets, "
612 "but supports discarded events: "
614 "stream-id=%" PRIu64
", "
615 "stream-name=\"%s\"",
616 ir_stream
, bt_stream_get_id(ir_stream
),
617 bt_stream_get_name(ir_stream
));
618 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
624 * Not supported: discarded events with default clock snapshots,
625 * but packet beginning/end without default clock snapshot.
627 if (!fs_sink
->ignore_discarded_events
&&
628 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc
) &&
629 !packets_have_beginning_end_cs
) {
630 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
631 "Unsupported stream: discarded events have "
632 "default clock snapshots, but packets have no "
633 "beginning and/or end default clock snapshots: "
635 "stream-id=%" PRIu64
", "
636 "stream-name=\"%s\"",
637 ir_stream
, bt_stream_get_id(ir_stream
),
638 bt_stream_get_name(ir_stream
));
639 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
644 * Not supported: discarded packets with default clock
645 * snapshots, but packet beginning/end without default clock
648 if (!fs_sink
->ignore_discarded_packets
&&
649 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc
) &&
650 !packets_have_beginning_end_cs
) {
651 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
652 "Unsupported stream: discarded packets have "
653 "default clock snapshots, but packets have no "
654 "beginning and/or end default clock snapshots: "
656 "stream-id=%" PRIu64
", "
657 "stream-name=\"%s\"",
658 ir_stream
, bt_stream_get_id(ir_stream
),
659 bt_stream_get_name(ir_stream
));
660 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
664 stream
= borrow_stream(fs_sink
, ir_stream
);
666 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
667 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
671 BT_COMP_LOGI("Created new, empty stream file: "
672 "stream-id=%" PRIu64
", stream-name=\"%s\", "
673 "trace-name=\"%s\", path=\"%s/%s\"",
674 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
675 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
676 stream
->trace
->path
->str
, stream
->file_name
->str
);
682 static inline bt_component_class_sink_consume_method_status
683 handle_stream_end_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
685 bt_component_class_sink_consume_method_status status
=
686 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
687 const bt_stream
*ir_stream
= bt_message_stream_end_borrow_stream_const(msg
);
688 struct fs_sink_stream
*stream
;
690 stream
= borrow_stream(fs_sink
, ir_stream
);
692 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
693 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
697 if (G_UNLIKELY(!stream
->sc
->has_packets
&& stream
->packet_state
.is_open
)) {
698 /* Close stream's current artificial packet */
699 int ret
= fs_sink_stream_close_packet(stream
, NULL
);
702 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to close packet.");
703 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
708 BT_COMP_LOGI("Closing stream file: "
709 "stream-id=%" PRIu64
", stream-name=\"%s\", "
710 "trace-name=\"%s\", path=\"%s/%s\"",
711 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
712 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
713 stream
->trace
->path
->str
, stream
->file_name
->str
);
716 * This destroys the stream object and frees all its resources,
717 * closing the stream file.
719 g_hash_table_remove(stream
->trace
->streams
, ir_stream
);
725 static inline bt_component_class_sink_consume_method_status
726 handle_discarded_events_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
728 bt_component_class_sink_consume_method_status status
=
729 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
730 const bt_stream
*ir_stream
= bt_message_discarded_events_borrow_stream_const(msg
);
731 struct fs_sink_stream
*stream
;
732 const bt_clock_snapshot
*cs
= NULL
;
733 bt_property_availability avail
;
736 stream
= borrow_stream(fs_sink
, ir_stream
);
738 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
739 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
743 if (fs_sink
->ignore_discarded_events
) {
744 BT_COMP_LOGI("Ignoring discarded events message: "
745 "stream-id=%" PRIu64
", stream-name=\"%s\", "
746 "trace-name=\"%s\", path=\"%s/%s\"",
747 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
748 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
749 stream
->trace
->path
->str
, stream
->file_name
->str
);
753 if (stream
->discarded_events_state
.in_range
) {
754 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
755 "Unsupported contiguous discarded events message: "
756 "stream-id=%" PRIu64
", stream-name=\"%s\", "
757 "trace-name=\"%s\", path=\"%s/%s\"",
758 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
759 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
760 stream
->trace
->path
->str
, stream
->file_name
->str
);
761 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
766 * If we're currently in an opened packet (got a packet
767 * beginning message, but no packet end message yet), we do not
768 * support having a discarded events message with a time range
769 * because we require that the discarded events message's time
770 * range go from a packet's end time to the next packet's end
773 if (stream
->packet_state
.is_open
&& stream
->sc
->discarded_events_has_ts
) {
774 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
775 "Unsupported discarded events message with "
776 "default clock snapshots occurring within a packet: "
777 "stream-id=%" PRIu64
", stream-name=\"%s\", "
778 "trace-name=\"%s\", path=\"%s/%s\"",
779 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
780 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
781 stream
->trace
->path
->str
, stream
->file_name
->str
);
782 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
786 if (stream
->sc
->discarded_events_has_ts
) {
788 * Make the stream's state be in the time range of a
789 * discarded events message since we have the message's
790 * time range (`stream->sc->discarded_events_has_ts`).
792 stream
->discarded_events_state
.in_range
= true;
795 * The clock snapshot values will be validated when
796 * handling the next packet beginning and end messages
797 * (next calls to handle_packet_beginning_msg() and
798 * handle_packet_end_msg()).
800 cs
= bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg
);
802 stream
->discarded_events_state
.beginning_cs
= bt_clock_snapshot_get_value(cs
);
803 cs
= bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg
);
805 stream
->discarded_events_state
.end_cs
= bt_clock_snapshot_get_value(cs
);
808 avail
= bt_message_discarded_events_get_count(msg
, &count
);
809 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
811 * There's no specific count of discarded events: set it
812 * to 1 so that we know that we at least discarded
818 stream
->packet_state
.discarded_events_counter
+= count
;
824 static inline bt_component_class_sink_consume_method_status
825 handle_discarded_packets_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
827 bt_component_class_sink_consume_method_status status
=
828 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
829 const bt_stream
*ir_stream
= bt_message_discarded_packets_borrow_stream_const(msg
);
830 struct fs_sink_stream
*stream
;
831 const bt_clock_snapshot
*cs
= NULL
;
832 bt_property_availability avail
;
835 stream
= borrow_stream(fs_sink
, ir_stream
);
837 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
838 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
842 if (fs_sink
->ignore_discarded_packets
) {
843 BT_COMP_LOGI("Ignoring discarded packets message: "
844 "stream-id=%" PRIu64
", stream-name=\"%s\", "
845 "trace-name=\"%s\", path=\"%s/%s\"",
846 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
847 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
848 stream
->trace
->path
->str
, stream
->file_name
->str
);
852 if (stream
->discarded_packets_state
.in_range
) {
853 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
854 "Unsupported contiguous discarded packets message: "
855 "stream-id=%" PRIu64
", stream-name=\"%s\", "
856 "trace-name=\"%s\", path=\"%s/%s\"",
857 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
858 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
859 stream
->trace
->path
->str
, stream
->file_name
->str
);
860 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
865 * Discarded packets messages are guaranteed to occur between
868 BT_ASSERT(!stream
->packet_state
.is_open
);
870 if (stream
->sc
->discarded_packets_has_ts
) {
872 * Make the stream's state be in the time range of a
873 * discarded packets message since we have the message's
874 * time range (`stream->sc->discarded_packets_has_ts`).
876 stream
->discarded_packets_state
.in_range
= true;
879 * The clock snapshot values will be validated when
880 * handling the next packet beginning message (next call
881 * to handle_packet_beginning_msg()).
883 cs
= bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg
);
885 stream
->discarded_packets_state
.beginning_cs
= bt_clock_snapshot_get_value(cs
);
886 cs
= bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg
);
888 stream
->discarded_packets_state
.end_cs
= bt_clock_snapshot_get_value(cs
);
891 avail
= bt_message_discarded_packets_get_count(msg
, &count
);
892 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
894 * There's no specific count of discarded packets: set
895 * it to 1 so that we know that we at least discarded
901 stream
->packet_state
.seq_num
+= count
;
907 static inline void put_messages(bt_message_array_const msgs
, uint64_t count
)
911 for (i
= 0; i
< count
; i
++) {
912 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
916 bt_component_class_sink_consume_method_status
ctf_fs_sink_consume(bt_self_component_sink
*self_comp
)
918 bt_component_class_sink_consume_method_status status
=
919 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
920 struct fs_sink_comp
*fs_sink
;
921 bt_message_iterator_next_status next_status
;
922 uint64_t msg_count
= 0;
923 bt_message_array_const msgs
;
925 fs_sink
= (fs_sink_comp
*) bt_self_component_get_data(
926 bt_self_component_sink_as_self_component(self_comp
));
927 BT_ASSERT_DBG(fs_sink
);
928 BT_ASSERT_DBG(fs_sink
->upstream_iter
);
930 /* Consume messages */
931 next_status
= bt_message_iterator_next(fs_sink
->upstream_iter
, &msgs
, &msg_count
);
932 if (next_status
< 0) {
933 status
= (bt_component_class_sink_consume_method_status
) next_status
;
934 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
935 "Failed to get next message from upstream iterator.");
939 switch (next_status
) {
940 case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK
:
944 for (i
= 0; i
< msg_count
; i
++) {
945 const bt_message
*msg
= msgs
[i
];
949 switch (bt_message_get_type(msg
)) {
950 case BT_MESSAGE_TYPE_EVENT
:
951 status
= handle_event_msg(fs_sink
, msg
);
953 case BT_MESSAGE_TYPE_PACKET_BEGINNING
:
954 status
= handle_packet_beginning_msg(fs_sink
, msg
);
956 case BT_MESSAGE_TYPE_PACKET_END
:
957 status
= handle_packet_end_msg(fs_sink
, msg
);
959 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY
:
961 BT_COMP_LOGD_STR("Ignoring message iterator inactivity message.");
963 case BT_MESSAGE_TYPE_STREAM_BEGINNING
:
964 status
= handle_stream_beginning_msg(fs_sink
, msg
);
966 case BT_MESSAGE_TYPE_STREAM_END
:
967 status
= handle_stream_end_msg(fs_sink
, msg
);
969 case BT_MESSAGE_TYPE_DISCARDED_EVENTS
:
970 status
= handle_discarded_events_msg(fs_sink
, msg
);
972 case BT_MESSAGE_TYPE_DISCARDED_PACKETS
:
973 status
= handle_discarded_packets_msg(fs_sink
, msg
);
979 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
981 if (status
!= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
) {
982 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
983 "Failed to handle message: "
984 "generated CTF traces could be incomplete: "
985 "output-dir-path=\"%s\"",
986 fs_sink
->output_dir_path
->str
);
993 case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN
:
994 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN
;
996 case BT_MESSAGE_ITERATOR_NEXT_STATUS_END
:
997 /* TODO: Finalize all traces (should already be done?) */
998 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END
;
1007 BT_ASSERT(status
!= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
);
1008 put_messages(msgs
, msg_count
);
1014 bt_component_class_sink_graph_is_configured_method_status
1015 ctf_fs_sink_graph_is_configured(bt_self_component_sink
*self_comp
)
1017 bt_component_class_sink_graph_is_configured_method_status status
;
1018 bt_message_iterator_create_from_sink_component_status msg_iter_status
;
1019 fs_sink_comp
*fs_sink
= (fs_sink_comp
*) bt_self_component_get_data(
1020 bt_self_component_sink_as_self_component(self_comp
));
1022 msg_iter_status
= bt_message_iterator_create_from_sink_component(
1023 self_comp
, bt_self_component_sink_borrow_input_port_by_name(self_comp
, in_port_name
),
1024 &fs_sink
->upstream_iter
);
1025 if (msg_iter_status
!= BT_MESSAGE_ITERATOR_CREATE_FROM_SINK_COMPONENT_STATUS_OK
) {
1026 status
= (bt_component_class_sink_graph_is_configured_method_status
) msg_iter_status
;
1027 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to create upstream iterator.");
1031 status
= BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK
;
1036 void ctf_fs_sink_finalize(bt_self_component_sink
*self_comp
)
1038 fs_sink_comp
*fs_sink
= (fs_sink_comp
*) bt_self_component_get_data(
1039 bt_self_component_sink_as_self_component(self_comp
));
1041 destroy_fs_sink_comp(fs_sink
);