2 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23 #define BT_LOG_TAG "PLUGIN-CTF-FS-SINK"
26 #include <babeltrace/babeltrace.h>
30 #include <babeltrace/assert-internal.h>
31 #include <babeltrace/ctfser-internal.h>
34 #include "fs-sink-trace.h"
35 #include "fs-sink-stream.h"
36 #include "fs-sink-ctf-meta.h"
37 #include "translate-trace-ir-to-ctf-ir.h"
38 #include "translate-ctf-ir-to-tsdl.h"
41 const char * const in_port_name
= "in";
44 bt_self_component_status
ensure_output_dir_exists(
45 struct fs_sink_comp
*fs_sink
)
47 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
50 ret
= g_mkdir_with_parents(fs_sink
->output_dir_path
->str
, 0755);
52 BT_LOGE_ERRNO("Cannot create directories for output directory",
53 ": output-dir-path=\"%s\"",
54 fs_sink
->output_dir_path
->str
);
55 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
64 bt_self_component_status
configure_component(struct fs_sink_comp
*fs_sink
,
65 const bt_value
*params
)
67 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
68 const bt_value
*value
;
70 value
= bt_value_map_borrow_entry_value_const(params
, "path");
72 BT_LOGE_STR("Missing mandatory `path` parameter.");
73 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
77 if (!bt_value_is_string(value
)) {
78 BT_LOGE_STR("`path` parameter: expecting a string.");
79 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
83 g_string_assign(fs_sink
->output_dir_path
,
84 bt_value_string_get(value
));
85 value
= bt_value_map_borrow_entry_value_const(params
,
86 "assume-single-trace");
88 if (!bt_value_is_bool(value
)) {
89 BT_LOGE_STR("`assume-single-trace` parameter: expecting a boolean.");
90 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
94 fs_sink
->assume_single_trace
= (bool) bt_value_bool_get(value
);
97 value
= bt_value_map_borrow_entry_value_const(params
,
98 "ignore-discarded-events");
100 if (!bt_value_is_bool(value
)) {
101 BT_LOGE_STR("`ignore-discarded-events` parameter: expecting a boolean.");
102 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
106 fs_sink
->ignore_discarded_events
=
107 (bool) bt_value_bool_get(value
);
110 value
= bt_value_map_borrow_entry_value_const(params
,
111 "ignore-discarded-packets");
113 if (!bt_value_is_bool(value
)) {
114 BT_LOGE_STR("`ignore-discarded-packets` parameter: expecting a boolean.");
115 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
119 fs_sink
->ignore_discarded_packets
=
120 (bool) bt_value_bool_get(value
);
123 value
= bt_value_map_borrow_entry_value_const(params
,
126 if (!bt_value_is_bool(value
)) {
127 BT_LOGE_STR("`quiet` parameter: expecting a boolean.");
128 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
132 fs_sink
->quiet
= (bool) bt_value_bool_get(value
);
140 void destroy_fs_sink_comp(struct fs_sink_comp
*fs_sink
)
146 if (fs_sink
->output_dir_path
) {
147 g_string_free(fs_sink
->output_dir_path
, TRUE
);
148 fs_sink
->output_dir_path
= NULL
;
151 if (fs_sink
->traces
) {
152 g_hash_table_destroy(fs_sink
->traces
);
153 fs_sink
->traces
= NULL
;
156 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(
157 fs_sink
->upstream_iter
);
165 bt_self_component_status
ctf_fs_sink_init(
166 bt_self_component_sink
*self_comp
, const bt_value
*params
,
167 void *init_method_data
)
169 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
170 struct fs_sink_comp
*fs_sink
= NULL
;
172 fs_sink
= g_new0(struct fs_sink_comp
, 1);
174 BT_LOGE_STR("Failed to allocate one CTF FS sink structure.");
175 status
= BT_SELF_COMPONENT_STATUS_NOMEM
;
179 fs_sink
->output_dir_path
= g_string_new(NULL
);
180 fs_sink
->self_comp
= self_comp
;
181 status
= configure_component(fs_sink
, params
);
182 if (status
!= BT_SELF_COMPONENT_STATUS_OK
) {
183 /* configure_component() logs errors */
187 if (fs_sink
->assume_single_trace
&&
188 g_file_test(fs_sink
->output_dir_path
->str
,
189 G_FILE_TEST_EXISTS
)) {
190 BT_LOGE("Single trace mode, but output path exists: "
191 "output-path=\"%s\"", fs_sink
->output_dir_path
->str
);
192 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
196 status
= ensure_output_dir_exists(fs_sink
);
197 if (status
!= BT_SELF_COMPONENT_STATUS_OK
) {
198 /* ensure_output_dir_exists() logs errors */
202 fs_sink
->traces
= g_hash_table_new_full(g_direct_hash
, g_direct_equal
,
203 NULL
, (GDestroyNotify
) fs_sink_trace_destroy
);
204 if (!fs_sink
->traces
) {
205 BT_LOGE_STR("Failed to allocate one GHashTable.");
206 status
= BT_SELF_COMPONENT_STATUS_NOMEM
;
210 status
= bt_self_component_sink_add_input_port(self_comp
, in_port_name
,
212 if (status
!= BT_SELF_COMPONENT_STATUS_OK
) {
216 bt_self_component_set_data(
217 bt_self_component_sink_as_self_component(self_comp
), fs_sink
);
220 if (status
!= BT_SELF_COMPONENT_STATUS_OK
) {
221 destroy_fs_sink_comp(fs_sink
);
228 struct fs_sink_stream
*borrow_stream(struct fs_sink_comp
*fs_sink
,
229 const bt_stream
*ir_stream
)
231 const bt_trace
*ir_trace
= bt_stream_borrow_trace_const(ir_stream
);
232 struct fs_sink_trace
*trace
;
233 struct fs_sink_stream
*stream
= NULL
;
235 trace
= g_hash_table_lookup(fs_sink
->traces
, ir_trace
);
236 if (unlikely(!trace
)) {
237 if (fs_sink
->assume_single_trace
&&
238 g_hash_table_size(fs_sink
->traces
) > 0) {
239 BT_LOGE("Single trace mode, but getting more than one trace: "
240 "stream-name=\"%s\"",
241 bt_stream_get_name(ir_stream
));
245 trace
= fs_sink_trace_create(fs_sink
, ir_trace
);
251 stream
= g_hash_table_lookup(trace
->streams
, ir_stream
);
252 if (unlikely(!stream
)) {
253 stream
= fs_sink_stream_create(trace
, ir_stream
);
264 bt_self_component_status
handle_event_msg(struct fs_sink_comp
*fs_sink
,
265 const bt_message
*msg
)
268 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
269 const bt_event
*ir_event
= bt_message_event_borrow_event_const(msg
);
270 const bt_stream
*ir_stream
= bt_event_borrow_stream_const(ir_event
);
271 struct fs_sink_stream
*stream
;
272 struct fs_sink_ctf_event_class
*ec
= NULL
;
273 const bt_clock_snapshot
*cs
= NULL
;
275 stream
= borrow_stream(fs_sink
, ir_stream
);
276 if (unlikely(!stream
)) {
277 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
281 ret
= try_translate_event_class_trace_ir_to_ctf_ir(stream
->sc
,
282 bt_event_borrow_class_const(ir_event
), &ec
);
284 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
290 if (stream
->sc
->default_clock_class
) {
291 (void) bt_message_event_borrow_default_clock_snapshot_const(
295 ret
= fs_sink_stream_write_event(stream
, cs
, ir_event
, ec
);
297 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
306 bt_self_component_status
handle_packet_beginning_msg(
307 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
310 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
311 const bt_packet
*ir_packet
=
312 bt_message_packet_beginning_borrow_packet_const(msg
);
313 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
314 struct fs_sink_stream
*stream
;
315 const bt_clock_snapshot
*cs
= NULL
;
317 stream
= borrow_stream(fs_sink
, ir_stream
);
318 if (unlikely(!stream
)) {
319 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
323 if (stream
->sc
->default_clock_class
) {
324 (void) bt_message_packet_beginning_borrow_default_clock_snapshot_const(
329 if (stream
->discarded_events_state
.in_range
) {
331 * Make sure that the current discarded events range's
332 * beginning time matches what's expected for CTF 1.8.
334 if (stream
->sc
->default_clock_class
) {
335 uint64_t expected_cs
;
337 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
338 /* We're opening the first packet */
339 expected_cs
= bt_clock_snapshot_get_value(cs
);
341 expected_cs
= stream
->prev_packet_state
.end_cs
;
344 if (stream
->discarded_events_state
.beginning_cs
!=
346 BT_LOGE("Incompatible discarded events message: "
347 "unexpected beginning time: "
348 "beginning-cs-val=%" PRIu64
", "
349 "expected-beginning-cs-val=%" PRIu64
", "
350 "stream-id=%" PRIu64
", stream-name=\"%s\", "
351 "trace-name=\"%s\", path=\"%s/%s\"",
352 stream
->discarded_events_state
.beginning_cs
,
354 bt_stream_get_id(ir_stream
),
355 bt_stream_get_name(ir_stream
),
357 bt_stream_borrow_trace_const(ir_stream
)),
358 stream
->trace
->path
->str
, stream
->file_name
->str
);
359 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
366 if (stream
->discarded_packets_state
.in_range
) {
367 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
368 BT_LOGE("Incompatible discarded packets message "
369 "occuring before the stream's first packet: "
370 "stream-id=%" PRIu64
", stream-name=\"%s\", "
371 "trace-name=\"%s\", path=\"%s/%s\"",
372 bt_stream_get_id(ir_stream
),
373 bt_stream_get_name(ir_stream
),
375 bt_stream_borrow_trace_const(ir_stream
)),
376 stream
->trace
->path
->str
, stream
->file_name
->str
);
377 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
382 * Make sure that the current discarded packets range's
383 * beginning and end times match what's expected for CTF
386 if (stream
->sc
->default_clock_class
) {
387 uint64_t expected_end_cs
=
388 bt_clock_snapshot_get_value(cs
);
390 if (stream
->discarded_packets_state
.beginning_cs
!=
391 stream
->prev_packet_state
.end_cs
) {
392 BT_LOGE("Incompatible discarded packets message: "
393 "unexpected beginning time: "
394 "beginning-cs-val=%" PRIu64
", "
395 "expected-beginning-cs-val=%" PRIu64
", "
396 "stream-id=%" PRIu64
", stream-name=\"%s\", "
397 "trace-name=\"%s\", path=\"%s/%s\"",
398 stream
->discarded_packets_state
.beginning_cs
,
399 stream
->prev_packet_state
.end_cs
,
400 bt_stream_get_id(ir_stream
),
401 bt_stream_get_name(ir_stream
),
403 bt_stream_borrow_trace_const(ir_stream
)),
404 stream
->trace
->path
->str
, stream
->file_name
->str
);
405 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
409 if (stream
->discarded_packets_state
.end_cs
!=
411 BT_LOGE("Incompatible discarded packets message: "
412 "unexpected end time: "
413 "end-cs-val=%" PRIu64
", "
414 "expected-end-cs-val=%" PRIu64
", "
415 "stream-id=%" PRIu64
", stream-name=\"%s\", "
416 "trace-name=\"%s\", path=\"%s/%s\"",
417 stream
->discarded_packets_state
.beginning_cs
,
419 bt_stream_get_id(ir_stream
),
420 bt_stream_get_name(ir_stream
),
422 bt_stream_borrow_trace_const(ir_stream
)),
423 stream
->trace
->path
->str
, stream
->file_name
->str
);
424 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
430 stream
->discarded_packets_state
.in_range
= false;
431 ret
= fs_sink_stream_open_packet(stream
, cs
, ir_packet
);
433 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
442 bt_self_component_status
handle_packet_end_msg(
443 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
446 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
447 const bt_packet
*ir_packet
=
448 bt_message_packet_end_borrow_packet_const(msg
);
449 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
450 struct fs_sink_stream
*stream
;
451 const bt_clock_snapshot
*cs
= NULL
;
453 stream
= borrow_stream(fs_sink
, ir_stream
);
454 if (unlikely(!stream
)) {
455 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
459 if (stream
->sc
->default_clock_class
) {
460 (void) bt_message_packet_end_borrow_default_clock_snapshot_const(
465 if (stream
->sc
->default_clock_class
) {
466 (void) bt_message_packet_end_borrow_default_clock_snapshot_const(
471 if (stream
->discarded_events_state
.in_range
) {
473 * Make sure that the current discarded events range's
474 * end time matches what's expected for CTF 1.8.
476 if (stream
->sc
->default_clock_class
) {
477 uint64_t expected_cs
= bt_clock_snapshot_get_value(cs
);
479 if (stream
->discarded_events_state
.end_cs
!=
481 BT_LOGE("Incompatible discarded events message: "
482 "unexpected end time: "
483 "end-cs-val=%" PRIu64
", "
484 "expected-end-cs-val=%" PRIu64
", "
485 "stream-id=%" PRIu64
", stream-name=\"%s\", "
486 "trace-name=\"%s\", path=\"%s/%s\"",
487 stream
->discarded_events_state
.end_cs
,
489 bt_stream_get_id(ir_stream
),
490 bt_stream_get_name(ir_stream
),
492 bt_stream_borrow_trace_const(ir_stream
)),
493 stream
->trace
->path
->str
, stream
->file_name
->str
);
494 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
500 ret
= fs_sink_stream_close_packet(stream
, cs
);
502 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
506 stream
->discarded_events_state
.in_range
= false;
513 bt_self_component_status
handle_stream_beginning_msg(
514 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
516 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
517 const bt_stream
*ir_stream
=
518 bt_message_stream_beginning_borrow_stream_const(msg
);
519 struct fs_sink_stream
*stream
;
521 stream
= borrow_stream(fs_sink
, ir_stream
);
523 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
527 BT_LOGI("Created new, empty stream file: "
528 "stream-id=%" PRIu64
", stream-name=\"%s\", "
529 "trace-name=\"%s\", path=\"%s/%s\"",
530 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
531 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
532 stream
->trace
->path
->str
, stream
->file_name
->str
);
539 bt_self_component_status
handle_stream_end_msg(struct fs_sink_comp
*fs_sink
,
540 const bt_message
*msg
)
542 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
543 const bt_stream
*ir_stream
=
544 bt_message_stream_end_borrow_stream_const(msg
);
545 struct fs_sink_stream
*stream
;
547 stream
= borrow_stream(fs_sink
, ir_stream
);
549 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
553 BT_LOGI("Closing stream file: "
554 "stream-id=%" PRIu64
", stream-name=\"%s\", "
555 "trace-name=\"%s\", path=\"%s/%s\"",
556 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
557 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
558 stream
->trace
->path
->str
, stream
->file_name
->str
);
561 * This destroys the stream object and frees all its resources,
562 * closing the stream file.
564 g_hash_table_remove(stream
->trace
->streams
, ir_stream
);
571 bt_self_component_status
handle_discarded_events_msg(
572 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
574 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
575 const bt_stream
*ir_stream
=
576 bt_message_discarded_events_borrow_stream_const(msg
);
577 struct fs_sink_stream
*stream
;
578 const bt_clock_snapshot
*cs
= NULL
;
579 bt_property_availability avail
;
582 stream
= borrow_stream(fs_sink
, ir_stream
);
584 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
588 if (fs_sink
->ignore_discarded_events
) {
589 BT_LOGI("Ignoring discarded events message: "
590 "stream-id=%" PRIu64
", stream-name=\"%s\", "
591 "trace-name=\"%s\", path=\"%s/%s\"",
592 bt_stream_get_id(ir_stream
),
593 bt_stream_get_name(ir_stream
),
595 bt_stream_borrow_trace_const(ir_stream
)),
596 stream
->trace
->path
->str
, stream
->file_name
->str
);
600 if (stream
->discarded_events_state
.in_range
) {
601 BT_LOGE("Unsupported contiguous discarded events message: "
602 "stream-id=%" PRIu64
", stream-name=\"%s\", "
603 "trace-name=\"%s\", path=\"%s/%s\"",
604 bt_stream_get_id(ir_stream
),
605 bt_stream_get_name(ir_stream
),
607 bt_stream_borrow_trace_const(ir_stream
)),
608 stream
->trace
->path
->str
, stream
->file_name
->str
);
609 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
613 if (stream
->packet_state
.is_open
) {
614 BT_LOGE("Unsupported discarded events message occuring "
616 "stream-id=%" PRIu64
", stream-name=\"%s\", "
617 "trace-name=\"%s\", path=\"%s/%s\"",
618 bt_stream_get_id(ir_stream
),
619 bt_stream_get_name(ir_stream
),
621 bt_stream_borrow_trace_const(ir_stream
)),
622 stream
->trace
->path
->str
, stream
->file_name
->str
);
623 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
627 stream
->discarded_events_state
.in_range
= true;
629 if (stream
->sc
->default_clock_class
) {
631 * The clock snapshot values will be validated when
632 * handling the next "packet beginning" message.
634 (void) bt_message_discarded_events_borrow_default_beginning_clock_snapshot_const(
637 stream
->discarded_events_state
.beginning_cs
=
638 bt_clock_snapshot_get_value(cs
);
640 (void) bt_message_discarded_events_borrow_default_end_clock_snapshot_const(
643 stream
->discarded_events_state
.end_cs
=
644 bt_clock_snapshot_get_value(cs
);
646 stream
->discarded_events_state
.beginning_cs
= UINT64_C(-1);
647 stream
->discarded_events_state
.end_cs
= UINT64_C(-1);
650 avail
= bt_message_discarded_events_get_count(msg
, &count
);
651 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
655 stream
->packet_state
.discarded_events_counter
+= count
;
662 bt_self_component_status
handle_discarded_packets_msg(
663 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
665 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
666 const bt_stream
*ir_stream
=
667 bt_message_discarded_packets_borrow_stream_const(msg
);
668 struct fs_sink_stream
*stream
;
669 const bt_clock_snapshot
*cs
= NULL
;
670 bt_property_availability avail
;
673 stream
= borrow_stream(fs_sink
, ir_stream
);
675 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
679 if (fs_sink
->ignore_discarded_packets
) {
680 BT_LOGI("Ignoring discarded packets message: "
681 "stream-id=%" PRIu64
", stream-name=\"%s\", "
682 "trace-name=\"%s\", path=\"%s/%s\"",
683 bt_stream_get_id(ir_stream
),
684 bt_stream_get_name(ir_stream
),
686 bt_stream_borrow_trace_const(ir_stream
)),
687 stream
->trace
->path
->str
, stream
->file_name
->str
);
691 if (stream
->discarded_packets_state
.in_range
) {
692 BT_LOGE("Unsupported contiguous discarded packets message: "
693 "stream-id=%" PRIu64
", stream-name=\"%s\", "
694 "trace-name=\"%s\", path=\"%s/%s\"",
695 bt_stream_get_id(ir_stream
),
696 bt_stream_get_name(ir_stream
),
698 bt_stream_borrow_trace_const(ir_stream
)),
699 stream
->trace
->path
->str
, stream
->file_name
->str
);
700 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
704 if (stream
->packet_state
.is_open
) {
705 BT_LOGE("Unsupported discarded packets message occuring "
707 "stream-id=%" PRIu64
", stream-name=\"%s\", "
708 "trace-name=\"%s\", path=\"%s/%s\"",
709 bt_stream_get_id(ir_stream
),
710 bt_stream_get_name(ir_stream
),
712 bt_stream_borrow_trace_const(ir_stream
)),
713 stream
->trace
->path
->str
, stream
->file_name
->str
);
714 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
718 stream
->discarded_packets_state
.in_range
= true;
720 if (stream
->sc
->default_clock_class
) {
722 * The clock snapshot values will be validated when
723 * handling the next "packet beginning" message.
725 (void) bt_message_discarded_packets_borrow_default_beginning_clock_snapshot_const(
728 stream
->discarded_packets_state
.beginning_cs
=
729 bt_clock_snapshot_get_value(cs
);
731 (void) bt_message_discarded_packets_borrow_default_end_clock_snapshot_const(
734 stream
->discarded_packets_state
.end_cs
=
735 bt_clock_snapshot_get_value(cs
);
737 stream
->discarded_packets_state
.beginning_cs
= UINT64_C(-1);
738 stream
->discarded_packets_state
.end_cs
= UINT64_C(-1);
741 avail
= bt_message_discarded_packets_get_count(msg
, &count
);
742 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
746 stream
->packet_state
.seq_num
+= count
;
753 void put_messages(bt_message_array_const msgs
, uint64_t count
)
757 for (i
= 0; i
< count
; i
++) {
758 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
763 bt_self_component_status
ctf_fs_sink_consume(bt_self_component_sink
*self_comp
)
765 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
766 struct fs_sink_comp
*fs_sink
;
767 bt_message_iterator_status it_status
;
768 uint64_t msg_count
= 0;
769 bt_message_array_const msgs
;
771 fs_sink
= bt_self_component_get_data(
772 bt_self_component_sink_as_self_component(self_comp
));
774 BT_ASSERT(fs_sink
->upstream_iter
);
776 /* Consume messages */
777 it_status
= bt_self_component_port_input_message_iterator_next(
778 fs_sink
->upstream_iter
, &msgs
, &msg_count
);
780 status
= BT_SELF_COMPONENT_STATUS_ERROR
;
785 case BT_MESSAGE_ITERATOR_STATUS_OK
:
789 for (i
= 0; i
< msg_count
; i
++) {
790 const bt_message
*msg
= msgs
[i
];
794 switch (bt_message_get_type(msg
)) {
795 case BT_MESSAGE_TYPE_EVENT
:
796 status
= handle_event_msg(fs_sink
, msg
);
798 case BT_MESSAGE_TYPE_PACKET_BEGINNING
:
799 status
= handle_packet_beginning_msg(
802 case BT_MESSAGE_TYPE_PACKET_END
:
803 status
= handle_packet_end_msg(
806 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY
:
808 BT_LOGD_STR("Ignoring message iterator inactivity message.");
810 case BT_MESSAGE_TYPE_STREAM_BEGINNING
:
811 status
= handle_stream_beginning_msg(
814 case BT_MESSAGE_TYPE_STREAM_END
:
815 status
= handle_stream_end_msg(
818 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING
:
819 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END
:
820 /* Not supported by CTF 1.8 */
821 BT_LOGD_STR("Ignoring stream activity message.");
823 case BT_MESSAGE_TYPE_DISCARDED_EVENTS
:
824 status
= handle_discarded_events_msg(
827 case BT_MESSAGE_TYPE_DISCARDED_PACKETS
:
828 status
= handle_discarded_packets_msg(
835 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
837 if (status
!= BT_SELF_COMPONENT_STATUS_OK
) {
838 BT_LOGE("Failed to handle message: "
839 "generated CTF traces could be incomplete: "
840 "output-dir-path=\"%s\"",
841 fs_sink
->output_dir_path
->str
);
848 case BT_MESSAGE_ITERATOR_STATUS_AGAIN
:
849 status
= BT_SELF_COMPONENT_STATUS_AGAIN
;
851 case BT_MESSAGE_ITERATOR_STATUS_END
:
852 /* TODO: Finalize all traces (should already be done?) */
853 status
= BT_SELF_COMPONENT_STATUS_END
;
855 case BT_MESSAGE_ITERATOR_STATUS_NOMEM
:
856 status
= BT_SELF_COMPONENT_STATUS_NOMEM
;
858 case BT_MESSAGE_ITERATOR_STATUS_ERROR
:
859 status
= BT_SELF_COMPONENT_STATUS_NOMEM
;
868 BT_ASSERT(status
!= BT_SELF_COMPONENT_STATUS_OK
);
869 put_messages(msgs
, msg_count
);
876 bt_self_component_status
ctf_fs_sink_graph_is_configured(
877 bt_self_component_sink
*self_comp
)
879 bt_self_component_status status
= BT_SELF_COMPONENT_STATUS_OK
;
880 struct fs_sink_comp
*fs_sink
= bt_self_component_get_data(
881 bt_self_component_sink_as_self_component(self_comp
));
883 fs_sink
->upstream_iter
=
884 bt_self_component_port_input_message_iterator_create(
885 bt_self_component_sink_borrow_input_port_by_name(
886 self_comp
, in_port_name
));
887 if (!fs_sink
->upstream_iter
) {
888 status
= BT_SELF_COMPONENT_STATUS_NOMEM
;
897 void ctf_fs_sink_finalize(bt_self_component_sink
*self_comp
)
899 struct fs_sink_comp
*fs_sink
= bt_self_component_get_data(
900 bt_self_component_sink_as_self_component(self_comp
));
902 destroy_fs_sink_comp(fs_sink
);