2 * SPDX-License-Identifier: MIT
4 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
10 #include <babeltrace2/babeltrace.h>
12 #define BT_COMP_LOG_SELF_COMP (fs_sink->self_comp)
13 #define BT_LOG_OUTPUT_LEVEL (fs_sink->log_level)
14 #define BT_LOG_TAG "PLUGIN/SINK.CTF.FS"
15 #include "logging/comp-logging.h"
17 #include "common/assert.h"
18 #include "ctfser/ctfser.h"
20 #include "plugins/common/param-validation/param-validation.h"
22 #include "fs-sink-ctf-meta.hpp"
23 #include "fs-sink-stream.hpp"
24 #include "fs-sink-trace.hpp"
25 #include "fs-sink.hpp"
26 #include "translate-trace-ir-to-ctf-ir.hpp"
28 static const char * const in_port_name
= "in";
30 static bt_component_class_initialize_method_status
31 ensure_output_dir_exists(struct fs_sink_comp
*fs_sink
)
33 bt_component_class_initialize_method_status status
=
34 BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
;
37 ret
= g_mkdir_with_parents(fs_sink
->output_dir_path
->str
, 0755);
39 BT_COMP_LOGE_APPEND_CAUSE_ERRNO(fs_sink
->self_comp
,
40 "Cannot create directories for output directory",
41 ": output-dir-path=\"%s\"", fs_sink
->output_dir_path
->str
);
42 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
50 static bt_param_validation_map_value_entry_descr fs_sink_params_descr
[] = {
51 {"path", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY
,
52 bt_param_validation_value_descr::makeString()},
53 {"assume-single-trace", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
54 bt_param_validation_value_descr::makeBool()},
55 {"ignore-discarded-events", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
56 bt_param_validation_value_descr::makeBool()},
57 {"ignore-discarded-packets", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
58 bt_param_validation_value_descr::makeBool()},
59 {"quiet", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
60 bt_param_validation_value_descr::makeBool()},
61 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END
};
63 static bt_component_class_initialize_method_status
configure_component(struct fs_sink_comp
*fs_sink
,
64 const bt_value
*params
)
66 bt_component_class_initialize_method_status status
;
67 const bt_value
*value
;
68 enum bt_param_validation_status validation_status
;
69 gchar
*validation_error
;
72 bt_param_validation_validate(params
, fs_sink_params_descr
, &validation_error
);
73 if (validation_status
== BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR
) {
74 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
75 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "%s", validation_error
);
77 } else if (validation_status
== BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR
) {
78 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
82 value
= bt_value_map_borrow_entry_value_const(params
, "path");
83 g_string_assign(fs_sink
->output_dir_path
, bt_value_string_get(value
));
85 value
= bt_value_map_borrow_entry_value_const(params
, "assume-single-trace");
87 fs_sink
->assume_single_trace
= (bool) bt_value_bool_get(value
);
90 value
= bt_value_map_borrow_entry_value_const(params
, "ignore-discarded-events");
92 fs_sink
->ignore_discarded_events
= (bool) bt_value_bool_get(value
);
95 value
= bt_value_map_borrow_entry_value_const(params
, "ignore-discarded-packets");
97 fs_sink
->ignore_discarded_packets
= (bool) bt_value_bool_get(value
);
100 value
= bt_value_map_borrow_entry_value_const(params
, "quiet");
102 fs_sink
->quiet
= (bool) bt_value_bool_get(value
);
105 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
;
108 g_free(validation_error
);
112 static void destroy_fs_sink_comp(struct fs_sink_comp
*fs_sink
)
118 if (fs_sink
->output_dir_path
) {
119 g_string_free(fs_sink
->output_dir_path
, TRUE
);
120 fs_sink
->output_dir_path
= NULL
;
123 if (fs_sink
->traces
) {
124 g_hash_table_destroy(fs_sink
->traces
);
125 fs_sink
->traces
= NULL
;
128 BT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(fs_sink
->upstream_iter
);
135 bt_component_class_initialize_method_status
ctf_fs_sink_init(bt_self_component_sink
*self_comp_sink
,
136 bt_self_component_sink_configuration
*,
137 const bt_value
*params
, void *)
139 bt_component_class_initialize_method_status status
;
140 bt_self_component_add_port_status add_port_status
;
141 struct fs_sink_comp
*fs_sink
= NULL
;
142 bt_self_component
*self_comp
= bt_self_component_sink_as_self_component(self_comp_sink
);
143 bt_logging_level log_level
=
144 bt_component_get_logging_level(bt_self_component_as_component(self_comp
));
146 fs_sink
= new fs_sink_comp
;
147 fs_sink
->log_level
= log_level
;
148 fs_sink
->self_comp
= self_comp
;
149 fs_sink
->output_dir_path
= g_string_new(NULL
);
150 status
= configure_component(fs_sink
, params
);
151 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
152 /* configure_component() logs errors */
156 if (fs_sink
->assume_single_trace
&&
157 g_file_test(fs_sink
->output_dir_path
->str
, G_FILE_TEST_EXISTS
)) {
158 BT_COMP_LOGE_APPEND_CAUSE(self_comp
,
159 "Single trace mode, but output path exists: output-path=\"%s\"",
160 fs_sink
->output_dir_path
->str
);
161 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
165 status
= ensure_output_dir_exists(fs_sink
);
166 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
167 /* ensure_output_dir_exists() logs errors */
171 fs_sink
->traces
= g_hash_table_new_full(g_direct_hash
, g_direct_equal
, NULL
,
172 (GDestroyNotify
) fs_sink_trace_destroy
);
173 if (!fs_sink
->traces
) {
174 BT_COMP_LOGE_APPEND_CAUSE(self_comp
, "Failed to allocate one GHashTable.");
175 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
180 bt_self_component_sink_add_input_port(self_comp_sink
, in_port_name
, NULL
, NULL
);
181 if (add_port_status
!= BT_SELF_COMPONENT_ADD_PORT_STATUS_OK
) {
182 status
= (bt_component_class_initialize_method_status
) add_port_status
;
183 BT_COMP_LOGE_APPEND_CAUSE(self_comp
, "Failed to add input port.");
187 bt_self_component_set_data(self_comp
, fs_sink
);
190 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
191 destroy_fs_sink_comp(fs_sink
);
197 static inline struct fs_sink_stream
*borrow_stream(struct fs_sink_comp
*fs_sink
,
198 const bt_stream
*ir_stream
)
200 const bt_trace
*ir_trace
= bt_stream_borrow_trace_const(ir_stream
);
201 struct fs_sink_trace
*trace
;
202 struct fs_sink_stream
*stream
= NULL
;
204 trace
= (fs_sink_trace
*) g_hash_table_lookup(fs_sink
->traces
, ir_trace
);
205 if (G_UNLIKELY(!trace
)) {
206 if (fs_sink
->assume_single_trace
&& g_hash_table_size(fs_sink
->traces
) > 0) {
207 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
208 "Single trace mode, but getting more than one trace: "
209 "stream-name=\"%s\"",
210 bt_stream_get_name(ir_stream
));
214 trace
= fs_sink_trace_create(fs_sink
, ir_trace
);
220 stream
= (fs_sink_stream
*) g_hash_table_lookup(trace
->streams
, ir_stream
);
221 if (G_UNLIKELY(!stream
)) {
222 stream
= fs_sink_stream_create(trace
, ir_stream
);
232 static inline bt_component_class_sink_consume_method_status
233 handle_event_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
236 bt_component_class_sink_consume_method_status status
=
237 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
238 const bt_event
*ir_event
= bt_message_event_borrow_event_const(msg
);
239 const bt_stream
*ir_stream
= bt_event_borrow_stream_const(ir_event
);
240 struct fs_sink_stream
*stream
;
241 struct fs_sink_ctf_event_class
*ec
= NULL
;
242 const bt_clock_snapshot
*cs
= NULL
;
244 stream
= borrow_stream(fs_sink
, ir_stream
);
245 if (G_UNLIKELY(!stream
)) {
246 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
247 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
251 ret
= try_translate_event_class_trace_ir_to_ctf_ir(fs_sink
, stream
->sc
,
252 bt_event_borrow_class_const(ir_event
), &ec
);
254 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to translate event class to CTF IR.");
255 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
261 if (stream
->sc
->default_clock_class
) {
262 cs
= bt_message_event_borrow_default_clock_snapshot_const(msg
);
266 * If this event's stream does not support packets, then we
267 * lazily create artificial packets.
269 * The size of an artificial packet is arbitrarily at least
270 * 4 MiB (it usually is greater because we close it when
271 * comes the time to write a new event and the packet's content
272 * size is >= 4 MiB), except the last one which can be smaller.
274 if (G_UNLIKELY(!stream
->sc
->has_packets
)) {
275 if (stream
->packet_state
.is_open
&&
276 bt_ctfser_get_offset_in_current_packet_bits(&stream
->ctfser
) / 8 >= 4 * 1024 * 1024) {
278 * Stream's current packet is larger than 4 MiB:
279 * close it. A new packet will be opened just
282 ret
= fs_sink_stream_close_packet(stream
, NULL
);
284 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to close packet.");
285 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
290 if (!stream
->packet_state
.is_open
) {
291 /* Stream's packet is not currently opened: open it */
292 ret
= fs_sink_stream_open_packet(stream
, NULL
, NULL
);
294 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to open packet.");
295 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
301 BT_ASSERT_DBG(stream
->packet_state
.is_open
);
302 ret
= fs_sink_stream_write_event(stream
, cs
, ir_event
, ec
);
303 if (G_UNLIKELY(ret
)) {
304 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to write event.");
305 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
313 static inline bt_component_class_sink_consume_method_status
314 handle_packet_beginning_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
317 bt_component_class_sink_consume_method_status status
=
318 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
319 const bt_packet
*ir_packet
= bt_message_packet_beginning_borrow_packet_const(msg
);
320 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
321 struct fs_sink_stream
*stream
;
322 const bt_clock_snapshot
*cs
= NULL
;
324 stream
= borrow_stream(fs_sink
, ir_stream
);
325 if (G_UNLIKELY(!stream
)) {
326 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
327 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
331 if (stream
->sc
->packets_have_ts_begin
) {
332 cs
= bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg
);
337 * If we previously received a discarded events message with
338 * a time range, make sure that its beginning time matches what's
339 * expected for CTF 1.8, that is:
341 * * Its beginning time is the previous packet's end
342 * time (or the current packet's beginning time if
343 * this is the first packet).
345 * We check this here instead of in handle_packet_end_msg()
346 * because we want to catch any incompatible message as early as
347 * possible to report the error.
349 * Validation of the discarded events message's end time is
350 * performed in handle_packet_end_msg().
352 if (stream
->discarded_events_state
.in_range
) {
353 uint64_t expected_cs
;
356 * `stream->discarded_events_state.in_range` is only set
357 * when the stream class's discarded events have a time
360 * It is required that the packet beginning and end
361 * messages for this stream class have times when
362 * discarded events have a time range.
364 BT_ASSERT(stream
->sc
->discarded_events_has_ts
);
365 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
366 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
368 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
369 /* We're opening the first packet */
370 expected_cs
= bt_clock_snapshot_get_value(cs
);
372 expected_cs
= stream
->prev_packet_state
.end_cs
;
375 if (stream
->discarded_events_state
.beginning_cs
!= expected_cs
) {
376 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
377 "Incompatible discarded events message: "
378 "unexpected beginning time: "
379 "beginning-cs-val=%" PRIu64
", "
380 "expected-beginning-cs-val=%" PRIu64
", "
381 "stream-id=%" PRIu64
", stream-name=\"%s\", "
382 "trace-name=\"%s\", path=\"%s/%s\"",
383 stream
->discarded_events_state
.beginning_cs
, expected_cs
,
384 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
385 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
386 stream
->trace
->path
->str
, stream
->file_name
->str
);
387 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
393 * If we previously received a discarded packets message with a
394 * time range, make sure that its beginning and end times match
395 * what's expected for CTF 1.8, that is:
397 * * Its beginning time is the previous packet's end time.
399 * * Its end time is the current packet's beginning time.
401 if (stream
->discarded_packets_state
.in_range
) {
402 uint64_t expected_end_cs
;
405 * `stream->discarded_packets_state.in_range` is only
406 * set when the stream class's discarded packets have a
409 * It is required that the packet beginning and end
410 * messages for this stream class have times when
411 * discarded packets have a time range.
413 BT_ASSERT(stream
->sc
->discarded_packets_has_ts
);
414 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
415 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
418 * It is not supported to have a discarded packets
419 * message _before_ the first packet: we cannot validate
420 * that its beginning time is compatible with CTF 1.8 in
423 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
424 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
425 "Incompatible discarded packets message "
426 "occurring before the stream's first packet: "
427 "stream-id=%" PRIu64
", stream-name=\"%s\", "
428 "trace-name=\"%s\", path=\"%s/%s\"",
429 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
430 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
431 stream
->trace
->path
->str
, stream
->file_name
->str
);
432 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
436 if (stream
->discarded_packets_state
.beginning_cs
!= stream
->prev_packet_state
.end_cs
) {
437 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
438 "Incompatible discarded packets message: "
439 "unexpected beginning time: "
440 "beginning-cs-val=%" PRIu64
", "
441 "expected-beginning-cs-val=%" PRIu64
", "
442 "stream-id=%" PRIu64
", stream-name=\"%s\", "
443 "trace-name=\"%s\", path=\"%s/%s\"",
444 stream
->discarded_packets_state
.beginning_cs
,
445 stream
->prev_packet_state
.end_cs
, bt_stream_get_id(ir_stream
),
446 bt_stream_get_name(ir_stream
),
447 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
448 stream
->trace
->path
->str
, stream
->file_name
->str
);
449 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
453 expected_end_cs
= bt_clock_snapshot_get_value(cs
);
455 if (stream
->discarded_packets_state
.end_cs
!= expected_end_cs
) {
456 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
457 "Incompatible discarded packets message: "
458 "unexpected end time: "
459 "end-cs-val=%" PRIu64
", "
460 "expected-end-cs-val=%" PRIu64
", "
461 "stream-id=%" PRIu64
", stream-name=\"%s\", "
462 "trace-name=\"%s\", path=\"%s/%s\"",
463 stream
->discarded_packets_state
.end_cs
, expected_end_cs
,
464 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
465 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
466 stream
->trace
->path
->str
, stream
->file_name
->str
);
467 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
473 * We're not in a discarded packets time range anymore since we
474 * require that the discarded packets time ranges go from one
475 * packet's end time to the next packet's beginning time, and
476 * we're handling a packet beginning message here.
478 stream
->discarded_packets_state
.in_range
= false;
480 ret
= fs_sink_stream_open_packet(stream
, cs
, ir_packet
);
482 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to open packet.");
483 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
491 static inline bt_component_class_sink_consume_method_status
492 handle_packet_end_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
495 bt_component_class_sink_consume_method_status status
=
496 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
497 const bt_packet
*ir_packet
= bt_message_packet_end_borrow_packet_const(msg
);
498 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
499 struct fs_sink_stream
*stream
;
500 const bt_clock_snapshot
*cs
= NULL
;
502 stream
= borrow_stream(fs_sink
, ir_stream
);
503 if (G_UNLIKELY(!stream
)) {
504 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
505 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
509 if (stream
->sc
->packets_have_ts_end
) {
510 cs
= bt_message_packet_end_borrow_default_clock_snapshot_const(msg
);
515 * If we previously received a discarded events message with
516 * a time range, make sure that its end time matches what's
517 * expected for CTF 1.8, that is:
519 * * Its end time is the current packet's end time.
521 * Validation of the discarded events message's beginning time
522 * is performed in handle_packet_beginning_msg().
524 if (stream
->discarded_events_state
.in_range
) {
525 uint64_t expected_cs
;
528 * `stream->discarded_events_state.in_range` is only set
529 * when the stream class's discarded events have a time
532 * It is required that the packet beginning and end
533 * messages for this stream class have times when
534 * discarded events have a time range.
536 BT_ASSERT(stream
->sc
->discarded_events_has_ts
);
537 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
538 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
540 expected_cs
= bt_clock_snapshot_get_value(cs
);
542 if (stream
->discarded_events_state
.end_cs
!= expected_cs
) {
543 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
544 "Incompatible discarded events message: "
545 "unexpected end time: "
546 "end-cs-val=%" PRIu64
", "
547 "expected-end-cs-val=%" PRIu64
", "
548 "stream-id=%" PRIu64
", stream-name=\"%s\", "
549 "trace-name=\"%s\", path=\"%s/%s\"",
550 stream
->discarded_events_state
.end_cs
, expected_cs
,
551 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
552 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
553 stream
->trace
->path
->str
, stream
->file_name
->str
);
554 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
559 ret
= fs_sink_stream_close_packet(stream
, cs
);
561 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to close packet.");
562 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
567 * We're not in a discarded events time range anymore since we
568 * require that the discarded events time ranges go from one
569 * packet's end time to the next packet's end time, and we're
570 * handling a packet end message here.
572 stream
->discarded_events_state
.in_range
= false;
578 static inline bt_component_class_sink_consume_method_status
579 handle_stream_beginning_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
581 bt_component_class_sink_consume_method_status status
=
582 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
583 const bt_stream
*ir_stream
= bt_message_stream_beginning_borrow_stream_const(msg
);
584 const bt_stream_class
*ir_sc
= bt_stream_borrow_class_const(ir_stream
);
585 struct fs_sink_stream
*stream
;
586 bool packets_have_beginning_end_cs
=
587 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc
) &&
588 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc
);
591 * Not supported: discarded events or discarded packets support
592 * without packets support. Packets are the way to know where
593 * discarded events/packets occurred in CTF 1.8.
595 if (!bt_stream_class_supports_packets(ir_sc
)) {
596 BT_ASSERT(!bt_stream_class_supports_discarded_packets(ir_sc
));
598 if (!fs_sink
->ignore_discarded_events
&& bt_stream_class_supports_discarded_events(ir_sc
)) {
599 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
600 "Unsupported stream: "
601 "stream does not support packets, "
602 "but supports discarded events: "
604 "stream-id=%" PRIu64
", "
605 "stream-name=\"%s\"",
606 ir_stream
, bt_stream_get_id(ir_stream
),
607 bt_stream_get_name(ir_stream
));
608 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
614 * Not supported: discarded events with default clock snapshots,
615 * but packet beginning/end without default clock snapshot.
617 if (!fs_sink
->ignore_discarded_events
&&
618 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc
) &&
619 !packets_have_beginning_end_cs
) {
620 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
621 "Unsupported stream: discarded events have "
622 "default clock snapshots, but packets have no "
623 "beginning and/or end default clock snapshots: "
625 "stream-id=%" PRIu64
", "
626 "stream-name=\"%s\"",
627 ir_stream
, bt_stream_get_id(ir_stream
),
628 bt_stream_get_name(ir_stream
));
629 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
634 * Not supported: discarded packets with default clock
635 * snapshots, but packet beginning/end without default clock
638 if (!fs_sink
->ignore_discarded_packets
&&
639 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc
) &&
640 !packets_have_beginning_end_cs
) {
641 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
642 "Unsupported stream: discarded packets have "
643 "default clock snapshots, but packets have no "
644 "beginning and/or end default clock snapshots: "
646 "stream-id=%" PRIu64
", "
647 "stream-name=\"%s\"",
648 ir_stream
, bt_stream_get_id(ir_stream
),
649 bt_stream_get_name(ir_stream
));
650 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
654 stream
= borrow_stream(fs_sink
, ir_stream
);
656 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
657 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
661 BT_COMP_LOGI("Created new, empty stream file: "
662 "stream-id=%" PRIu64
", stream-name=\"%s\", "
663 "trace-name=\"%s\", path=\"%s/%s\"",
664 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
665 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
666 stream
->trace
->path
->str
, stream
->file_name
->str
);
672 static inline bt_component_class_sink_consume_method_status
673 handle_stream_end_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
675 bt_component_class_sink_consume_method_status status
=
676 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
677 const bt_stream
*ir_stream
= bt_message_stream_end_borrow_stream_const(msg
);
678 struct fs_sink_stream
*stream
;
680 stream
= borrow_stream(fs_sink
, ir_stream
);
682 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
683 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
687 if (G_UNLIKELY(!stream
->sc
->has_packets
&& stream
->packet_state
.is_open
)) {
688 /* Close stream's current artificial packet */
689 int ret
= fs_sink_stream_close_packet(stream
, NULL
);
692 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to close packet.");
693 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
698 BT_COMP_LOGI("Closing stream file: "
699 "stream-id=%" PRIu64
", stream-name=\"%s\", "
700 "trace-name=\"%s\", path=\"%s/%s\"",
701 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
702 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
703 stream
->trace
->path
->str
, stream
->file_name
->str
);
706 * This destroys the stream object and frees all its resources,
707 * closing the stream file.
709 g_hash_table_remove(stream
->trace
->streams
, ir_stream
);
715 static inline bt_component_class_sink_consume_method_status
716 handle_discarded_events_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
718 bt_component_class_sink_consume_method_status status
=
719 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
720 const bt_stream
*ir_stream
= bt_message_discarded_events_borrow_stream_const(msg
);
721 struct fs_sink_stream
*stream
;
722 const bt_clock_snapshot
*cs
= NULL
;
723 bt_property_availability avail
;
726 stream
= borrow_stream(fs_sink
, ir_stream
);
728 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
729 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
733 if (fs_sink
->ignore_discarded_events
) {
734 BT_COMP_LOGI("Ignoring discarded events message: "
735 "stream-id=%" PRIu64
", stream-name=\"%s\", "
736 "trace-name=\"%s\", path=\"%s/%s\"",
737 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
738 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
739 stream
->trace
->path
->str
, stream
->file_name
->str
);
743 if (stream
->discarded_events_state
.in_range
) {
744 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
745 "Unsupported contiguous discarded events message: "
746 "stream-id=%" PRIu64
", stream-name=\"%s\", "
747 "trace-name=\"%s\", path=\"%s/%s\"",
748 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
749 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
750 stream
->trace
->path
->str
, stream
->file_name
->str
);
751 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
756 * If we're currently in an opened packet (got a packet
757 * beginning message, but no packet end message yet), we do not
758 * support having a discarded events message with a time range
759 * because we require that the discarded events message's time
760 * range go from a packet's end time to the next packet's end
763 if (stream
->packet_state
.is_open
&& stream
->sc
->discarded_events_has_ts
) {
764 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
765 "Unsupported discarded events message with "
766 "default clock snapshots occurring within a packet: "
767 "stream-id=%" PRIu64
", stream-name=\"%s\", "
768 "trace-name=\"%s\", path=\"%s/%s\"",
769 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
770 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
771 stream
->trace
->path
->str
, stream
->file_name
->str
);
772 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
776 if (stream
->sc
->discarded_events_has_ts
) {
778 * Make the stream's state be in the time range of a
779 * discarded events message since we have the message's
780 * time range (`stream->sc->discarded_events_has_ts`).
782 stream
->discarded_events_state
.in_range
= true;
785 * The clock snapshot values will be validated when
786 * handling the next packet beginning and end messages
787 * (next calls to handle_packet_beginning_msg() and
788 * handle_packet_end_msg()).
790 cs
= bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg
);
792 stream
->discarded_events_state
.beginning_cs
= bt_clock_snapshot_get_value(cs
);
793 cs
= bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg
);
795 stream
->discarded_events_state
.end_cs
= bt_clock_snapshot_get_value(cs
);
798 avail
= bt_message_discarded_events_get_count(msg
, &count
);
799 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
801 * There's no specific count of discarded events: set it
802 * to 1 so that we know that we at least discarded
808 stream
->packet_state
.discarded_events_counter
+= count
;
814 static inline bt_component_class_sink_consume_method_status
815 handle_discarded_packets_msg(struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
817 bt_component_class_sink_consume_method_status status
=
818 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
819 const bt_stream
*ir_stream
= bt_message_discarded_packets_borrow_stream_const(msg
);
820 struct fs_sink_stream
*stream
;
821 const bt_clock_snapshot
*cs
= NULL
;
822 bt_property_availability avail
;
825 stream
= borrow_stream(fs_sink
, ir_stream
);
827 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to borrow stream.");
828 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
832 if (fs_sink
->ignore_discarded_packets
) {
833 BT_COMP_LOGI("Ignoring discarded packets message: "
834 "stream-id=%" PRIu64
", stream-name=\"%s\", "
835 "trace-name=\"%s\", path=\"%s/%s\"",
836 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
837 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
838 stream
->trace
->path
->str
, stream
->file_name
->str
);
842 if (stream
->discarded_packets_state
.in_range
) {
843 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
844 "Unsupported contiguous discarded packets message: "
845 "stream-id=%" PRIu64
", stream-name=\"%s\", "
846 "trace-name=\"%s\", path=\"%s/%s\"",
847 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
848 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
849 stream
->trace
->path
->str
, stream
->file_name
->str
);
850 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
855 * Discarded packets messages are guaranteed to occur between
858 BT_ASSERT(!stream
->packet_state
.is_open
);
860 if (stream
->sc
->discarded_packets_has_ts
) {
862 * Make the stream's state be in the time range of a
863 * discarded packets message since we have the message's
864 * time range (`stream->sc->discarded_packets_has_ts`).
866 stream
->discarded_packets_state
.in_range
= true;
869 * The clock snapshot values will be validated when
870 * handling the next packet beginning message (next call
871 * to handle_packet_beginning_msg()).
873 cs
= bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg
);
875 stream
->discarded_packets_state
.beginning_cs
= bt_clock_snapshot_get_value(cs
);
876 cs
= bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg
);
878 stream
->discarded_packets_state
.end_cs
= bt_clock_snapshot_get_value(cs
);
881 avail
= bt_message_discarded_packets_get_count(msg
, &count
);
882 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
884 * There's no specific count of discarded packets: set
885 * it to 1 so that we know that we at least discarded
891 stream
->packet_state
.seq_num
+= count
;
897 static inline void put_messages(bt_message_array_const msgs
, uint64_t count
)
901 for (i
= 0; i
< count
; i
++) {
902 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
906 bt_component_class_sink_consume_method_status
ctf_fs_sink_consume(bt_self_component_sink
*self_comp
)
908 bt_component_class_sink_consume_method_status status
=
909 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
910 struct fs_sink_comp
*fs_sink
;
911 bt_message_iterator_next_status next_status
;
912 uint64_t msg_count
= 0;
913 bt_message_array_const msgs
;
915 fs_sink
= (fs_sink_comp
*) bt_self_component_get_data(
916 bt_self_component_sink_as_self_component(self_comp
));
917 BT_ASSERT_DBG(fs_sink
);
918 BT_ASSERT_DBG(fs_sink
->upstream_iter
);
920 /* Consume messages */
921 next_status
= bt_message_iterator_next(fs_sink
->upstream_iter
, &msgs
, &msg_count
);
922 if (next_status
< 0) {
923 status
= (bt_component_class_sink_consume_method_status
) next_status
;
924 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
925 "Failed to get next message from upstream iterator.");
929 switch (next_status
) {
930 case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK
:
934 for (i
= 0; i
< msg_count
; i
++) {
935 const bt_message
*msg
= msgs
[i
];
939 switch (bt_message_get_type(msg
)) {
940 case BT_MESSAGE_TYPE_EVENT
:
941 status
= handle_event_msg(fs_sink
, msg
);
943 case BT_MESSAGE_TYPE_PACKET_BEGINNING
:
944 status
= handle_packet_beginning_msg(fs_sink
, msg
);
946 case BT_MESSAGE_TYPE_PACKET_END
:
947 status
= handle_packet_end_msg(fs_sink
, msg
);
949 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY
:
951 BT_COMP_LOGD_STR("Ignoring message iterator inactivity message.");
953 case BT_MESSAGE_TYPE_STREAM_BEGINNING
:
954 status
= handle_stream_beginning_msg(fs_sink
, msg
);
956 case BT_MESSAGE_TYPE_STREAM_END
:
957 status
= handle_stream_end_msg(fs_sink
, msg
);
959 case BT_MESSAGE_TYPE_DISCARDED_EVENTS
:
960 status
= handle_discarded_events_msg(fs_sink
, msg
);
962 case BT_MESSAGE_TYPE_DISCARDED_PACKETS
:
963 status
= handle_discarded_packets_msg(fs_sink
, msg
);
969 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
971 if (status
!= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
) {
972 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
973 "Failed to handle message: "
974 "generated CTF traces could be incomplete: "
975 "output-dir-path=\"%s\"",
976 fs_sink
->output_dir_path
->str
);
983 case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN
:
984 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN
;
986 case BT_MESSAGE_ITERATOR_NEXT_STATUS_END
:
987 /* TODO: Finalize all traces (should already be done?) */
988 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END
;
997 BT_ASSERT(status
!= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
);
998 put_messages(msgs
, msg_count
);
1004 bt_component_class_sink_graph_is_configured_method_status
1005 ctf_fs_sink_graph_is_configured(bt_self_component_sink
*self_comp
)
1007 bt_component_class_sink_graph_is_configured_method_status status
;
1008 bt_message_iterator_create_from_sink_component_status msg_iter_status
;
1009 fs_sink_comp
*fs_sink
= (fs_sink_comp
*) bt_self_component_get_data(
1010 bt_self_component_sink_as_self_component(self_comp
));
1012 msg_iter_status
= bt_message_iterator_create_from_sink_component(
1013 self_comp
, bt_self_component_sink_borrow_input_port_by_name(self_comp
, in_port_name
),
1014 &fs_sink
->upstream_iter
);
1015 if (msg_iter_status
!= BT_MESSAGE_ITERATOR_CREATE_FROM_SINK_COMPONENT_STATUS_OK
) {
1016 status
= (bt_component_class_sink_graph_is_configured_method_status
) msg_iter_status
;
1017 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
, "Failed to create upstream iterator.");
1021 status
= BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK
;
1026 void ctf_fs_sink_finalize(bt_self_component_sink
*self_comp
)
1028 fs_sink_comp
*fs_sink
= (fs_sink_comp
*) bt_self_component_get_data(
1029 bt_self_component_sink_as_self_component(self_comp
));
1031 destroy_fs_sink_comp(fs_sink
);