/*
 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
#define BT_LOG_TAG "PLUGIN-CTF-FS-SINK"

#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>

#include <glib.h>

#include <babeltrace/babeltrace.h>
#include <babeltrace/assert-internal.h>
#include <babeltrace/ctfser-internal.h>

#include "fs-sink-trace.h"
#include "fs-sink-stream.h"
#include "fs-sink-ctf-meta.h"
#include "translate-trace-ir-to-ctf-ir.h"
#include "translate-ctf-ir-to-tsdl.h"
/* Name of this sink component's input port. */
const char * const in_port_name = "in";
44 bt_self_component_status
ensure_output_dir_exists(
45 struct fs_sink_comp
*fs_sink
)
47 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
50 ret
= g_mkdir_with_parents(fs_sink
->output_dir_path
->str
, 0755);
52 BT_LOGE_ERRNO("Cannot create directories for output directory",
53 ": output-dir-path=\"%s\"",
54 fs_sink
->output_dir_path
->str
);
55 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
64 bt_self_component_status
configure_component(struct fs_sink_comp
*fs_sink
,
65 const bt_value
*params
)
67 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
68 const bt_value
*value
;
70 value
= bt_value_map_borrow_entry_value_const(params
, "path");
72 BT_LOGE_STR("Missing mandatory `path` parameter.");
73 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
77 if (!bt_value_is_string(value
)) {
78 BT_LOGE_STR("`path` parameter: expecting a string.");
79 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
83 g_string_assign(fs_sink
->output_dir_path
,
84 bt_value_string_get(value
));
85 value
= bt_value_map_borrow_entry_value_const(params
,
86 "assume-single-trace");
88 if (!bt_value_is_bool(value
)) {
89 BT_LOGE_STR("`assume-single-trace` parameter: expecting a boolean.");
90 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
94 fs_sink
->assume_single_trace
= (bool) bt_value_bool_get(value
);
97 value
= bt_value_map_borrow_entry_value_const(params
,
98 "ignore-discarded-events");
100 if (!bt_value_is_bool(value
)) {
101 BT_LOGE_STR("`ignore-discarded-events` parameter: expecting a boolean.");
102 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
106 fs_sink
->ignore_discarded_events
=
107 (bool) bt_value_bool_get(value
);
110 value
= bt_value_map_borrow_entry_value_const(params
,
111 "ignore-discarded-packets");
113 if (!bt_value_is_bool(value
)) {
114 BT_LOGE_STR("`ignore-discarded-packets` parameter: expecting a boolean.");
115 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
119 fs_sink
->ignore_discarded_packets
=
120 (bool) bt_value_bool_get(value
);
123 value
= bt_value_map_borrow_entry_value_const(params
,
126 if (!bt_value_is_bool(value
)) {
127 BT_LOGE_STR("`quiet` parameter: expecting a boolean.");
128 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
132 fs_sink
->quiet
= (bool) bt_value_bool_get(value
);
140 void destroy_fs_sink_comp(struct fs_sink_comp
*fs_sink
)
146 if (fs_sink
->output_dir_path
) {
147 g_string_free(fs_sink
->output_dir_path
, TRUE
);
148 fs_sink
->output_dir_path
= NULL
;
151 if (fs_sink
->traces
) {
152 g_hash_table_destroy(fs_sink
->traces
);
153 fs_sink
->traces
= NULL
;
156 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(
157 fs_sink
->upstream_iter
);
165 bt_self_component_status
ctf_fs_sink_init(
166 bt_self_component_sink
*self_comp
, const bt_value
*params
,
167 void *init_method_data
)
169 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
170 struct fs_sink_comp
*fs_sink
= NULL
;
172 fs_sink
= g_new0(struct fs_sink_comp
, 1);
174 BT_LOGE_STR("Failed to allocate one CTF FS sink structure.");
175 status
= BT_SELF_COMPONENT_STATUS_NOMEM
;
179 fs_sink
->output_dir_path
= g_string_new(NULL
);
180 fs_sink
->self_comp
= self_comp
;
181 status
= configure_component(fs_sink
, params
);
182 if (status
!= BT_SELF_COMPONENT_STATUS_OK
) {
183 /* configure_component() logs errors */
187 if (fs_sink
->assume_single_trace
&&
188 g_file_test(fs_sink
->output_dir_path
->str
,
189 G_FILE_TEST_EXISTS
)) {
190 BT_LOGE("Single trace mode, but output path exists: "
191 "output-path=\"%s\"", fs_sink
->output_dir_path
->str
);
192 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
196 status
= ensure_output_dir_exists(fs_sink
);
197 if (status
!= BT_SELF_COMPONENT_STATUS_OK
) {
198 /* ensure_output_dir_exists() logs errors */
202 fs_sink
->traces
= g_hash_table_new_full(g_direct_hash
, g_direct_equal
,
203 NULL
, (GDestroyNotify
) fs_sink_trace_destroy
);
204 if (!fs_sink
->traces
) {
205 BT_LOGE_STR("Failed to allocate one GHashTable.");
206 status
= BT_SELF_COMPONENT_STATUS_NOMEM
;
210 status
= bt_self_component_sink_add_input_port(self_comp
, in_port_name
,
212 if (status
!= BT_SELF_COMPONENT_STATUS_OK
) {
216 bt_self_component_set_data(
217 bt_self_component_sink_as_self_component(self_comp
), fs_sink
);
220 if (status
!= BT_SELF_COMPONENT_STATUS_OK
) {
221 destroy_fs_sink_comp(fs_sink
);
228 struct fs_sink_stream
*borrow_stream(struct fs_sink_comp
*fs_sink
,
229 const bt_stream
*ir_stream
)
231 const bt_trace
*ir_trace
= bt_stream_borrow_trace_const(ir_stream
);
232 struct fs_sink_trace
*trace
;
233 struct fs_sink_stream
*stream
= NULL
;
235 trace
= g_hash_table_lookup(fs_sink
->traces
, ir_trace
);
236 if (unlikely(!trace
)) {
237 if (fs_sink
->assume_single_trace
&&
238 g_hash_table_size(fs_sink
->traces
) > 0) {
239 BT_LOGE("Single trace mode, but getting more than one trace: "
240 "stream-name=\"%s\"",
241 bt_stream_get_name(ir_stream
));
245 trace
= fs_sink_trace_create(fs_sink
, ir_trace
);
251 stream
= g_hash_table_lookup(trace
->streams
, ir_stream
);
252 if (unlikely(!stream
)) {
253 stream
= fs_sink_stream_create(trace
, ir_stream
);
264 bt_self_component_status
handle_event_msg(struct fs_sink_comp
*fs_sink
,
265 const bt_message
*msg
)
268 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
269 const bt_event
*ir_event
= bt_message_event_borrow_event_const(msg
);
270 const bt_stream
*ir_stream
= bt_event_borrow_stream_const(ir_event
);
271 struct fs_sink_stream
*stream
;
272 struct fs_sink_ctf_event_class
*ec
= NULL
;
273 const bt_clock_snapshot
*cs
= NULL
;
275 stream
= borrow_stream(fs_sink
, ir_stream
);
276 if (unlikely(!stream
)) {
277 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
281 ret
= try_translate_event_class_trace_ir_to_ctf_ir(stream
->sc
,
282 bt_event_borrow_class_const(ir_event
), &ec
);
284 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
290 if (stream
->sc
->default_clock_class
) {
291 cs
= bt_message_event_borrow_default_clock_snapshot_const(
295 ret
= fs_sink_stream_write_event(stream
, cs
, ir_event
, ec
);
297 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
306 bt_self_component_status
handle_packet_beginning_msg(
307 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
310 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
311 const bt_packet
*ir_packet
=
312 bt_message_packet_beginning_borrow_packet_const(msg
);
313 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
314 struct fs_sink_stream
*stream
;
315 const bt_clock_snapshot
*cs
= NULL
;
317 stream
= borrow_stream(fs_sink
, ir_stream
);
318 if (unlikely(!stream
)) {
319 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
323 if (stream
->sc
->default_clock_class
) {
324 cs
= bt_message_packet_beginning_borrow_default_clock_snapshot_const(
329 if (stream
->discarded_events_state
.in_range
) {
331 * Make sure that the current discarded events range's
332 * beginning time matches what's expected for CTF 1.8.
334 if (stream
->sc
->default_clock_class
) {
335 uint64_t expected_cs
;
337 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
338 /* We're opening the first packet */
339 expected_cs
= bt_clock_snapshot_get_value(cs
);
341 expected_cs
= stream
->prev_packet_state
.end_cs
;
344 if (stream
->discarded_events_state
.beginning_cs
!=
346 BT_LOGE("Incompatible discarded events message: "
347 "unexpected beginning time: "
348 "beginning-cs-val=%" PRIu64
", "
349 "expected-beginning-cs-val=%" PRIu64
", "
350 "stream-id=%" PRIu64
", stream-name=\"%s\", "
351 "trace-name=\"%s\", path=\"%s/%s\"",
352 stream
->discarded_events_state
.beginning_cs
,
354 bt_stream_get_id(ir_stream
),
355 bt_stream_get_name(ir_stream
),
357 bt_stream_borrow_trace_const(ir_stream
)),
358 stream
->trace
->path
->str
, stream
->file_name
->str
);
359 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
366 if (stream
->discarded_packets_state
.in_range
) {
367 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
368 BT_LOGE("Incompatible discarded packets message "
369 "occuring before the stream's first packet: "
370 "stream-id=%" PRIu64
", stream-name=\"%s\", "
371 "trace-name=\"%s\", path=\"%s/%s\"",
372 bt_stream_get_id(ir_stream
),
373 bt_stream_get_name(ir_stream
),
375 bt_stream_borrow_trace_const(ir_stream
)),
376 stream
->trace
->path
->str
, stream
->file_name
->str
);
377 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
382 * Make sure that the current discarded packets range's
383 * beginning and end times match what's expected for CTF
386 if (stream
->sc
->default_clock_class
) {
387 uint64_t expected_end_cs
=
388 bt_clock_snapshot_get_value(cs
);
390 if (stream
->discarded_packets_state
.beginning_cs
!=
391 stream
->prev_packet_state
.end_cs
) {
392 BT_LOGE("Incompatible discarded packets message: "
393 "unexpected beginning time: "
394 "beginning-cs-val=%" PRIu64
", "
395 "expected-beginning-cs-val=%" PRIu64
", "
396 "stream-id=%" PRIu64
", stream-name=\"%s\", "
397 "trace-name=\"%s\", path=\"%s/%s\"",
398 stream
->discarded_packets_state
.beginning_cs
,
399 stream
->prev_packet_state
.end_cs
,
400 bt_stream_get_id(ir_stream
),
401 bt_stream_get_name(ir_stream
),
403 bt_stream_borrow_trace_const(ir_stream
)),
404 stream
->trace
->path
->str
, stream
->file_name
->str
);
405 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
409 if (stream
->discarded_packets_state
.end_cs
!=
411 BT_LOGE("Incompatible discarded packets message: "
412 "unexpected end time: "
413 "end-cs-val=%" PRIu64
", "
414 "expected-end-cs-val=%" PRIu64
", "
415 "stream-id=%" PRIu64
", stream-name=\"%s\", "
416 "trace-name=\"%s\", path=\"%s/%s\"",
417 stream
->discarded_packets_state
.beginning_cs
,
419 bt_stream_get_id(ir_stream
),
420 bt_stream_get_name(ir_stream
),
422 bt_stream_borrow_trace_const(ir_stream
)),
423 stream
->trace
->path
->str
, stream
->file_name
->str
);
424 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
430 stream
->discarded_packets_state
.in_range
= false;
431 ret
= fs_sink_stream_open_packet(stream
, cs
, ir_packet
);
433 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
442 bt_self_component_status
handle_packet_end_msg(
443 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
446 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
447 const bt_packet
*ir_packet
=
448 bt_message_packet_end_borrow_packet_const(msg
);
449 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
450 struct fs_sink_stream
*stream
;
451 const bt_clock_snapshot
*cs
= NULL
;
453 stream
= borrow_stream(fs_sink
, ir_stream
);
454 if (unlikely(!stream
)) {
455 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
459 if (stream
->sc
->default_clock_class
) {
460 cs
= bt_message_packet_end_borrow_default_clock_snapshot_const(
465 if (stream
->sc
->default_clock_class
) {
466 cs
= bt_message_packet_end_borrow_default_clock_snapshot_const(
471 if (stream
->discarded_events_state
.in_range
) {
473 * Make sure that the current discarded events range's
474 * end time matches what's expected for CTF 1.8.
476 if (stream
->sc
->default_clock_class
) {
477 uint64_t expected_cs
= bt_clock_snapshot_get_value(cs
);
479 if (stream
->discarded_events_state
.end_cs
!=
481 BT_LOGE("Incompatible discarded events message: "
482 "unexpected end time: "
483 "end-cs-val=%" PRIu64
", "
484 "expected-end-cs-val=%" PRIu64
", "
485 "stream-id=%" PRIu64
", stream-name=\"%s\", "
486 "trace-name=\"%s\", path=\"%s/%s\"",
487 stream
->discarded_events_state
.end_cs
,
489 bt_stream_get_id(ir_stream
),
490 bt_stream_get_name(ir_stream
),
492 bt_stream_borrow_trace_const(ir_stream
)),
493 stream
->trace
->path
->str
, stream
->file_name
->str
);
494 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
500 ret
= fs_sink_stream_close_packet(stream
, cs
);
502 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
506 stream
->discarded_events_state
.in_range
= false;
513 bt_self_component_status
handle_stream_beginning_msg(
514 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
516 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
517 const bt_stream
*ir_stream
=
518 bt_message_stream_beginning_borrow_stream_const(msg
);
519 const bt_stream_class
*ir_sc
=
520 bt_stream_borrow_class_const(ir_stream
);
521 struct fs_sink_stream
*stream
;
524 * Temporary: if the stream's class has a default clock class,
525 * make sure packet beginning and end messages have default
526 * clock snapshots until the support for not having them is
529 if (bt_stream_class_borrow_default_clock_class_const(ir_sc
)) {
530 if (!bt_stream_class_packets_have_default_beginning_clock_snapshot(
532 BT_LOGE("Unsupported stream: packets have "
533 "no beginning clock snapshot: "
535 "stream-id=%" PRIu64
", "
536 "stream-name=\"%s\"",
537 ir_stream
, bt_stream_get_id(ir_stream
),
538 bt_stream_get_name(ir_stream
));
539 status
= BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR
;
543 if (!bt_stream_class_packets_have_default_end_clock_snapshot(
545 BT_LOGE("Unsupported stream: packets have "
546 "no end clock snapshot: "
548 "stream-id=%" PRIu64
", "
549 "stream-name=\"%s\"",
550 ir_stream
, bt_stream_get_id(ir_stream
),
551 bt_stream_get_name(ir_stream
));
552 status
= BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR
;
556 if (!fs_sink
->ignore_discarded_events
&&
557 bt_stream_class_supports_discarded_events(ir_sc
) &&
558 !bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc
)) {
559 BT_LOGE("Unsupported stream: discarded events "
560 "have no clock snapshots: "
562 "stream-id=%" PRIu64
", "
563 "stream-name=\"%s\"",
564 ir_stream
, bt_stream_get_id(ir_stream
),
565 bt_stream_get_name(ir_stream
));
566 status
= BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR
;
570 if (!fs_sink
->ignore_discarded_packets
&&
571 bt_stream_class_supports_discarded_packets(ir_sc
) &&
572 !bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc
)) {
573 BT_LOGE("Unsupported stream: discarded packets "
574 "have no clock snapshots: "
576 "stream-id=%" PRIu64
", "
577 "stream-name=\"%s\"",
578 ir_stream
, bt_stream_get_id(ir_stream
),
579 bt_stream_get_name(ir_stream
));
580 status
= BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR
;
585 stream
= borrow_stream(fs_sink
, ir_stream
);
587 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
591 BT_LOGI("Created new, empty stream file: "
592 "stream-id=%" PRIu64
", stream-name=\"%s\", "
593 "trace-name=\"%s\", path=\"%s/%s\"",
594 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
595 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
596 stream
->trace
->path
->str
, stream
->file_name
->str
);
603 bt_self_component_status
handle_stream_end_msg(struct fs_sink_comp
*fs_sink
,
604 const bt_message
*msg
)
606 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
607 const bt_stream
*ir_stream
=
608 bt_message_stream_end_borrow_stream_const(msg
);
609 struct fs_sink_stream
*stream
;
611 stream
= borrow_stream(fs_sink
, ir_stream
);
613 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
617 BT_LOGI("Closing stream file: "
618 "stream-id=%" PRIu64
", stream-name=\"%s\", "
619 "trace-name=\"%s\", path=\"%s/%s\"",
620 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
621 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
622 stream
->trace
->path
->str
, stream
->file_name
->str
);
625 * This destroys the stream object and frees all its resources,
626 * closing the stream file.
628 g_hash_table_remove(stream
->trace
->streams
, ir_stream
);
635 bt_self_component_status
handle_discarded_events_msg(
636 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
638 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
639 const bt_stream
*ir_stream
=
640 bt_message_discarded_events_borrow_stream_const(msg
);
641 struct fs_sink_stream
*stream
;
642 const bt_clock_snapshot
*cs
= NULL
;
643 bt_property_availability avail
;
646 stream
= borrow_stream(fs_sink
, ir_stream
);
648 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
652 if (fs_sink
->ignore_discarded_events
) {
653 BT_LOGI("Ignoring discarded events message: "
654 "stream-id=%" PRIu64
", stream-name=\"%s\", "
655 "trace-name=\"%s\", path=\"%s/%s\"",
656 bt_stream_get_id(ir_stream
),
657 bt_stream_get_name(ir_stream
),
659 bt_stream_borrow_trace_const(ir_stream
)),
660 stream
->trace
->path
->str
, stream
->file_name
->str
);
664 if (stream
->discarded_events_state
.in_range
) {
665 BT_LOGE("Unsupported contiguous discarded events message: "
666 "stream-id=%" PRIu64
", stream-name=\"%s\", "
667 "trace-name=\"%s\", path=\"%s/%s\"",
668 bt_stream_get_id(ir_stream
),
669 bt_stream_get_name(ir_stream
),
671 bt_stream_borrow_trace_const(ir_stream
)),
672 stream
->trace
->path
->str
, stream
->file_name
->str
);
673 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
677 if (stream
->packet_state
.is_open
) {
678 BT_LOGE("Unsupported discarded events message occuring "
680 "stream-id=%" PRIu64
", stream-name=\"%s\", "
681 "trace-name=\"%s\", path=\"%s/%s\"",
682 bt_stream_get_id(ir_stream
),
683 bt_stream_get_name(ir_stream
),
685 bt_stream_borrow_trace_const(ir_stream
)),
686 stream
->trace
->path
->str
, stream
->file_name
->str
);
687 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
691 stream
->discarded_events_state
.in_range
= true;
693 if (stream
->sc
->default_clock_class
) {
695 * The clock snapshot values will be validated when
696 * handling the next "packet beginning" message.
698 cs
= bt_message_discarded_events_borrow_default_beginning_clock_snapshot_const(
701 stream
->discarded_events_state
.beginning_cs
=
702 bt_clock_snapshot_get_value(cs
);
703 cs
= bt_message_discarded_events_borrow_default_end_clock_snapshot_const(
706 stream
->discarded_events_state
.end_cs
=
707 bt_clock_snapshot_get_value(cs
);
709 stream
->discarded_events_state
.beginning_cs
= UINT64_C(-1);
710 stream
->discarded_events_state
.end_cs
= UINT64_C(-1);
713 avail
= bt_message_discarded_events_get_count(msg
, &count
);
714 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
718 stream
->packet_state
.discarded_events_counter
+= count
;
725 bt_self_component_status
handle_discarded_packets_msg(
726 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
728 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
729 const bt_stream
*ir_stream
=
730 bt_message_discarded_packets_borrow_stream_const(msg
);
731 struct fs_sink_stream
*stream
;
732 const bt_clock_snapshot
*cs
= NULL
;
733 bt_property_availability avail
;
736 stream
= borrow_stream(fs_sink
, ir_stream
);
738 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
742 if (fs_sink
->ignore_discarded_packets
) {
743 BT_LOGI("Ignoring discarded packets message: "
744 "stream-id=%" PRIu64
", stream-name=\"%s\", "
745 "trace-name=\"%s\", path=\"%s/%s\"",
746 bt_stream_get_id(ir_stream
),
747 bt_stream_get_name(ir_stream
),
749 bt_stream_borrow_trace_const(ir_stream
)),
750 stream
->trace
->path
->str
, stream
->file_name
->str
);
754 if (stream
->discarded_packets_state
.in_range
) {
755 BT_LOGE("Unsupported contiguous discarded packets message: "
756 "stream-id=%" PRIu64
", stream-name=\"%s\", "
757 "trace-name=\"%s\", path=\"%s/%s\"",
758 bt_stream_get_id(ir_stream
),
759 bt_stream_get_name(ir_stream
),
761 bt_stream_borrow_trace_const(ir_stream
)),
762 stream
->trace
->path
->str
, stream
->file_name
->str
);
763 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
767 if (stream
->packet_state
.is_open
) {
768 BT_LOGE("Unsupported discarded packets message occuring "
770 "stream-id=%" PRIu64
", stream-name=\"%s\", "
771 "trace-name=\"%s\", path=\"%s/%s\"",
772 bt_stream_get_id(ir_stream
),
773 bt_stream_get_name(ir_stream
),
775 bt_stream_borrow_trace_const(ir_stream
)),
776 stream
->trace
->path
->str
, stream
->file_name
->str
);
777 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
781 stream
->discarded_packets_state
.in_range
= true;
783 if (stream
->sc
->default_clock_class
) {
785 * The clock snapshot values will be validated when
786 * handling the next "packet beginning" message.
788 cs
= bt_message_discarded_packets_borrow_default_beginning_clock_snapshot_const(
791 stream
->discarded_packets_state
.beginning_cs
=
792 bt_clock_snapshot_get_value(cs
);
793 cs
= bt_message_discarded_packets_borrow_default_end_clock_snapshot_const(
796 stream
->discarded_packets_state
.end_cs
=
797 bt_clock_snapshot_get_value(cs
);
799 stream
->discarded_packets_state
.beginning_cs
= UINT64_C(-1);
800 stream
->discarded_packets_state
.end_cs
= UINT64_C(-1);
803 avail
= bt_message_discarded_packets_get_count(msg
, &count
);
804 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
808 stream
->packet_state
.seq_num
+= count
;
815 void put_messages(bt_message_array_const msgs
, uint64_t count
)
819 for (i
= 0; i
< count
; i
++) {
820 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
825 bt_self_component_status
ctf_fs_sink_consume(bt_self_component_sink
*self_comp
)
827 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
828 struct fs_sink_comp
*fs_sink
;
829 bt_message_iterator_status it_status
;
830 uint64_t msg_count
= 0;
831 bt_message_array_const msgs
;
833 fs_sink
= bt_self_component_get_data(
834 bt_self_component_sink_as_self_component(self_comp
));
836 BT_ASSERT(fs_sink
->upstream_iter
);
838 /* Consume messages */
839 it_status
= bt_self_component_port_input_message_iterator_next(
840 fs_sink
->upstream_iter
, &msgs
, &msg_count
);
842 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
847 case BT_MESSAGE_ITERATOR_STATUS_OK
:
851 for (i
= 0; i
< msg_count
; i
++) {
852 const bt_message
*msg
= msgs
[i
];
856 switch (bt_message_get_type(msg
)) {
857 case BT_MESSAGE_TYPE_EVENT
:
858 status
= handle_event_msg(fs_sink
, msg
);
860 case BT_MESSAGE_TYPE_PACKET_BEGINNING
:
861 status
= handle_packet_beginning_msg(
864 case BT_MESSAGE_TYPE_PACKET_END
:
865 status
= handle_packet_end_msg(
868 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY
:
870 BT_LOGD_STR("Ignoring message iterator inactivity message.");
872 case BT_MESSAGE_TYPE_STREAM_BEGINNING
:
873 status
= handle_stream_beginning_msg(
876 case BT_MESSAGE_TYPE_STREAM_END
:
877 status
= handle_stream_end_msg(
880 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING
:
881 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END
:
882 /* Not supported by CTF 1.8 */
883 BT_LOGD_STR("Ignoring stream activity message.");
885 case BT_MESSAGE_TYPE_DISCARDED_EVENTS
:
886 status
= handle_discarded_events_msg(
889 case BT_MESSAGE_TYPE_DISCARDED_PACKETS
:
890 status
= handle_discarded_packets_msg(
897 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
899 if (status
!= BT_SELF_COMPONENT_STATUS_OK
) {
900 BT_LOGE("Failed to handle message: "
901 "generated CTF traces could be incomplete: "
902 "output-dir-path=\"%s\"",
903 fs_sink
->output_dir_path
->str
);
910 case BT_MESSAGE_ITERATOR_STATUS_AGAIN
:
911 status
= BT_SELF_COMPONENT_STATUS_AGAIN
;
913 case BT_MESSAGE_ITERATOR_STATUS_END
:
914 /* TODO: Finalize all traces (should already be done?) */
915 status
= BT_SELF_COMPONENT_STATUS_END
;
917 case BT_MESSAGE_ITERATOR_STATUS_NOMEM
:
918 status
= BT_SELF_COMPONENT_STATUS_NOMEM
;
920 case BT_MESSAGE_ITERATOR_STATUS_ERROR
:
921 status
= BT_SELF_COMPONENT_STATUS_NOMEM
;
930 BT_ASSERT(status
!= BT_SELF_COMPONENT_STATUS_OK
);
931 put_messages(msgs
, msg_count
);
938 bt_self_component_status
ctf_fs_sink_graph_is_configured(
939 bt_self_component_sink
*self_comp
)
941 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
942 struct fs_sink_comp
*fs_sink
= bt_self_component_get_data(
943 bt_self_component_sink_as_self_component(self_comp
));
945 fs_sink
->upstream_iter
=
946 bt_self_component_port_input_message_iterator_create(
947 bt_self_component_sink_borrow_input_port_by_name(
948 self_comp
, in_port_name
));
949 if (!fs_sink
->upstream_iter
) {
950 status
= BT_SELF_COMPONENT_STATUS_NOMEM
;
959 void ctf_fs_sink_finalize(bt_self_component_sink
*self_comp
)
961 struct fs_sink_comp
*fs_sink
= bt_self_component_get_data(
962 bt_self_component_sink_as_self_component(self_comp
));
964 destroy_fs_sink_comp(fs_sink
);