2 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
#define BT_LOG_TAG "PLUGIN-CTF-FS-SINK"

#include <stdlib.h>
#include <babeltrace/babeltrace.h>
#include <babeltrace/assert-internal.h>
#include <babeltrace/ctfser-internal.h>

#include "fs-sink-trace.h"
#include "fs-sink-stream.h"
#include "fs-sink-ctf-meta.h"
#include "translate-trace-ir-to-ctf-ir.h"
#include "translate-ctf-ir-to-tsdl.h"
/* Name of the sink component's single input port. */
const char * const in_port_name = "in";
44 bt_self_component_status
ensure_output_dir_exists(
45 struct fs_sink_comp
*fs_sink
)
47 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
50 ret
= g_mkdir_with_parents(fs_sink
->output_dir_path
->str
, 0755);
52 BT_LOGE_ERRNO("Cannot create directories for output directory",
53 ": output-dir-path=\"%s\"",
54 fs_sink
->output_dir_path
->str
);
55 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
64 bt_self_component_status
configure_component(struct fs_sink_comp
*fs_sink
,
65 const bt_value
*params
)
67 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
68 const bt_value
*value
;
70 value
= bt_value_map_borrow_entry_value_const(params
, "path");
72 BT_LOGE_STR("Missing mandatory `path` parameter.");
73 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
77 if (!bt_value_is_string(value
)) {
78 BT_LOGE_STR("`path` parameter: expecting a string.");
79 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
83 g_string_assign(fs_sink
->output_dir_path
,
84 bt_value_string_get(value
));
85 value
= bt_value_map_borrow_entry_value_const(params
,
86 "assume-single-trace");
88 if (!bt_value_is_bool(value
)) {
89 BT_LOGE_STR("`assume-single-trace` parameter: expecting a boolean.");
90 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
94 fs_sink
->assume_single_trace
= (bool) bt_value_bool_get(value
);
97 value
= bt_value_map_borrow_entry_value_const(params
,
98 "ignore-discarded-events");
100 if (!bt_value_is_bool(value
)) {
101 BT_LOGE_STR("`ignore-discarded-events` parameter: expecting a boolean.");
102 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
106 fs_sink
->ignore_discarded_events
=
107 (bool) bt_value_bool_get(value
);
110 value
= bt_value_map_borrow_entry_value_const(params
,
111 "ignore-discarded-packets");
113 if (!bt_value_is_bool(value
)) {
114 BT_LOGE_STR("`ignore-discarded-packets` parameter: expecting a boolean.");
115 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
119 fs_sink
->ignore_discarded_packets
=
120 (bool) bt_value_bool_get(value
);
123 value
= bt_value_map_borrow_entry_value_const(params
,
126 if (!bt_value_is_bool(value
)) {
127 BT_LOGE_STR("`quiet` parameter: expecting a boolean.");
128 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
132 fs_sink
->quiet
= (bool) bt_value_bool_get(value
);
140 void destroy_fs_sink_comp(struct fs_sink_comp
*fs_sink
)
146 if (fs_sink
->output_dir_path
) {
147 g_string_free(fs_sink
->output_dir_path
, TRUE
);
148 fs_sink
->output_dir_path
= NULL
;
151 if (fs_sink
->traces
) {
152 g_hash_table_destroy(fs_sink
->traces
);
153 fs_sink
->traces
= NULL
;
156 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(
157 fs_sink
->upstream_iter
);
165 bt_self_component_status
ctf_fs_sink_init(
166 bt_self_component_sink
*self_comp
, const bt_value
*params
,
167 void *init_method_data
)
169 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
170 struct fs_sink_comp
*fs_sink
= NULL
;
172 fs_sink
= g_new0(struct fs_sink_comp
, 1);
174 BT_LOGE_STR("Failed to allocate one CTF FS sink structure.");
175 status
= BT_SELF_COMPONENT_STATUS_NOMEM
;
179 fs_sink
->output_dir_path
= g_string_new(NULL
);
180 fs_sink
->self_comp
= self_comp
;
181 status
= configure_component(fs_sink
, params
);
182 if (status
!= BT_SELF_COMPONENT_STATUS_OK
) {
183 /* configure_component() logs errors */
187 if (fs_sink
->assume_single_trace
&&
188 g_file_test(fs_sink
->output_dir_path
->str
,
189 G_FILE_TEST_EXISTS
)) {
190 BT_LOGE("Single trace mode, but output path exists: "
191 "output-path=\"%s\"", fs_sink
->output_dir_path
->str
);
192 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
196 status
= ensure_output_dir_exists(fs_sink
);
197 if (status
!= BT_SELF_COMPONENT_STATUS_OK
) {
198 /* ensure_output_dir_exists() logs errors */
202 fs_sink
->traces
= g_hash_table_new_full(g_direct_hash
, g_direct_equal
,
203 NULL
, (GDestroyNotify
) fs_sink_trace_destroy
);
204 if (!fs_sink
->traces
) {
205 BT_LOGE_STR("Failed to allocate one GHashTable.");
206 status
= BT_SELF_COMPONENT_STATUS_NOMEM
;
210 status
= bt_self_component_sink_add_input_port(self_comp
, in_port_name
,
212 if (status
!= BT_SELF_COMPONENT_STATUS_OK
) {
216 bt_self_component_set_data(
217 bt_self_component_sink_as_self_component(self_comp
), fs_sink
);
220 if (status
!= BT_SELF_COMPONENT_STATUS_OK
) {
221 destroy_fs_sink_comp(fs_sink
);
228 struct fs_sink_stream
*borrow_stream(struct fs_sink_comp
*fs_sink
,
229 const bt_stream
*ir_stream
)
231 const bt_trace
*ir_trace
= bt_stream_borrow_trace_const(ir_stream
);
232 struct fs_sink_trace
*trace
;
233 struct fs_sink_stream
*stream
= NULL
;
235 trace
= g_hash_table_lookup(fs_sink
->traces
, ir_trace
);
236 if (unlikely(!trace
)) {
237 if (fs_sink
->assume_single_trace
&&
238 g_hash_table_size(fs_sink
->traces
) > 0) {
239 BT_LOGE("Single trace mode, but getting more than one trace: "
240 "stream-name=\"%s\"",
241 bt_stream_get_name(ir_stream
));
245 trace
= fs_sink_trace_create(fs_sink
, ir_trace
);
251 stream
= g_hash_table_lookup(trace
->streams
, ir_stream
);
252 if (unlikely(!stream
)) {
253 stream
= fs_sink_stream_create(trace
, ir_stream
);
264 bt_self_component_status
handle_event_msg(struct fs_sink_comp
*fs_sink
,
265 const bt_message
*msg
)
268 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
269 const bt_event
*ir_event
= bt_message_event_borrow_event_const(msg
);
270 const bt_stream
*ir_stream
= bt_event_borrow_stream_const(ir_event
);
271 struct fs_sink_stream
*stream
;
272 struct fs_sink_ctf_event_class
*ec
= NULL
;
273 const bt_clock_snapshot
*cs
= NULL
;
275 stream
= borrow_stream(fs_sink
, ir_stream
);
276 if (unlikely(!stream
)) {
277 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
281 ret
= try_translate_event_class_trace_ir_to_ctf_ir(stream
->sc
,
282 bt_event_borrow_class_const(ir_event
), &ec
);
284 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
290 if (stream
->sc
->default_clock_class
) {
291 cs
= bt_message_event_borrow_default_clock_snapshot_const(
295 ret
= fs_sink_stream_write_event(stream
, cs
, ir_event
, ec
);
297 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
306 bt_self_component_status
handle_packet_beginning_msg(
307 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
310 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
311 const bt_packet
*ir_packet
=
312 bt_message_packet_beginning_borrow_packet_const(msg
);
313 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
314 struct fs_sink_stream
*stream
;
315 const bt_clock_snapshot
*cs
= NULL
;
317 stream
= borrow_stream(fs_sink
, ir_stream
);
318 if (unlikely(!stream
)) {
319 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
323 if (stream
->sc
->packets_have_ts_begin
) {
324 cs
= bt_message_packet_beginning_borrow_default_clock_snapshot_const(
329 if (stream
->discarded_events_state
.in_range
) {
330 uint64_t expected_cs
;
332 BT_ASSERT(stream
->sc
->discarded_events_has_ts
);
333 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
334 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
337 * Make sure that the current discarded events range's
338 * beginning time matches what's expected for CTF 1.8.
340 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
341 /* We're opening the first packet */
342 expected_cs
= bt_clock_snapshot_get_value(cs
);
344 expected_cs
= stream
->prev_packet_state
.end_cs
;
347 if (stream
->discarded_events_state
.beginning_cs
!=
349 BT_LOGE("Incompatible discarded events message: "
350 "unexpected beginning time: "
351 "beginning-cs-val=%" PRIu64
", "
352 "expected-beginning-cs-val=%" PRIu64
", "
353 "stream-id=%" PRIu64
", stream-name=\"%s\", "
354 "trace-name=\"%s\", path=\"%s/%s\"",
355 stream
->discarded_events_state
.beginning_cs
,
357 bt_stream_get_id(ir_stream
),
358 bt_stream_get_name(ir_stream
),
360 bt_stream_borrow_trace_const(ir_stream
)),
361 stream
->trace
->path
->str
, stream
->file_name
->str
);
362 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
367 if (stream
->discarded_packets_state
.in_range
) {
368 uint64_t expected_end_cs
;
370 BT_ASSERT(stream
->sc
->discarded_packets_has_ts
);
371 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
372 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
375 * Make sure that the current discarded packets range's
376 * beginning and end times match what's expected for CTF
379 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
380 BT_LOGE("Incompatible discarded packets message "
381 "occuring before the stream's first packet: "
382 "stream-id=%" PRIu64
", stream-name=\"%s\", "
383 "trace-name=\"%s\", path=\"%s/%s\"",
384 bt_stream_get_id(ir_stream
),
385 bt_stream_get_name(ir_stream
),
387 bt_stream_borrow_trace_const(ir_stream
)),
388 stream
->trace
->path
->str
, stream
->file_name
->str
);
389 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
393 if (stream
->discarded_packets_state
.beginning_cs
!=
394 stream
->prev_packet_state
.end_cs
) {
395 BT_LOGE("Incompatible discarded packets message: "
396 "unexpected beginning time: "
397 "beginning-cs-val=%" PRIu64
", "
398 "expected-beginning-cs-val=%" PRIu64
", "
399 "stream-id=%" PRIu64
", stream-name=\"%s\", "
400 "trace-name=\"%s\", path=\"%s/%s\"",
401 stream
->discarded_packets_state
.beginning_cs
,
402 stream
->prev_packet_state
.end_cs
,
403 bt_stream_get_id(ir_stream
),
404 bt_stream_get_name(ir_stream
),
406 bt_stream_borrow_trace_const(ir_stream
)),
407 stream
->trace
->path
->str
, stream
->file_name
->str
);
408 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
412 expected_end_cs
= bt_clock_snapshot_get_value(cs
);
414 if (stream
->discarded_packets_state
.end_cs
!=
416 BT_LOGE("Incompatible discarded packets message: "
417 "unexpected end time: "
418 "end-cs-val=%" PRIu64
", "
419 "expected-end-cs-val=%" PRIu64
", "
420 "stream-id=%" PRIu64
", stream-name=\"%s\", "
421 "trace-name=\"%s\", path=\"%s/%s\"",
422 stream
->discarded_packets_state
.end_cs
,
424 bt_stream_get_id(ir_stream
),
425 bt_stream_get_name(ir_stream
),
427 bt_stream_borrow_trace_const(ir_stream
)),
428 stream
->trace
->path
->str
, stream
->file_name
->str
);
429 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
434 if (stream
->sc
->discarded_packets_has_ts
) {
435 stream
->discarded_packets_state
.in_range
= false;
438 ret
= fs_sink_stream_open_packet(stream
, cs
, ir_packet
);
440 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
449 bt_self_component_status
handle_packet_end_msg(
450 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
453 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
454 const bt_packet
*ir_packet
=
455 bt_message_packet_end_borrow_packet_const(msg
);
456 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
457 struct fs_sink_stream
*stream
;
458 const bt_clock_snapshot
*cs
= NULL
;
460 stream
= borrow_stream(fs_sink
, ir_stream
);
461 if (unlikely(!stream
)) {
462 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
466 if (stream
->sc
->packets_have_ts_end
) {
467 cs
= bt_message_packet_end_borrow_default_clock_snapshot_const(
472 if (stream
->discarded_events_state
.in_range
) {
473 uint64_t expected_cs
;
475 BT_ASSERT(stream
->sc
->discarded_events_has_ts
);
476 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
477 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
480 * Make sure that the current discarded events range's
481 * end time matches what's expected for CTF 1.8.
483 expected_cs
= bt_clock_snapshot_get_value(cs
);
485 if (stream
->discarded_events_state
.end_cs
!= expected_cs
) {
486 BT_LOGE("Incompatible discarded events message: "
487 "unexpected end time: "
488 "end-cs-val=%" PRIu64
", "
489 "expected-end-cs-val=%" PRIu64
", "
490 "stream-id=%" PRIu64
", stream-name=\"%s\", "
491 "trace-name=\"%s\", path=\"%s/%s\"",
492 stream
->discarded_events_state
.end_cs
,
494 bt_stream_get_id(ir_stream
),
495 bt_stream_get_name(ir_stream
),
497 bt_stream_borrow_trace_const(ir_stream
)),
498 stream
->trace
->path
->str
, stream
->file_name
->str
);
499 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
504 ret
= fs_sink_stream_close_packet(stream
, cs
);
506 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
510 if (stream
->sc
->discarded_events_has_ts
) {
511 stream
->discarded_events_state
.in_range
= false;
519 bt_self_component_status
handle_stream_beginning_msg(
520 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
522 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
523 const bt_stream
*ir_stream
=
524 bt_message_stream_beginning_borrow_stream_const(msg
);
525 const bt_stream_class
*ir_sc
=
526 bt_stream_borrow_class_const(ir_stream
);
527 struct fs_sink_stream
*stream
;
528 bool packets_have_beginning_end_cs
=
529 bt_stream_class_packets_have_default_beginning_clock_snapshot(ir_sc
) &&
530 bt_stream_class_packets_have_default_end_clock_snapshot(ir_sc
);
533 * Not supported: discarded events with default clock snapshots,
534 * but packet beginning/end without default clock snapshot.
536 if (!fs_sink
->ignore_discarded_events
&&
537 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc
) &&
538 !packets_have_beginning_end_cs
) {
539 BT_LOGE("Unsupported stream: discarded events have "
540 "default clock snapshots, but packets have no "
541 "beginning and/or end default clock snapshots: "
543 "stream-id=%" PRIu64
", "
544 "stream-name=\"%s\"",
545 ir_stream
, bt_stream_get_id(ir_stream
),
546 bt_stream_get_name(ir_stream
));
547 status
= BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR
;
552 * Not supported: discarded packets with default clock
553 * snapshots, but packet beginning/end without default clock
556 if (!fs_sink
->ignore_discarded_packets
&&
557 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc
) &&
558 !packets_have_beginning_end_cs
) {
559 BT_LOGE("Unsupported stream: discarded packets have "
560 "default clock snapshots, but packets have no "
561 "beginning and/or end default clock snapshots: "
563 "stream-id=%" PRIu64
", "
564 "stream-name=\"%s\"",
565 ir_stream
, bt_stream_get_id(ir_stream
),
566 bt_stream_get_name(ir_stream
));
567 status
= BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR
;
571 stream
= borrow_stream(fs_sink
, ir_stream
);
573 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
577 BT_LOGI("Created new, empty stream file: "
578 "stream-id=%" PRIu64
", stream-name=\"%s\", "
579 "trace-name=\"%s\", path=\"%s/%s\"",
580 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
581 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
582 stream
->trace
->path
->str
, stream
->file_name
->str
);
589 bt_self_component_status
handle_stream_end_msg(struct fs_sink_comp
*fs_sink
,
590 const bt_message
*msg
)
592 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
593 const bt_stream
*ir_stream
=
594 bt_message_stream_end_borrow_stream_const(msg
);
595 struct fs_sink_stream
*stream
;
597 stream
= borrow_stream(fs_sink
, ir_stream
);
599 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
603 BT_LOGI("Closing stream file: "
604 "stream-id=%" PRIu64
", stream-name=\"%s\", "
605 "trace-name=\"%s\", path=\"%s/%s\"",
606 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
607 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
608 stream
->trace
->path
->str
, stream
->file_name
->str
);
611 * This destroys the stream object and frees all its resources,
612 * closing the stream file.
614 g_hash_table_remove(stream
->trace
->streams
, ir_stream
);
621 bt_self_component_status
handle_discarded_events_msg(
622 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
624 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
625 const bt_stream
*ir_stream
=
626 bt_message_discarded_events_borrow_stream_const(msg
);
627 struct fs_sink_stream
*stream
;
628 const bt_clock_snapshot
*cs
= NULL
;
629 bt_property_availability avail
;
632 stream
= borrow_stream(fs_sink
, ir_stream
);
634 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
638 if (fs_sink
->ignore_discarded_events
) {
639 BT_LOGI("Ignoring discarded events message: "
640 "stream-id=%" PRIu64
", stream-name=\"%s\", "
641 "trace-name=\"%s\", path=\"%s/%s\"",
642 bt_stream_get_id(ir_stream
),
643 bt_stream_get_name(ir_stream
),
645 bt_stream_borrow_trace_const(ir_stream
)),
646 stream
->trace
->path
->str
, stream
->file_name
->str
);
650 if (stream
->discarded_events_state
.in_range
) {
651 BT_LOGE("Unsupported contiguous discarded events message: "
652 "stream-id=%" PRIu64
", stream-name=\"%s\", "
653 "trace-name=\"%s\", path=\"%s/%s\"",
654 bt_stream_get_id(ir_stream
),
655 bt_stream_get_name(ir_stream
),
657 bt_stream_borrow_trace_const(ir_stream
)),
658 stream
->trace
->path
->str
, stream
->file_name
->str
);
659 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
663 if (stream
->packet_state
.is_open
&&
664 stream
->sc
->discarded_events_has_ts
) {
665 BT_LOGE("Unsupported discarded events message with "
666 "default clock snapshots occuring within a packet: "
667 "stream-id=%" PRIu64
", stream-name=\"%s\", "
668 "trace-name=\"%s\", path=\"%s/%s\"",
669 bt_stream_get_id(ir_stream
),
670 bt_stream_get_name(ir_stream
),
672 bt_stream_borrow_trace_const(ir_stream
)),
673 stream
->trace
->path
->str
, stream
->file_name
->str
);
674 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
678 if (stream
->sc
->discarded_events_has_ts
) {
679 stream
->discarded_events_state
.in_range
= true;
682 * The clock snapshot values will be validated when
683 * handling the next "packet beginning" message.
685 cs
= bt_message_discarded_events_borrow_default_beginning_clock_snapshot_const(
688 stream
->discarded_events_state
.beginning_cs
=
689 bt_clock_snapshot_get_value(cs
);
690 cs
= bt_message_discarded_events_borrow_default_end_clock_snapshot_const(
693 stream
->discarded_events_state
.end_cs
= bt_clock_snapshot_get_value(cs
);
696 avail
= bt_message_discarded_events_get_count(msg
, &count
);
697 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
701 stream
->packet_state
.discarded_events_counter
+= count
;
708 bt_self_component_status
handle_discarded_packets_msg(
709 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
711 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
712 const bt_stream
*ir_stream
=
713 bt_message_discarded_packets_borrow_stream_const(msg
);
714 struct fs_sink_stream
*stream
;
715 const bt_clock_snapshot
*cs
= NULL
;
716 bt_property_availability avail
;
719 stream
= borrow_stream(fs_sink
, ir_stream
);
721 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
725 if (fs_sink
->ignore_discarded_packets
) {
726 BT_LOGI("Ignoring discarded packets message: "
727 "stream-id=%" PRIu64
", stream-name=\"%s\", "
728 "trace-name=\"%s\", path=\"%s/%s\"",
729 bt_stream_get_id(ir_stream
),
730 bt_stream_get_name(ir_stream
),
732 bt_stream_borrow_trace_const(ir_stream
)),
733 stream
->trace
->path
->str
, stream
->file_name
->str
);
737 if (stream
->discarded_packets_state
.in_range
) {
738 BT_LOGE("Unsupported contiguous discarded packets message: "
739 "stream-id=%" PRIu64
", stream-name=\"%s\", "
740 "trace-name=\"%s\", path=\"%s/%s\"",
741 bt_stream_get_id(ir_stream
),
742 bt_stream_get_name(ir_stream
),
744 bt_stream_borrow_trace_const(ir_stream
)),
745 stream
->trace
->path
->str
, stream
->file_name
->str
);
746 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
750 BT_ASSERT(!stream
->packet_state
.is_open
);
752 if (stream
->sc
->discarded_packets_has_ts
) {
753 stream
->discarded_packets_state
.in_range
= true;
756 * The clock snapshot values will be validated when
757 * handling the next "packet beginning" message.
759 cs
= bt_message_discarded_packets_borrow_default_beginning_clock_snapshot_const(
762 stream
->discarded_packets_state
.beginning_cs
=
763 bt_clock_snapshot_get_value(cs
);
764 cs
= bt_message_discarded_packets_borrow_default_end_clock_snapshot_const(
767 stream
->discarded_packets_state
.end_cs
=
768 bt_clock_snapshot_get_value(cs
);
771 avail
= bt_message_discarded_packets_get_count(msg
, &count
);
772 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
776 stream
->packet_state
.seq_num
+= count
;
783 void put_messages(bt_message_array_const msgs
, uint64_t count
)
787 for (i
= 0; i
< count
; i
++) {
788 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
793 bt_self_component_status
ctf_fs_sink_consume(bt_self_component_sink
*self_comp
)
795 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
796 struct fs_sink_comp
*fs_sink
;
797 bt_message_iterator_status it_status
;
798 uint64_t msg_count
= 0;
799 bt_message_array_const msgs
;
801 fs_sink
= bt_self_component_get_data(
802 bt_self_component_sink_as_self_component(self_comp
));
804 BT_ASSERT(fs_sink
->upstream_iter
);
806 /* Consume messages */
807 it_status
= bt_self_component_port_input_message_iterator_next(
808 fs_sink
->upstream_iter
, &msgs
, &msg_count
);
810 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
815 case BT_MESSAGE_ITERATOR_STATUS_OK
:
819 for (i
= 0; i
< msg_count
; i
++) {
820 const bt_message
*msg
= msgs
[i
];
824 switch (bt_message_get_type(msg
)) {
825 case BT_MESSAGE_TYPE_EVENT
:
826 status
= handle_event_msg(fs_sink
, msg
);
828 case BT_MESSAGE_TYPE_PACKET_BEGINNING
:
829 status
= handle_packet_beginning_msg(
832 case BT_MESSAGE_TYPE_PACKET_END
:
833 status
= handle_packet_end_msg(
836 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY
:
838 BT_LOGD_STR("Ignoring message iterator inactivity message.");
840 case BT_MESSAGE_TYPE_STREAM_BEGINNING
:
841 status
= handle_stream_beginning_msg(
844 case BT_MESSAGE_TYPE_STREAM_END
:
845 status
= handle_stream_end_msg(
848 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING
:
849 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END
:
850 /* Not supported by CTF 1.8 */
851 BT_LOGD_STR("Ignoring stream activity message.");
853 case BT_MESSAGE_TYPE_DISCARDED_EVENTS
:
854 status
= handle_discarded_events_msg(
857 case BT_MESSAGE_TYPE_DISCARDED_PACKETS
:
858 status
= handle_discarded_packets_msg(
865 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
867 if (status
!= BT_SELF_COMPONENT_STATUS_OK
) {
868 BT_LOGE("Failed to handle message: "
869 "generated CTF traces could be incomplete: "
870 "output-dir-path=\"%s\"",
871 fs_sink
->output_dir_path
->str
);
878 case BT_MESSAGE_ITERATOR_STATUS_AGAIN
:
879 status
= BT_SELF_COMPONENT_STATUS_AGAIN
;
881 case BT_MESSAGE_ITERATOR_STATUS_END
:
882 /* TODO: Finalize all traces (should already be done?) */
883 status
= BT_SELF_COMPONENT_STATUS_END
;
885 case BT_MESSAGE_ITERATOR_STATUS_NOMEM
:
886 status
= BT_SELF_COMPONENT_STATUS_NOMEM
;
888 case BT_MESSAGE_ITERATOR_STATUS_ERROR
:
889 status
= BT_SELF_COMPONENT_STATUS_NOMEM
;
898 BT_ASSERT(status
!= BT_SELF_COMPONENT_STATUS_OK
);
899 put_messages(msgs
, msg_count
);
906 bt_self_component_status
ctf_fs_sink_graph_is_configured(
907 bt_self_component_sink
*self_comp
)
909 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
910 struct fs_sink_comp
*fs_sink
= bt_self_component_get_data(
911 bt_self_component_sink_as_self_component(self_comp
));
913 fs_sink
->upstream_iter
=
914 bt_self_component_port_input_message_iterator_create(
915 bt_self_component_sink_borrow_input_port_by_name(
916 self_comp
, in_port_name
));
917 if (!fs_sink
->upstream_iter
) {
918 status
= BT_SELF_COMPONENT_STATUS_NOMEM
;
927 void ctf_fs_sink_finalize(bt_self_component_sink
*self_comp
)
929 struct fs_sink_comp
*fs_sink
= bt_self_component_get_data(
930 bt_self_component_sink_as_self_component(self_comp
));
932 destroy_fs_sink_comp(fs_sink
);