2 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23 #define BT_COMP_LOG_SELF_COMP (fs_sink->self_comp)
24 #define BT_LOG_OUTPUT_LEVEL (fs_sink->log_level)
25 #define BT_LOG_TAG "PLUGIN/SINK.CTF.FS"
26 #include "logging/comp-logging.h"
28 #include <babeltrace2/babeltrace.h>
32 #include "common/assert.h"
33 #include "ctfser/ctfser.h"
36 #include "fs-sink-trace.h"
37 #include "fs-sink-stream.h"
38 #include "fs-sink-ctf-meta.h"
39 #include "translate-trace-ir-to-ctf-ir.h"
40 #include "translate-ctf-ir-to-tsdl.h"
/* Name of the input port which ctf_fs_sink_init() adds to the component. */
static const char * const in_port_name = "in";
46 bt_component_class_init_method_status
ensure_output_dir_exists(
47 struct fs_sink_comp
*fs_sink
)
49 bt_component_class_init_method_status status
=
50 BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK
;
53 ret
= g_mkdir_with_parents(fs_sink
->output_dir_path
->str
, 0755);
56 "Cannot create directories for output directory",
57 ": output-dir-path=\"%s\"",
58 fs_sink
->output_dir_path
->str
);
59 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
68 bt_component_class_init_method_status
69 configure_component(struct fs_sink_comp
*fs_sink
,
70 const bt_value
*params
)
72 bt_component_class_init_method_status status
=
73 BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK
;
74 const bt_value
*value
;
76 value
= bt_value_map_borrow_entry_value_const(params
, "path");
78 BT_COMP_LOGE_STR("Missing mandatory `path` parameter.");
79 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
83 if (!bt_value_is_string(value
)) {
84 BT_COMP_LOGE_STR("`path` parameter: expecting a string.");
85 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
89 g_string_assign(fs_sink
->output_dir_path
,
90 bt_value_string_get(value
));
91 value
= bt_value_map_borrow_entry_value_const(params
,
92 "assume-single-trace");
94 if (!bt_value_is_bool(value
)) {
95 BT_COMP_LOGE_STR("`assume-single-trace` parameter: expecting a boolean.");
96 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
100 fs_sink
->assume_single_trace
= (bool) bt_value_bool_get(value
);
103 value
= bt_value_map_borrow_entry_value_const(params
,
104 "ignore-discarded-events");
106 if (!bt_value_is_bool(value
)) {
107 BT_COMP_LOGE_STR("`ignore-discarded-events` parameter: expecting a boolean.");
108 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
112 fs_sink
->ignore_discarded_events
=
113 (bool) bt_value_bool_get(value
);
116 value
= bt_value_map_borrow_entry_value_const(params
,
117 "ignore-discarded-packets");
119 if (!bt_value_is_bool(value
)) {
120 BT_COMP_LOGE_STR("`ignore-discarded-packets` parameter: expecting a boolean.");
121 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
125 fs_sink
->ignore_discarded_packets
=
126 (bool) bt_value_bool_get(value
);
129 value
= bt_value_map_borrow_entry_value_const(params
,
132 if (!bt_value_is_bool(value
)) {
133 BT_COMP_LOGE_STR("`quiet` parameter: expecting a boolean.");
134 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
138 fs_sink
->quiet
= (bool) bt_value_bool_get(value
);
146 void destroy_fs_sink_comp(struct fs_sink_comp
*fs_sink
)
152 if (fs_sink
->output_dir_path
) {
153 g_string_free(fs_sink
->output_dir_path
, TRUE
);
154 fs_sink
->output_dir_path
= NULL
;
157 if (fs_sink
->traces
) {
158 g_hash_table_destroy(fs_sink
->traces
);
159 fs_sink
->traces
= NULL
;
162 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(
163 fs_sink
->upstream_iter
);
171 bt_component_class_init_method_status
ctf_fs_sink_init(
172 bt_self_component_sink
*self_comp_sink
,
173 bt_self_component_sink_configuration
*config
,
174 const bt_value
*params
,
175 void *init_method_data
)
177 bt_component_class_init_method_status status
=
178 BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK
;
179 bt_self_component_add_port_status add_port_status
;
180 struct fs_sink_comp
*fs_sink
= NULL
;
181 bt_self_component
*self_comp
=
182 bt_self_component_sink_as_self_component(self_comp_sink
);
183 bt_logging_level log_level
= bt_component_get_logging_level(
184 bt_self_component_as_component(self_comp
));
186 fs_sink
= g_new0(struct fs_sink_comp
, 1);
188 BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR
, log_level
, self_comp
,
189 "Failed to allocate one CTF FS sink structure.");
190 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_MEMORY_ERROR
;
194 fs_sink
->log_level
= log_level
;
195 fs_sink
->self_comp
= self_comp
;
196 fs_sink
->output_dir_path
= g_string_new(NULL
);
197 status
= configure_component(fs_sink
, params
);
198 if (status
!= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK
) {
199 /* configure_component() logs errors */
203 if (fs_sink
->assume_single_trace
&&
204 g_file_test(fs_sink
->output_dir_path
->str
,
205 G_FILE_TEST_EXISTS
)) {
206 BT_COMP_LOGE("Single trace mode, but output path exists: "
207 "output-path=\"%s\"", fs_sink
->output_dir_path
->str
);
208 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
212 status
= ensure_output_dir_exists(fs_sink
);
213 if (status
!= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK
) {
214 /* ensure_output_dir_exists() logs errors */
218 fs_sink
->traces
= g_hash_table_new_full(g_direct_hash
, g_direct_equal
,
219 NULL
, (GDestroyNotify
) fs_sink_trace_destroy
);
220 if (!fs_sink
->traces
) {
221 BT_COMP_LOGE_STR("Failed to allocate one GHashTable.");
222 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_MEMORY_ERROR
;
226 add_port_status
= bt_self_component_sink_add_input_port(
227 self_comp_sink
, in_port_name
, NULL
, NULL
);
228 switch (add_port_status
) {
229 case BT_SELF_COMPONENT_ADD_PORT_STATUS_ERROR
:
230 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
232 case BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR
:
233 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_MEMORY_ERROR
;
239 bt_self_component_set_data(self_comp
, fs_sink
);
242 if (status
!= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK
) {
243 destroy_fs_sink_comp(fs_sink
);
250 struct fs_sink_stream
*borrow_stream(struct fs_sink_comp
*fs_sink
,
251 const bt_stream
*ir_stream
)
253 const bt_trace
*ir_trace
= bt_stream_borrow_trace_const(ir_stream
);
254 struct fs_sink_trace
*trace
;
255 struct fs_sink_stream
*stream
= NULL
;
257 trace
= g_hash_table_lookup(fs_sink
->traces
, ir_trace
);
258 if (G_UNLIKELY(!trace
)) {
259 if (fs_sink
->assume_single_trace
&&
260 g_hash_table_size(fs_sink
->traces
) > 0) {
261 BT_COMP_LOGE("Single trace mode, but getting more than one trace: "
262 "stream-name=\"%s\"",
263 bt_stream_get_name(ir_stream
));
267 trace
= fs_sink_trace_create(fs_sink
, ir_trace
);
273 stream
= g_hash_table_lookup(trace
->streams
, ir_stream
);
274 if (G_UNLIKELY(!stream
)) {
275 stream
= fs_sink_stream_create(trace
, ir_stream
);
286 bt_component_class_sink_consume_method_status
handle_event_msg(
287 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
290 bt_component_class_sink_consume_method_status status
=
291 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
292 const bt_event
*ir_event
= bt_message_event_borrow_event_const(msg
);
293 const bt_stream
*ir_stream
= bt_event_borrow_stream_const(ir_event
);
294 struct fs_sink_stream
*stream
;
295 struct fs_sink_ctf_event_class
*ec
= NULL
;
296 const bt_clock_snapshot
*cs
= NULL
;
298 stream
= borrow_stream(fs_sink
, ir_stream
);
299 if (G_UNLIKELY(!stream
)) {
300 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
304 ret
= try_translate_event_class_trace_ir_to_ctf_ir(fs_sink
,
305 stream
->sc
, bt_event_borrow_class_const(ir_event
), &ec
);
307 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
313 if (stream
->sc
->default_clock_class
) {
314 cs
= bt_message_event_borrow_default_clock_snapshot_const(
319 * If this event's stream does not support packets, then we
320 * lazily create artificial packets.
322 * The size of an artificial packet is arbitrarily at least
323 * 4 MiB (it usually is greater because we close it when
324 * comes the time to write a new event and the packet's content
325 * size is >= 4 MiB), except the last one which can be smaller.
327 if (G_UNLIKELY(!stream
->sc
->has_packets
)) {
328 if (stream
->packet_state
.is_open
&&
329 bt_ctfser_get_offset_in_current_packet_bits(&stream
->ctfser
) / 8 >=
332 * Stream's current packet is larger than 4 MiB:
333 * close it. A new packet will be opened just
336 ret
= fs_sink_stream_close_packet(stream
, NULL
);
338 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
343 if (!stream
->packet_state
.is_open
) {
344 /* Stream's packet is not currently opened: open it */
345 ret
= fs_sink_stream_open_packet(stream
, NULL
, NULL
);
347 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
353 BT_ASSERT(stream
->packet_state
.is_open
);
354 ret
= fs_sink_stream_write_event(stream
, cs
, ir_event
, ec
);
355 if (G_UNLIKELY(ret
)) {
356 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
365 bt_component_class_sink_consume_method_status
handle_packet_beginning_msg(
366 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
369 bt_component_class_sink_consume_method_status status
=
370 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
371 const bt_packet
*ir_packet
=
372 bt_message_packet_beginning_borrow_packet_const(msg
);
373 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
374 struct fs_sink_stream
*stream
;
375 const bt_clock_snapshot
*cs
= NULL
;
377 stream
= borrow_stream(fs_sink
, ir_stream
);
378 if (G_UNLIKELY(!stream
)) {
379 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
383 if (stream
->sc
->packets_have_ts_begin
) {
384 cs
= bt_message_packet_beginning_borrow_default_clock_snapshot_const(
390 * If we previously received a discarded events message with
391 * a time range, make sure that its beginning time matches what's
392 * expected for CTF 1.8, that is:
394 * * Its beginning time is the previous packet's end
395 * time (or the current packet's beginning time if
396 * this is the first packet).
398 * We check this here instead of in handle_packet_end_msg()
399 * because we want to catch any incompatible message as early as
400 * possible to report the error.
402 * Validation of the discarded events message's end time is
403 * performed in handle_packet_end_msg().
405 if (stream
->discarded_events_state
.in_range
) {
406 uint64_t expected_cs
;
409 * `stream->discarded_events_state.in_range` is only set
410 * when the stream class's discarded events have a time
413 * It is required that the packet beginning and end
414 * messages for this stream class have times when
415 * discarded events have a time range.
417 BT_ASSERT(stream
->sc
->discarded_events_has_ts
);
418 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
419 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
421 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
422 /* We're opening the first packet */
423 expected_cs
= bt_clock_snapshot_get_value(cs
);
425 expected_cs
= stream
->prev_packet_state
.end_cs
;
428 if (stream
->discarded_events_state
.beginning_cs
!=
430 BT_COMP_LOGE("Incompatible discarded events message: "
431 "unexpected beginning time: "
432 "beginning-cs-val=%" PRIu64
", "
433 "expected-beginning-cs-val=%" PRIu64
", "
434 "stream-id=%" PRIu64
", stream-name=\"%s\", "
435 "trace-name=\"%s\", path=\"%s/%s\"",
436 stream
->discarded_events_state
.beginning_cs
,
438 bt_stream_get_id(ir_stream
),
439 bt_stream_get_name(ir_stream
),
441 bt_stream_borrow_trace_const(ir_stream
)),
442 stream
->trace
->path
->str
, stream
->file_name
->str
);
443 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
449 * If we previously received a discarded packets message with a
450 * time range, make sure that its beginning and end times match
451 * what's expected for CTF 1.8, that is:
453 * * Its beginning time is the previous packet's end time.
455 * * Its end time is the current packet's beginning time.
457 if (stream
->discarded_packets_state
.in_range
) {
458 uint64_t expected_end_cs
;
461 * `stream->discarded_packets_state.in_range` is only
462 * set when the stream class's discarded packets have a
465 * It is required that the packet beginning and end
466 * messages for this stream class have times when
467 * discarded packets have a time range.
469 BT_ASSERT(stream
->sc
->discarded_packets_has_ts
);
470 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
471 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
474 * It is not supported to have a discarded packets
475 * message _before_ the first packet: we cannot validate
476 * that its beginning time is compatible with CTF 1.8 in
479 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
480 BT_COMP_LOGE("Incompatible discarded packets message "
481 "occuring before the stream's first packet: "
482 "stream-id=%" PRIu64
", stream-name=\"%s\", "
483 "trace-name=\"%s\", path=\"%s/%s\"",
484 bt_stream_get_id(ir_stream
),
485 bt_stream_get_name(ir_stream
),
487 bt_stream_borrow_trace_const(ir_stream
)),
488 stream
->trace
->path
->str
, stream
->file_name
->str
);
489 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
493 if (stream
->discarded_packets_state
.beginning_cs
!=
494 stream
->prev_packet_state
.end_cs
) {
495 BT_COMP_LOGE("Incompatible discarded packets message: "
496 "unexpected beginning time: "
497 "beginning-cs-val=%" PRIu64
", "
498 "expected-beginning-cs-val=%" PRIu64
", "
499 "stream-id=%" PRIu64
", stream-name=\"%s\", "
500 "trace-name=\"%s\", path=\"%s/%s\"",
501 stream
->discarded_packets_state
.beginning_cs
,
502 stream
->prev_packet_state
.end_cs
,
503 bt_stream_get_id(ir_stream
),
504 bt_stream_get_name(ir_stream
),
506 bt_stream_borrow_trace_const(ir_stream
)),
507 stream
->trace
->path
->str
, stream
->file_name
->str
);
508 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
512 expected_end_cs
= bt_clock_snapshot_get_value(cs
);
514 if (stream
->discarded_packets_state
.end_cs
!=
516 BT_COMP_LOGE("Incompatible discarded packets message: "
517 "unexpected end time: "
518 "end-cs-val=%" PRIu64
", "
519 "expected-end-cs-val=%" PRIu64
", "
520 "stream-id=%" PRIu64
", stream-name=\"%s\", "
521 "trace-name=\"%s\", path=\"%s/%s\"",
522 stream
->discarded_packets_state
.end_cs
,
524 bt_stream_get_id(ir_stream
),
525 bt_stream_get_name(ir_stream
),
527 bt_stream_borrow_trace_const(ir_stream
)),
528 stream
->trace
->path
->str
, stream
->file_name
->str
);
529 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
535 * We're not in a discarded packets time range anymore since we
536 * require that the discarded packets time ranges go from one
537 * packet's end time to the next packet's beginning time, and
538 * we're handling a packet beginning message here.
540 stream
->discarded_packets_state
.in_range
= false;
542 ret
= fs_sink_stream_open_packet(stream
, cs
, ir_packet
);
544 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
553 bt_component_class_sink_consume_method_status
handle_packet_end_msg(
554 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
557 bt_component_class_sink_consume_method_status status
=
558 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
559 const bt_packet
*ir_packet
=
560 bt_message_packet_end_borrow_packet_const(msg
);
561 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
562 struct fs_sink_stream
*stream
;
563 const bt_clock_snapshot
*cs
= NULL
;
565 stream
= borrow_stream(fs_sink
, ir_stream
);
566 if (G_UNLIKELY(!stream
)) {
567 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
571 if (stream
->sc
->packets_have_ts_end
) {
572 cs
= bt_message_packet_end_borrow_default_clock_snapshot_const(
578 * If we previously received a discarded events message with
579 * a time range, make sure that its end time matches what's
580 * expected for CTF 1.8, that is:
582 * * Its end time is the current packet's end time.
584 * Validation of the discarded events message's beginning time
585 * is performed in handle_packet_beginning_msg().
587 if (stream
->discarded_events_state
.in_range
) {
588 uint64_t expected_cs
;
591 * `stream->discarded_events_state.in_range` is only set
592 * when the stream class's discarded events have a time
595 * It is required that the packet beginning and end
596 * messages for this stream class have times when
597 * discarded events have a time range.
599 BT_ASSERT(stream
->sc
->discarded_events_has_ts
);
600 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
601 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
603 expected_cs
= bt_clock_snapshot_get_value(cs
);
605 if (stream
->discarded_events_state
.end_cs
!= expected_cs
) {
606 BT_COMP_LOGE("Incompatible discarded events message: "
607 "unexpected end time: "
608 "end-cs-val=%" PRIu64
", "
609 "expected-end-cs-val=%" PRIu64
", "
610 "stream-id=%" PRIu64
", stream-name=\"%s\", "
611 "trace-name=\"%s\", path=\"%s/%s\"",
612 stream
->discarded_events_state
.end_cs
,
614 bt_stream_get_id(ir_stream
),
615 bt_stream_get_name(ir_stream
),
617 bt_stream_borrow_trace_const(ir_stream
)),
618 stream
->trace
->path
->str
, stream
->file_name
->str
);
619 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
624 ret
= fs_sink_stream_close_packet(stream
, cs
);
626 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
631 * We're not in a discarded events time range anymore since we
632 * require that the discarded events time ranges go from one
633 * packet's end time to the next packet's end time, and we're
634 * handling a packet end message here.
636 stream
->discarded_events_state
.in_range
= false;
643 bt_component_class_sink_consume_method_status
handle_stream_beginning_msg(
644 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
646 bt_component_class_sink_consume_method_status status
=
647 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
648 const bt_stream
*ir_stream
=
649 bt_message_stream_beginning_borrow_stream_const(msg
);
650 const bt_stream_class
*ir_sc
=
651 bt_stream_borrow_class_const(ir_stream
);
652 struct fs_sink_stream
*stream
;
653 bool packets_have_beginning_end_cs
=
654 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc
) &&
655 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc
);
658 * Not supported: discarded events or discarded packets support
659 * without packets support. Packets are the way to know where
660 * discarded events/packets occured in CTF 1.8.
662 if (!bt_stream_class_supports_packets(ir_sc
)) {
663 BT_ASSERT(!bt_stream_class_supports_discarded_packets(ir_sc
));
665 if (!fs_sink
->ignore_discarded_events
&&
666 bt_stream_class_supports_discarded_events(ir_sc
)) {
667 BT_COMP_LOGE("Unsupported stream: "
668 "stream does not support packets, "
669 "but supports discarded events: "
671 "stream-id=%" PRIu64
", "
672 "stream-name=\"%s\"",
673 ir_stream
, bt_stream_get_id(ir_stream
),
674 bt_stream_get_name(ir_stream
));
675 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
681 * Not supported: discarded events with default clock snapshots,
682 * but packet beginning/end without default clock snapshot.
684 if (!fs_sink
->ignore_discarded_events
&&
685 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc
) &&
686 !packets_have_beginning_end_cs
) {
687 BT_COMP_LOGE("Unsupported stream: discarded events have "
688 "default clock snapshots, but packets have no "
689 "beginning and/or end default clock snapshots: "
691 "stream-id=%" PRIu64
", "
692 "stream-name=\"%s\"",
693 ir_stream
, bt_stream_get_id(ir_stream
),
694 bt_stream_get_name(ir_stream
));
695 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
700 * Not supported: discarded packets with default clock
701 * snapshots, but packet beginning/end without default clock
704 if (!fs_sink
->ignore_discarded_packets
&&
705 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc
) &&
706 !packets_have_beginning_end_cs
) {
707 BT_COMP_LOGE("Unsupported stream: discarded packets have "
708 "default clock snapshots, but packets have no "
709 "beginning and/or end default clock snapshots: "
711 "stream-id=%" PRIu64
", "
712 "stream-name=\"%s\"",
713 ir_stream
, bt_stream_get_id(ir_stream
),
714 bt_stream_get_name(ir_stream
));
715 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
719 stream
= borrow_stream(fs_sink
, ir_stream
);
721 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
725 BT_COMP_LOGI("Created new, empty stream file: "
726 "stream-id=%" PRIu64
", stream-name=\"%s\", "
727 "trace-name=\"%s\", path=\"%s/%s\"",
728 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
729 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
730 stream
->trace
->path
->str
, stream
->file_name
->str
);
737 bt_component_class_sink_consume_method_status
handle_stream_end_msg(
738 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
740 bt_component_class_sink_consume_method_status status
=
741 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
742 const bt_stream
*ir_stream
=
743 bt_message_stream_end_borrow_stream_const(msg
);
744 struct fs_sink_stream
*stream
;
746 stream
= borrow_stream(fs_sink
, ir_stream
);
748 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
752 if (G_UNLIKELY(!stream
->sc
->has_packets
&&
753 stream
->packet_state
.is_open
)) {
754 /* Close stream's current artificial packet */
755 int ret
= fs_sink_stream_close_packet(stream
, NULL
);
758 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
763 BT_COMP_LOGI("Closing stream file: "
764 "stream-id=%" PRIu64
", stream-name=\"%s\", "
765 "trace-name=\"%s\", path=\"%s/%s\"",
766 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
767 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
768 stream
->trace
->path
->str
, stream
->file_name
->str
);
771 * This destroys the stream object and frees all its resources,
772 * closing the stream file.
774 g_hash_table_remove(stream
->trace
->streams
, ir_stream
);
781 bt_component_class_sink_consume_method_status
handle_discarded_events_msg(
782 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
784 bt_component_class_sink_consume_method_status status
=
785 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
786 const bt_stream
*ir_stream
=
787 bt_message_discarded_events_borrow_stream_const(msg
);
788 struct fs_sink_stream
*stream
;
789 const bt_clock_snapshot
*cs
= NULL
;
790 bt_property_availability avail
;
793 stream
= borrow_stream(fs_sink
, ir_stream
);
795 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
799 if (fs_sink
->ignore_discarded_events
) {
800 BT_COMP_LOGI("Ignoring discarded events message: "
801 "stream-id=%" PRIu64
", stream-name=\"%s\", "
802 "trace-name=\"%s\", path=\"%s/%s\"",
803 bt_stream_get_id(ir_stream
),
804 bt_stream_get_name(ir_stream
),
806 bt_stream_borrow_trace_const(ir_stream
)),
807 stream
->trace
->path
->str
, stream
->file_name
->str
);
811 if (stream
->discarded_events_state
.in_range
) {
812 BT_COMP_LOGE("Unsupported contiguous discarded events message: "
813 "stream-id=%" PRIu64
", stream-name=\"%s\", "
814 "trace-name=\"%s\", path=\"%s/%s\"",
815 bt_stream_get_id(ir_stream
),
816 bt_stream_get_name(ir_stream
),
818 bt_stream_borrow_trace_const(ir_stream
)),
819 stream
->trace
->path
->str
, stream
->file_name
->str
);
820 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
825 * If we're currently in an opened packet (got a packet
826 * beginning message, but no packet end message yet), we do not
827 * support having a discarded events message with a time range
828 * because we require that the discarded events message's time
829 * range go from a packet's end time to the next packet's end
832 if (stream
->packet_state
.is_open
&&
833 stream
->sc
->discarded_events_has_ts
) {
834 BT_COMP_LOGE("Unsupported discarded events message with "
835 "default clock snapshots occuring within a packet: "
836 "stream-id=%" PRIu64
", stream-name=\"%s\", "
837 "trace-name=\"%s\", path=\"%s/%s\"",
838 bt_stream_get_id(ir_stream
),
839 bt_stream_get_name(ir_stream
),
841 bt_stream_borrow_trace_const(ir_stream
)),
842 stream
->trace
->path
->str
, stream
->file_name
->str
);
843 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
847 if (stream
->sc
->discarded_events_has_ts
) {
849 * Make the stream's state be in the time range of a
850 * discarded events message since we have the message's
851 * time range (`stream->sc->discarded_events_has_ts`).
853 stream
->discarded_events_state
.in_range
= true;
856 * The clock snapshot values will be validated when
857 * handling the next packet beginning and end messages
858 * (next calls to handle_packet_beginning_msg() and
859 * handle_packet_end_msg()).
861 cs
= bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
864 stream
->discarded_events_state
.beginning_cs
=
865 bt_clock_snapshot_get_value(cs
);
866 cs
= bt_message_discarded_events_borrow_end_default_clock_snapshot_const(
869 stream
->discarded_events_state
.end_cs
= bt_clock_snapshot_get_value(cs
);
872 avail
= bt_message_discarded_events_get_count(msg
, &count
);
873 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
875 * There's no specific count of discarded events: set it
876 * to 1 so that we know that we at least discarded
882 stream
->packet_state
.discarded_events_counter
+= count
;
889 bt_component_class_sink_consume_method_status
handle_discarded_packets_msg(
890 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
892 bt_component_class_sink_consume_method_status status
=
893 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
894 const bt_stream
*ir_stream
=
895 bt_message_discarded_packets_borrow_stream_const(msg
);
896 struct fs_sink_stream
*stream
;
897 const bt_clock_snapshot
*cs
= NULL
;
898 bt_property_availability avail
;
901 stream
= borrow_stream(fs_sink
, ir_stream
);
903 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
907 if (fs_sink
->ignore_discarded_packets
) {
908 BT_COMP_LOGI("Ignoring discarded packets message: "
909 "stream-id=%" PRIu64
", stream-name=\"%s\", "
910 "trace-name=\"%s\", path=\"%s/%s\"",
911 bt_stream_get_id(ir_stream
),
912 bt_stream_get_name(ir_stream
),
914 bt_stream_borrow_trace_const(ir_stream
)),
915 stream
->trace
->path
->str
, stream
->file_name
->str
);
919 if (stream
->discarded_packets_state
.in_range
) {
920 BT_COMP_LOGE("Unsupported contiguous discarded packets message: "
921 "stream-id=%" PRIu64
", stream-name=\"%s\", "
922 "trace-name=\"%s\", path=\"%s/%s\"",
923 bt_stream_get_id(ir_stream
),
924 bt_stream_get_name(ir_stream
),
926 bt_stream_borrow_trace_const(ir_stream
)),
927 stream
->trace
->path
->str
, stream
->file_name
->str
);
928 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
933 * Discarded packets messages are guaranteed to occur between
936 BT_ASSERT(!stream
->packet_state
.is_open
);
938 if (stream
->sc
->discarded_packets_has_ts
) {
940 * Make the stream's state be in the time range of a
941 * discarded packets message since we have the message's
942 * time range (`stream->sc->discarded_packets_has_ts`).
944 stream
->discarded_packets_state
.in_range
= true;
947 * The clock snapshot values will be validated when
948 * handling the next packet beginning message (next call
949 * to handle_packet_beginning_msg()).
951 cs
= bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
954 stream
->discarded_packets_state
.beginning_cs
=
955 bt_clock_snapshot_get_value(cs
);
956 cs
= bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(
959 stream
->discarded_packets_state
.end_cs
=
960 bt_clock_snapshot_get_value(cs
);
963 avail
= bt_message_discarded_packets_get_count(msg
, &count
);
964 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
966 * There's no specific count of discarded packets: set
967 * it to 1 so that we know that we at least discarded
973 stream
->packet_state
.seq_num
+= count
;
980 void put_messages(bt_message_array_const msgs
, uint64_t count
)
984 for (i
= 0; i
< count
; i
++) {
985 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
990 bt_component_class_sink_consume_method_status
ctf_fs_sink_consume(
991 bt_self_component_sink
*self_comp
)
993 bt_component_class_sink_consume_method_status status
=
994 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
995 struct fs_sink_comp
*fs_sink
;
996 bt_message_iterator_next_status next_status
;
997 uint64_t msg_count
= 0;
998 bt_message_array_const msgs
;
1000 fs_sink
= bt_self_component_get_data(
1001 bt_self_component_sink_as_self_component(self_comp
));
1003 BT_ASSERT(fs_sink
->upstream_iter
);
1005 /* Consume messages */
1006 next_status
= bt_self_component_port_input_message_iterator_next(
1007 fs_sink
->upstream_iter
, &msgs
, &msg_count
);
1008 if (next_status
< 0) {
1009 status
= (int) next_status
;
1013 switch (next_status
) {
1014 case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK
:
1018 for (i
= 0; i
< msg_count
; i
++) {
1019 const bt_message
*msg
= msgs
[i
];
1023 switch (bt_message_get_type(msg
)) {
1024 case BT_MESSAGE_TYPE_EVENT
:
1025 status
= handle_event_msg(fs_sink
, msg
);
1027 case BT_MESSAGE_TYPE_PACKET_BEGINNING
:
1028 status
= handle_packet_beginning_msg(
1031 case BT_MESSAGE_TYPE_PACKET_END
:
1032 status
= handle_packet_end_msg(
1035 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY
:
1037 BT_COMP_LOGD_STR("Ignoring message iterator inactivity message.");
1039 case BT_MESSAGE_TYPE_STREAM_BEGINNING
:
1040 status
= handle_stream_beginning_msg(
1043 case BT_MESSAGE_TYPE_STREAM_END
:
1044 status
= handle_stream_end_msg(
1047 case BT_MESSAGE_TYPE_DISCARDED_EVENTS
:
1048 status
= handle_discarded_events_msg(
1051 case BT_MESSAGE_TYPE_DISCARDED_PACKETS
:
1052 status
= handle_discarded_packets_msg(
1059 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
1061 if (status
!= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
) {
1062 BT_COMP_LOGE("Failed to handle message: "
1063 "generated CTF traces could be incomplete: "
1064 "output-dir-path=\"%s\"",
1065 fs_sink
->output_dir_path
->str
);
1072 case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN
:
1073 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN
;
1075 case BT_MESSAGE_ITERATOR_NEXT_STATUS_END
:
1076 /* TODO: Finalize all traces (should already be done?) */
1077 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END
;
1079 case BT_MESSAGE_ITERATOR_NEXT_STATUS_MEMORY_ERROR
:
1080 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_MEMORY_ERROR
;
1082 case BT_MESSAGE_ITERATOR_NEXT_STATUS_ERROR
:
1083 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_MEMORY_ERROR
;
1092 BT_ASSERT(status
!= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
);
1093 put_messages(msgs
, msg_count
);
1100 bt_component_class_sink_graph_is_configured_method_status
1101 ctf_fs_sink_graph_is_configured(
1102 bt_self_component_sink
*self_comp
)
1104 bt_component_class_sink_graph_is_configured_method_status status
;
1105 bt_self_component_port_input_message_iterator_create_from_sink_component_status
1107 struct fs_sink_comp
*fs_sink
= bt_self_component_get_data(
1108 bt_self_component_sink_as_self_component(self_comp
));
1111 bt_self_component_port_input_message_iterator_create_from_sink_component(
1113 bt_self_component_sink_borrow_input_port_by_name(
1114 self_comp
, in_port_name
), &fs_sink
->upstream_iter
);
1115 if (msg_iter_status
!= BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_CREATE_FROM_SINK_COMPONENT_STATUS_OK
) {
1116 status
= (int) msg_iter_status
;
1120 status
= BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK
;
1126 void ctf_fs_sink_finalize(bt_self_component_sink
*self_comp
)
1128 struct fs_sink_comp
*fs_sink
= bt_self_component_get_data(
1129 bt_self_component_sink_as_self_component(self_comp
));
1131 destroy_fs_sink_comp(fs_sink
);