/*
 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
23 #define BT_COMP_LOG_SELF_COMP (fs_sink->self_comp)
24 #define BT_LOG_OUTPUT_LEVEL (fs_sink->log_level)
25 #define BT_LOG_TAG "PLUGIN/SINK.CTF.FS"
26 #include "logging/comp-logging.h"
28 #include <babeltrace2/babeltrace.h>
32 #include "common/assert.h"
33 #include "ctfser/ctfser.h"
34 #include "plugins/common/param-validation/param-validation.h"
37 #include "fs-sink-trace.h"
38 #include "fs-sink-stream.h"
39 #include "fs-sink-ctf-meta.h"
40 #include "translate-trace-ir-to-ctf-ir.h"
41 #include "translate-ctf-ir-to-tsdl.h"
/*
 * Name of the input port which ctf_fs_sink_init() adds to the component
 * and which ctf_fs_sink_graph_is_configured() looks up by name.
 */
const char * const in_port_name = "in";
47 bt_component_class_initialize_method_status
ensure_output_dir_exists(
48 struct fs_sink_comp
*fs_sink
)
50 bt_component_class_initialize_method_status status
=
51 BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
;
54 ret
= g_mkdir_with_parents(fs_sink
->output_dir_path
->str
, 0755);
57 "Cannot create directories for output directory",
58 ": output-dir-path=\"%s\"",
59 fs_sink
->output_dir_path
->str
);
60 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
68 static struct bt_param_validation_map_value_entry_descr fs_sink_params_descr
[] = {
69 { "path", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY
, { .type
= BT_VALUE_TYPE_STRING
} },
70 { "assume-single-trace", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
, { .type
= BT_VALUE_TYPE_BOOL
} },
71 { "ignore-discarded-events", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
, { .type
= BT_VALUE_TYPE_BOOL
} },
72 { "ignore-discarded-packets", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
, { .type
= BT_VALUE_TYPE_BOOL
} },
73 { "quiet", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
, { .type
= BT_VALUE_TYPE_BOOL
} },
74 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END
78 bt_component_class_initialize_method_status
79 configure_component(struct fs_sink_comp
*fs_sink
, const bt_value
*params
)
81 bt_component_class_initialize_method_status status
;
82 const bt_value
*value
;
83 enum bt_param_validation_status validation_status
;
84 gchar
*validation_error
;
86 validation_status
= bt_param_validation_validate(params
,
87 fs_sink_params_descr
, &validation_error
);
88 if (validation_status
== BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR
) {
89 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
91 } else if (validation_status
== BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR
) {
92 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
96 value
= bt_value_map_borrow_entry_value_const(params
, "path");
97 g_string_assign(fs_sink
->output_dir_path
,
98 bt_value_string_get(value
));
100 value
= bt_value_map_borrow_entry_value_const(params
,
101 "assume-single-trace");
103 fs_sink
->assume_single_trace
= (bool) bt_value_bool_get(value
);
106 value
= bt_value_map_borrow_entry_value_const(params
,
107 "ignore-discarded-events");
109 fs_sink
->ignore_discarded_events
=
110 (bool) bt_value_bool_get(value
);
113 value
= bt_value_map_borrow_entry_value_const(params
,
114 "ignore-discarded-packets");
116 fs_sink
->ignore_discarded_packets
=
117 (bool) bt_value_bool_get(value
);
120 value
= bt_value_map_borrow_entry_value_const(params
,
123 fs_sink
->quiet
= (bool) bt_value_bool_get(value
);
126 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
;
129 g_free(validation_error
);
134 void destroy_fs_sink_comp(struct fs_sink_comp
*fs_sink
)
140 if (fs_sink
->output_dir_path
) {
141 g_string_free(fs_sink
->output_dir_path
, TRUE
);
142 fs_sink
->output_dir_path
= NULL
;
145 if (fs_sink
->traces
) {
146 g_hash_table_destroy(fs_sink
->traces
);
147 fs_sink
->traces
= NULL
;
150 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(
151 fs_sink
->upstream_iter
);
159 bt_component_class_initialize_method_status
ctf_fs_sink_init(
160 bt_self_component_sink
*self_comp_sink
,
161 bt_self_component_sink_configuration
*config
,
162 const bt_value
*params
,
163 void *init_method_data
)
165 bt_component_class_initialize_method_status status
=
166 BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
;
167 bt_self_component_add_port_status add_port_status
;
168 struct fs_sink_comp
*fs_sink
= NULL
;
169 bt_self_component
*self_comp
=
170 bt_self_component_sink_as_self_component(self_comp_sink
);
171 bt_logging_level log_level
= bt_component_get_logging_level(
172 bt_self_component_as_component(self_comp
));
174 fs_sink
= g_new0(struct fs_sink_comp
, 1);
176 BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR
, log_level
, self_comp
,
177 "Failed to allocate one CTF FS sink structure.");
178 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
182 fs_sink
->log_level
= log_level
;
183 fs_sink
->self_comp
= self_comp
;
184 fs_sink
->output_dir_path
= g_string_new(NULL
);
185 status
= configure_component(fs_sink
, params
);
186 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
187 /* configure_component() logs errors */
191 if (fs_sink
->assume_single_trace
&&
192 g_file_test(fs_sink
->output_dir_path
->str
,
193 G_FILE_TEST_EXISTS
)) {
194 BT_COMP_LOGE("Single trace mode, but output path exists: "
195 "output-path=\"%s\"", fs_sink
->output_dir_path
->str
);
196 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
200 status
= ensure_output_dir_exists(fs_sink
);
201 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
202 /* ensure_output_dir_exists() logs errors */
206 fs_sink
->traces
= g_hash_table_new_full(g_direct_hash
, g_direct_equal
,
207 NULL
, (GDestroyNotify
) fs_sink_trace_destroy
);
208 if (!fs_sink
->traces
) {
209 BT_COMP_LOGE_STR("Failed to allocate one GHashTable.");
210 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
214 add_port_status
= bt_self_component_sink_add_input_port(
215 self_comp_sink
, in_port_name
, NULL
, NULL
);
216 switch (add_port_status
) {
217 case BT_SELF_COMPONENT_ADD_PORT_STATUS_ERROR
:
218 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
220 case BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR
:
221 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
227 bt_self_component_set_data(self_comp
, fs_sink
);
230 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
231 destroy_fs_sink_comp(fs_sink
);
238 struct fs_sink_stream
*borrow_stream(struct fs_sink_comp
*fs_sink
,
239 const bt_stream
*ir_stream
)
241 const bt_trace
*ir_trace
= bt_stream_borrow_trace_const(ir_stream
);
242 struct fs_sink_trace
*trace
;
243 struct fs_sink_stream
*stream
= NULL
;
245 trace
= g_hash_table_lookup(fs_sink
->traces
, ir_trace
);
246 if (G_UNLIKELY(!trace
)) {
247 if (fs_sink
->assume_single_trace
&&
248 g_hash_table_size(fs_sink
->traces
) > 0) {
249 BT_COMP_LOGE("Single trace mode, but getting more than one trace: "
250 "stream-name=\"%s\"",
251 bt_stream_get_name(ir_stream
));
255 trace
= fs_sink_trace_create(fs_sink
, ir_trace
);
261 stream
= g_hash_table_lookup(trace
->streams
, ir_stream
);
262 if (G_UNLIKELY(!stream
)) {
263 stream
= fs_sink_stream_create(trace
, ir_stream
);
274 bt_component_class_sink_consume_method_status
handle_event_msg(
275 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
278 bt_component_class_sink_consume_method_status status
=
279 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
280 const bt_event
*ir_event
= bt_message_event_borrow_event_const(msg
);
281 const bt_stream
*ir_stream
= bt_event_borrow_stream_const(ir_event
);
282 struct fs_sink_stream
*stream
;
283 struct fs_sink_ctf_event_class
*ec
= NULL
;
284 const bt_clock_snapshot
*cs
= NULL
;
286 stream
= borrow_stream(fs_sink
, ir_stream
);
287 if (G_UNLIKELY(!stream
)) {
288 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
292 ret
= try_translate_event_class_trace_ir_to_ctf_ir(fs_sink
,
293 stream
->sc
, bt_event_borrow_class_const(ir_event
), &ec
);
295 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
301 if (stream
->sc
->default_clock_class
) {
302 cs
= bt_message_event_borrow_default_clock_snapshot_const(
307 * If this event's stream does not support packets, then we
308 * lazily create artificial packets.
310 * The size of an artificial packet is arbitrarily at least
311 * 4 MiB (it usually is greater because we close it when
312 * comes the time to write a new event and the packet's content
313 * size is >= 4 MiB), except the last one which can be smaller.
315 if (G_UNLIKELY(!stream
->sc
->has_packets
)) {
316 if (stream
->packet_state
.is_open
&&
317 bt_ctfser_get_offset_in_current_packet_bits(&stream
->ctfser
) / 8 >=
320 * Stream's current packet is larger than 4 MiB:
321 * close it. A new packet will be opened just
324 ret
= fs_sink_stream_close_packet(stream
, NULL
);
326 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
331 if (!stream
->packet_state
.is_open
) {
332 /* Stream's packet is not currently opened: open it */
333 ret
= fs_sink_stream_open_packet(stream
, NULL
, NULL
);
335 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
341 BT_ASSERT_DBG(stream
->packet_state
.is_open
);
342 ret
= fs_sink_stream_write_event(stream
, cs
, ir_event
, ec
);
343 if (G_UNLIKELY(ret
)) {
344 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
353 bt_component_class_sink_consume_method_status
handle_packet_beginning_msg(
354 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
357 bt_component_class_sink_consume_method_status status
=
358 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
359 const bt_packet
*ir_packet
=
360 bt_message_packet_beginning_borrow_packet_const(msg
);
361 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
362 struct fs_sink_stream
*stream
;
363 const bt_clock_snapshot
*cs
= NULL
;
365 stream
= borrow_stream(fs_sink
, ir_stream
);
366 if (G_UNLIKELY(!stream
)) {
367 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
371 if (stream
->sc
->packets_have_ts_begin
) {
372 cs
= bt_message_packet_beginning_borrow_default_clock_snapshot_const(
378 * If we previously received a discarded events message with
379 * a time range, make sure that its beginning time matches what's
380 * expected for CTF 1.8, that is:
382 * * Its beginning time is the previous packet's end
383 * time (or the current packet's beginning time if
384 * this is the first packet).
386 * We check this here instead of in handle_packet_end_msg()
387 * because we want to catch any incompatible message as early as
388 * possible to report the error.
390 * Validation of the discarded events message's end time is
391 * performed in handle_packet_end_msg().
393 if (stream
->discarded_events_state
.in_range
) {
394 uint64_t expected_cs
;
397 * `stream->discarded_events_state.in_range` is only set
398 * when the stream class's discarded events have a time
401 * It is required that the packet beginning and end
402 * messages for this stream class have times when
403 * discarded events have a time range.
405 BT_ASSERT(stream
->sc
->discarded_events_has_ts
);
406 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
407 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
409 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
410 /* We're opening the first packet */
411 expected_cs
= bt_clock_snapshot_get_value(cs
);
413 expected_cs
= stream
->prev_packet_state
.end_cs
;
416 if (stream
->discarded_events_state
.beginning_cs
!=
418 BT_COMP_LOGE("Incompatible discarded events message: "
419 "unexpected beginning time: "
420 "beginning-cs-val=%" PRIu64
", "
421 "expected-beginning-cs-val=%" PRIu64
", "
422 "stream-id=%" PRIu64
", stream-name=\"%s\", "
423 "trace-name=\"%s\", path=\"%s/%s\"",
424 stream
->discarded_events_state
.beginning_cs
,
426 bt_stream_get_id(ir_stream
),
427 bt_stream_get_name(ir_stream
),
429 bt_stream_borrow_trace_const(ir_stream
)),
430 stream
->trace
->path
->str
, stream
->file_name
->str
);
431 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
437 * If we previously received a discarded packets message with a
438 * time range, make sure that its beginning and end times match
439 * what's expected for CTF 1.8, that is:
441 * * Its beginning time is the previous packet's end time.
443 * * Its end time is the current packet's beginning time.
445 if (stream
->discarded_packets_state
.in_range
) {
446 uint64_t expected_end_cs
;
449 * `stream->discarded_packets_state.in_range` is only
450 * set when the stream class's discarded packets have a
453 * It is required that the packet beginning and end
454 * messages for this stream class have times when
455 * discarded packets have a time range.
457 BT_ASSERT(stream
->sc
->discarded_packets_has_ts
);
458 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
459 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
462 * It is not supported to have a discarded packets
463 * message _before_ the first packet: we cannot validate
464 * that its beginning time is compatible with CTF 1.8 in
467 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
468 BT_COMP_LOGE("Incompatible discarded packets message "
469 "occuring before the stream's first packet: "
470 "stream-id=%" PRIu64
", stream-name=\"%s\", "
471 "trace-name=\"%s\", path=\"%s/%s\"",
472 bt_stream_get_id(ir_stream
),
473 bt_stream_get_name(ir_stream
),
475 bt_stream_borrow_trace_const(ir_stream
)),
476 stream
->trace
->path
->str
, stream
->file_name
->str
);
477 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
481 if (stream
->discarded_packets_state
.beginning_cs
!=
482 stream
->prev_packet_state
.end_cs
) {
483 BT_COMP_LOGE("Incompatible discarded packets message: "
484 "unexpected beginning time: "
485 "beginning-cs-val=%" PRIu64
", "
486 "expected-beginning-cs-val=%" PRIu64
", "
487 "stream-id=%" PRIu64
", stream-name=\"%s\", "
488 "trace-name=\"%s\", path=\"%s/%s\"",
489 stream
->discarded_packets_state
.beginning_cs
,
490 stream
->prev_packet_state
.end_cs
,
491 bt_stream_get_id(ir_stream
),
492 bt_stream_get_name(ir_stream
),
494 bt_stream_borrow_trace_const(ir_stream
)),
495 stream
->trace
->path
->str
, stream
->file_name
->str
);
496 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
500 expected_end_cs
= bt_clock_snapshot_get_value(cs
);
502 if (stream
->discarded_packets_state
.end_cs
!=
504 BT_COMP_LOGE("Incompatible discarded packets message: "
505 "unexpected end time: "
506 "end-cs-val=%" PRIu64
", "
507 "expected-end-cs-val=%" PRIu64
", "
508 "stream-id=%" PRIu64
", stream-name=\"%s\", "
509 "trace-name=\"%s\", path=\"%s/%s\"",
510 stream
->discarded_packets_state
.end_cs
,
512 bt_stream_get_id(ir_stream
),
513 bt_stream_get_name(ir_stream
),
515 bt_stream_borrow_trace_const(ir_stream
)),
516 stream
->trace
->path
->str
, stream
->file_name
->str
);
517 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
523 * We're not in a discarded packets time range anymore since we
524 * require that the discarded packets time ranges go from one
525 * packet's end time to the next packet's beginning time, and
526 * we're handling a packet beginning message here.
528 stream
->discarded_packets_state
.in_range
= false;
530 ret
= fs_sink_stream_open_packet(stream
, cs
, ir_packet
);
532 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
541 bt_component_class_sink_consume_method_status
handle_packet_end_msg(
542 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
545 bt_component_class_sink_consume_method_status status
=
546 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
547 const bt_packet
*ir_packet
=
548 bt_message_packet_end_borrow_packet_const(msg
);
549 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
550 struct fs_sink_stream
*stream
;
551 const bt_clock_snapshot
*cs
= NULL
;
553 stream
= borrow_stream(fs_sink
, ir_stream
);
554 if (G_UNLIKELY(!stream
)) {
555 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
559 if (stream
->sc
->packets_have_ts_end
) {
560 cs
= bt_message_packet_end_borrow_default_clock_snapshot_const(
566 * If we previously received a discarded events message with
567 * a time range, make sure that its end time matches what's
568 * expected for CTF 1.8, that is:
570 * * Its end time is the current packet's end time.
572 * Validation of the discarded events message's beginning time
573 * is performed in handle_packet_beginning_msg().
575 if (stream
->discarded_events_state
.in_range
) {
576 uint64_t expected_cs
;
579 * `stream->discarded_events_state.in_range` is only set
580 * when the stream class's discarded events have a time
583 * It is required that the packet beginning and end
584 * messages for this stream class have times when
585 * discarded events have a time range.
587 BT_ASSERT(stream
->sc
->discarded_events_has_ts
);
588 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
589 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
591 expected_cs
= bt_clock_snapshot_get_value(cs
);
593 if (stream
->discarded_events_state
.end_cs
!= expected_cs
) {
594 BT_COMP_LOGE("Incompatible discarded events message: "
595 "unexpected end time: "
596 "end-cs-val=%" PRIu64
", "
597 "expected-end-cs-val=%" PRIu64
", "
598 "stream-id=%" PRIu64
", stream-name=\"%s\", "
599 "trace-name=\"%s\", path=\"%s/%s\"",
600 stream
->discarded_events_state
.end_cs
,
602 bt_stream_get_id(ir_stream
),
603 bt_stream_get_name(ir_stream
),
605 bt_stream_borrow_trace_const(ir_stream
)),
606 stream
->trace
->path
->str
, stream
->file_name
->str
);
607 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
612 ret
= fs_sink_stream_close_packet(stream
, cs
);
614 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
619 * We're not in a discarded events time range anymore since we
620 * require that the discarded events time ranges go from one
621 * packet's end time to the next packet's end time, and we're
622 * handling a packet end message here.
624 stream
->discarded_events_state
.in_range
= false;
631 bt_component_class_sink_consume_method_status
handle_stream_beginning_msg(
632 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
634 bt_component_class_sink_consume_method_status status
=
635 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
636 const bt_stream
*ir_stream
=
637 bt_message_stream_beginning_borrow_stream_const(msg
);
638 const bt_stream_class
*ir_sc
=
639 bt_stream_borrow_class_const(ir_stream
);
640 struct fs_sink_stream
*stream
;
641 bool packets_have_beginning_end_cs
=
642 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc
) &&
643 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc
);
646 * Not supported: discarded events or discarded packets support
647 * without packets support. Packets are the way to know where
648 * discarded events/packets occured in CTF 1.8.
650 if (!bt_stream_class_supports_packets(ir_sc
)) {
651 BT_ASSERT(!bt_stream_class_supports_discarded_packets(ir_sc
));
653 if (!fs_sink
->ignore_discarded_events
&&
654 bt_stream_class_supports_discarded_events(ir_sc
)) {
655 BT_COMP_LOGE("Unsupported stream: "
656 "stream does not support packets, "
657 "but supports discarded events: "
659 "stream-id=%" PRIu64
", "
660 "stream-name=\"%s\"",
661 ir_stream
, bt_stream_get_id(ir_stream
),
662 bt_stream_get_name(ir_stream
));
663 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
669 * Not supported: discarded events with default clock snapshots,
670 * but packet beginning/end without default clock snapshot.
672 if (!fs_sink
->ignore_discarded_events
&&
673 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc
) &&
674 !packets_have_beginning_end_cs
) {
675 BT_COMP_LOGE("Unsupported stream: discarded events have "
676 "default clock snapshots, but packets have no "
677 "beginning and/or end default clock snapshots: "
679 "stream-id=%" PRIu64
", "
680 "stream-name=\"%s\"",
681 ir_stream
, bt_stream_get_id(ir_stream
),
682 bt_stream_get_name(ir_stream
));
683 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
688 * Not supported: discarded packets with default clock
689 * snapshots, but packet beginning/end without default clock
692 if (!fs_sink
->ignore_discarded_packets
&&
693 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc
) &&
694 !packets_have_beginning_end_cs
) {
695 BT_COMP_LOGE("Unsupported stream: discarded packets have "
696 "default clock snapshots, but packets have no "
697 "beginning and/or end default clock snapshots: "
699 "stream-id=%" PRIu64
", "
700 "stream-name=\"%s\"",
701 ir_stream
, bt_stream_get_id(ir_stream
),
702 bt_stream_get_name(ir_stream
));
703 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
707 stream
= borrow_stream(fs_sink
, ir_stream
);
709 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
713 BT_COMP_LOGI("Created new, empty stream file: "
714 "stream-id=%" PRIu64
", stream-name=\"%s\", "
715 "trace-name=\"%s\", path=\"%s/%s\"",
716 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
717 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
718 stream
->trace
->path
->str
, stream
->file_name
->str
);
725 bt_component_class_sink_consume_method_status
handle_stream_end_msg(
726 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
728 bt_component_class_sink_consume_method_status status
=
729 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
730 const bt_stream
*ir_stream
=
731 bt_message_stream_end_borrow_stream_const(msg
);
732 struct fs_sink_stream
*stream
;
734 stream
= borrow_stream(fs_sink
, ir_stream
);
736 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
740 if (G_UNLIKELY(!stream
->sc
->has_packets
&&
741 stream
->packet_state
.is_open
)) {
742 /* Close stream's current artificial packet */
743 int ret
= fs_sink_stream_close_packet(stream
, NULL
);
746 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
751 BT_COMP_LOGI("Closing stream file: "
752 "stream-id=%" PRIu64
", stream-name=\"%s\", "
753 "trace-name=\"%s\", path=\"%s/%s\"",
754 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
755 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
756 stream
->trace
->path
->str
, stream
->file_name
->str
);
759 * This destroys the stream object and frees all its resources,
760 * closing the stream file.
762 g_hash_table_remove(stream
->trace
->streams
, ir_stream
);
769 bt_component_class_sink_consume_method_status
handle_discarded_events_msg(
770 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
772 bt_component_class_sink_consume_method_status status
=
773 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
774 const bt_stream
*ir_stream
=
775 bt_message_discarded_events_borrow_stream_const(msg
);
776 struct fs_sink_stream
*stream
;
777 const bt_clock_snapshot
*cs
= NULL
;
778 bt_property_availability avail
;
781 stream
= borrow_stream(fs_sink
, ir_stream
);
783 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
787 if (fs_sink
->ignore_discarded_events
) {
788 BT_COMP_LOGI("Ignoring discarded events message: "
789 "stream-id=%" PRIu64
", stream-name=\"%s\", "
790 "trace-name=\"%s\", path=\"%s/%s\"",
791 bt_stream_get_id(ir_stream
),
792 bt_stream_get_name(ir_stream
),
794 bt_stream_borrow_trace_const(ir_stream
)),
795 stream
->trace
->path
->str
, stream
->file_name
->str
);
799 if (stream
->discarded_events_state
.in_range
) {
800 BT_COMP_LOGE("Unsupported contiguous discarded events message: "
801 "stream-id=%" PRIu64
", stream-name=\"%s\", "
802 "trace-name=\"%s\", path=\"%s/%s\"",
803 bt_stream_get_id(ir_stream
),
804 bt_stream_get_name(ir_stream
),
806 bt_stream_borrow_trace_const(ir_stream
)),
807 stream
->trace
->path
->str
, stream
->file_name
->str
);
808 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
813 * If we're currently in an opened packet (got a packet
814 * beginning message, but no packet end message yet), we do not
815 * support having a discarded events message with a time range
816 * because we require that the discarded events message's time
817 * range go from a packet's end time to the next packet's end
820 if (stream
->packet_state
.is_open
&&
821 stream
->sc
->discarded_events_has_ts
) {
822 BT_COMP_LOGE("Unsupported discarded events message with "
823 "default clock snapshots occuring within a packet: "
824 "stream-id=%" PRIu64
", stream-name=\"%s\", "
825 "trace-name=\"%s\", path=\"%s/%s\"",
826 bt_stream_get_id(ir_stream
),
827 bt_stream_get_name(ir_stream
),
829 bt_stream_borrow_trace_const(ir_stream
)),
830 stream
->trace
->path
->str
, stream
->file_name
->str
);
831 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
835 if (stream
->sc
->discarded_events_has_ts
) {
837 * Make the stream's state be in the time range of a
838 * discarded events message since we have the message's
839 * time range (`stream->sc->discarded_events_has_ts`).
841 stream
->discarded_events_state
.in_range
= true;
844 * The clock snapshot values will be validated when
845 * handling the next packet beginning and end messages
846 * (next calls to handle_packet_beginning_msg() and
847 * handle_packet_end_msg()).
849 cs
= bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
852 stream
->discarded_events_state
.beginning_cs
=
853 bt_clock_snapshot_get_value(cs
);
854 cs
= bt_message_discarded_events_borrow_end_default_clock_snapshot_const(
857 stream
->discarded_events_state
.end_cs
= bt_clock_snapshot_get_value(cs
);
860 avail
= bt_message_discarded_events_get_count(msg
, &count
);
861 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
863 * There's no specific count of discarded events: set it
864 * to 1 so that we know that we at least discarded
870 stream
->packet_state
.discarded_events_counter
+= count
;
877 bt_component_class_sink_consume_method_status
handle_discarded_packets_msg(
878 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
880 bt_component_class_sink_consume_method_status status
=
881 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
882 const bt_stream
*ir_stream
=
883 bt_message_discarded_packets_borrow_stream_const(msg
);
884 struct fs_sink_stream
*stream
;
885 const bt_clock_snapshot
*cs
= NULL
;
886 bt_property_availability avail
;
889 stream
= borrow_stream(fs_sink
, ir_stream
);
891 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
895 if (fs_sink
->ignore_discarded_packets
) {
896 BT_COMP_LOGI("Ignoring discarded packets message: "
897 "stream-id=%" PRIu64
", stream-name=\"%s\", "
898 "trace-name=\"%s\", path=\"%s/%s\"",
899 bt_stream_get_id(ir_stream
),
900 bt_stream_get_name(ir_stream
),
902 bt_stream_borrow_trace_const(ir_stream
)),
903 stream
->trace
->path
->str
, stream
->file_name
->str
);
907 if (stream
->discarded_packets_state
.in_range
) {
908 BT_COMP_LOGE("Unsupported contiguous discarded packets message: "
909 "stream-id=%" PRIu64
", stream-name=\"%s\", "
910 "trace-name=\"%s\", path=\"%s/%s\"",
911 bt_stream_get_id(ir_stream
),
912 bt_stream_get_name(ir_stream
),
914 bt_stream_borrow_trace_const(ir_stream
)),
915 stream
->trace
->path
->str
, stream
->file_name
->str
);
916 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
921 * Discarded packets messages are guaranteed to occur between
924 BT_ASSERT(!stream
->packet_state
.is_open
);
926 if (stream
->sc
->discarded_packets_has_ts
) {
928 * Make the stream's state be in the time range of a
929 * discarded packets message since we have the message's
930 * time range (`stream->sc->discarded_packets_has_ts`).
932 stream
->discarded_packets_state
.in_range
= true;
935 * The clock snapshot values will be validated when
936 * handling the next packet beginning message (next call
937 * to handle_packet_beginning_msg()).
939 cs
= bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
942 stream
->discarded_packets_state
.beginning_cs
=
943 bt_clock_snapshot_get_value(cs
);
944 cs
= bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(
947 stream
->discarded_packets_state
.end_cs
=
948 bt_clock_snapshot_get_value(cs
);
951 avail
= bt_message_discarded_packets_get_count(msg
, &count
);
952 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
954 * There's no specific count of discarded packets: set
955 * it to 1 so that we know that we at least discarded
961 stream
->packet_state
.seq_num
+= count
;
968 void put_messages(bt_message_array_const msgs
, uint64_t count
)
972 for (i
= 0; i
< count
; i
++) {
973 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
978 bt_component_class_sink_consume_method_status
ctf_fs_sink_consume(
979 bt_self_component_sink
*self_comp
)
981 bt_component_class_sink_consume_method_status status
=
982 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
983 struct fs_sink_comp
*fs_sink
;
984 bt_message_iterator_next_status next_status
;
985 uint64_t msg_count
= 0;
986 bt_message_array_const msgs
;
988 fs_sink
= bt_self_component_get_data(
989 bt_self_component_sink_as_self_component(self_comp
));
990 BT_ASSERT_DBG(fs_sink
);
991 BT_ASSERT_DBG(fs_sink
->upstream_iter
);
993 /* Consume messages */
994 next_status
= bt_self_component_port_input_message_iterator_next(
995 fs_sink
->upstream_iter
, &msgs
, &msg_count
);
996 if (next_status
< 0) {
997 status
= (int) next_status
;
1001 switch (next_status
) {
1002 case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK
:
1006 for (i
= 0; i
< msg_count
; i
++) {
1007 const bt_message
*msg
= msgs
[i
];
1011 switch (bt_message_get_type(msg
)) {
1012 case BT_MESSAGE_TYPE_EVENT
:
1013 status
= handle_event_msg(fs_sink
, msg
);
1015 case BT_MESSAGE_TYPE_PACKET_BEGINNING
:
1016 status
= handle_packet_beginning_msg(
1019 case BT_MESSAGE_TYPE_PACKET_END
:
1020 status
= handle_packet_end_msg(
1023 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY
:
1025 BT_COMP_LOGD_STR("Ignoring message iterator inactivity message.");
1027 case BT_MESSAGE_TYPE_STREAM_BEGINNING
:
1028 status
= handle_stream_beginning_msg(
1031 case BT_MESSAGE_TYPE_STREAM_END
:
1032 status
= handle_stream_end_msg(
1035 case BT_MESSAGE_TYPE_DISCARDED_EVENTS
:
1036 status
= handle_discarded_events_msg(
1039 case BT_MESSAGE_TYPE_DISCARDED_PACKETS
:
1040 status
= handle_discarded_packets_msg(
1047 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
1049 if (status
!= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
) {
1050 BT_COMP_LOGE("Failed to handle message: "
1051 "generated CTF traces could be incomplete: "
1052 "output-dir-path=\"%s\"",
1053 fs_sink
->output_dir_path
->str
);
1060 case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN
:
1061 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN
;
1063 case BT_MESSAGE_ITERATOR_NEXT_STATUS_END
:
1064 /* TODO: Finalize all traces (should already be done?) */
1065 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END
;
1067 case BT_MESSAGE_ITERATOR_NEXT_STATUS_MEMORY_ERROR
:
1068 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_MEMORY_ERROR
;
1070 case BT_MESSAGE_ITERATOR_NEXT_STATUS_ERROR
:
1071 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_MEMORY_ERROR
;
1080 BT_ASSERT(status
!= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
);
1081 put_messages(msgs
, msg_count
);
1088 bt_component_class_sink_graph_is_configured_method_status
1089 ctf_fs_sink_graph_is_configured(
1090 bt_self_component_sink
*self_comp
)
1092 bt_component_class_sink_graph_is_configured_method_status status
;
1093 bt_self_component_port_input_message_iterator_create_from_sink_component_status
1095 struct fs_sink_comp
*fs_sink
= bt_self_component_get_data(
1096 bt_self_component_sink_as_self_component(self_comp
));
1099 bt_self_component_port_input_message_iterator_create_from_sink_component(
1101 bt_self_component_sink_borrow_input_port_by_name(
1102 self_comp
, in_port_name
), &fs_sink
->upstream_iter
);
1103 if (msg_iter_status
!= BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_CREATE_FROM_SINK_COMPONENT_STATUS_OK
) {
1104 status
= (int) msg_iter_status
;
1108 status
= BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK
;
1114 void ctf_fs_sink_finalize(bt_self_component_sink
*self_comp
)
1116 struct fs_sink_comp
*fs_sink
= bt_self_component_get_data(
1117 bt_self_component_sink_as_self_component(self_comp
));
1119 destroy_fs_sink_comp(fs_sink
);