bt2: always use staticmethod() with native function class attributes
[babeltrace.git] / plugins / ctf / fs-sink / fs-sink.c
CommitLineData
15fe47e0
PP
1/*
2 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 * SOFTWARE.
21 */
22
23#define BT_LOG_TAG "PLUGIN-CTF-FS-SINK"
24#include "logging.h"
25
26#include <babeltrace/babeltrace.h>
27#include <stdio.h>
28#include <stdbool.h>
29#include <glib.h>
30#include <babeltrace/assert-internal.h>
31#include <babeltrace/ctfser-internal.h>
32
33#include "fs-sink.h"
34#include "fs-sink-trace.h"
35#include "fs-sink-stream.h"
36#include "fs-sink-ctf-meta.h"
37#include "translate-trace-ir-to-ctf-ir.h"
38#include "translate-ctf-ir-to-tsdl.h"
39
40static
41const char * const in_port_name = "in";
42
43static
44bt_self_component_status ensure_output_dir_exists(
45 struct fs_sink_comp *fs_sink)
46{
47 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
48 int ret;
49
50 ret = g_mkdir_with_parents(fs_sink->output_dir_path->str, 0755);
51 if (ret) {
52 BT_LOGE_ERRNO("Cannot create directories for output directory",
53 ": output-dir-path=\"%s\"",
54 fs_sink->output_dir_path->str);
55 status = BT_SELF_COMPONENT_STATUS_ERROR;
56 goto end;
57 }
58
59end:
60 return status;
61}
62
63static
64bt_self_component_status configure_component(struct fs_sink_comp *fs_sink,
65 const bt_value *params)
66{
67 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
68 const bt_value *value;
69
70 value = bt_value_map_borrow_entry_value_const(params, "path");
71 if (!value) {
72 BT_LOGE_STR("Missing mandatory `path` parameter.");
73 status = BT_SELF_COMPONENT_STATUS_ERROR;
74 goto end;
75 }
76
77 if (!bt_value_is_string(value)) {
78 BT_LOGE_STR("`path` parameter: expecting a string.");
79 status = BT_SELF_COMPONENT_STATUS_ERROR;
80 goto end;
81 }
82
83 g_string_assign(fs_sink->output_dir_path,
84 bt_value_string_get(value));
85 value = bt_value_map_borrow_entry_value_const(params,
86 "assume-single-trace");
87 if (value) {
88 if (!bt_value_is_bool(value)) {
89 BT_LOGE_STR("`assume-single-trace` parameter: expecting a boolean.");
90 status = BT_SELF_COMPONENT_STATUS_ERROR;
91 goto end;
92 }
93
94 fs_sink->assume_single_trace = (bool) bt_value_bool_get(value);
95 }
96
97 value = bt_value_map_borrow_entry_value_const(params,
98 "ignore-discarded-events");
99 if (value) {
100 if (!bt_value_is_bool(value)) {
101 BT_LOGE_STR("`ignore-discarded-events` parameter: expecting a boolean.");
102 status = BT_SELF_COMPONENT_STATUS_ERROR;
103 goto end;
104 }
105
106 fs_sink->ignore_discarded_events =
107 (bool) bt_value_bool_get(value);
108 }
109
110 value = bt_value_map_borrow_entry_value_const(params,
111 "ignore-discarded-packets");
112 if (value) {
113 if (!bt_value_is_bool(value)) {
114 BT_LOGE_STR("`ignore-discarded-packets` parameter: expecting a boolean.");
115 status = BT_SELF_COMPONENT_STATUS_ERROR;
116 goto end;
117 }
118
119 fs_sink->ignore_discarded_packets =
120 (bool) bt_value_bool_get(value);
121 }
122
123 value = bt_value_map_borrow_entry_value_const(params,
124 "quiet");
125 if (value) {
126 if (!bt_value_is_bool(value)) {
127 BT_LOGE_STR("`quiet` parameter: expecting a boolean.");
128 status = BT_SELF_COMPONENT_STATUS_ERROR;
129 goto end;
130 }
131
132 fs_sink->quiet = (bool) bt_value_bool_get(value);
133 }
134
135end:
136 return status;
137}
138
139static
140void destroy_fs_sink_comp(struct fs_sink_comp *fs_sink)
141{
142 if (!fs_sink) {
143 goto end;
144 }
145
146 if (fs_sink->output_dir_path) {
147 g_string_free(fs_sink->output_dir_path, TRUE);
148 fs_sink->output_dir_path = NULL;
149 }
150
151 if (fs_sink->traces) {
152 g_hash_table_destroy(fs_sink->traces);
153 fs_sink->traces = NULL;
154 }
155
156 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(
157 fs_sink->upstream_iter);
158 g_free(fs_sink);
159
160end:
161 return;
162}
163
164BT_HIDDEN
165bt_self_component_status ctf_fs_sink_init(
166 bt_self_component_sink *self_comp, const bt_value *params,
167 void *init_method_data)
168{
169 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
170 struct fs_sink_comp *fs_sink = NULL;
171
172 fs_sink = g_new0(struct fs_sink_comp, 1);
173 if (!fs_sink) {
174 BT_LOGE_STR("Failed to allocate one CTF FS sink structure.");
175 status = BT_SELF_COMPONENT_STATUS_NOMEM;
176 goto end;
177 }
178
179 fs_sink->output_dir_path = g_string_new(NULL);
180 fs_sink->self_comp = self_comp;
181 status = configure_component(fs_sink, params);
182 if (status != BT_SELF_COMPONENT_STATUS_OK) {
183 /* configure_component() logs errors */
184 goto end;
185 }
186
187 if (fs_sink->assume_single_trace &&
188 g_file_test(fs_sink->output_dir_path->str,
189 G_FILE_TEST_EXISTS)) {
190 BT_LOGE("Single trace mode, but output path exists: "
191 "output-path=\"%s\"", fs_sink->output_dir_path->str);
192 status = BT_SELF_COMPONENT_STATUS_ERROR;
193 goto end;
194 }
195
196 status = ensure_output_dir_exists(fs_sink);
197 if (status != BT_SELF_COMPONENT_STATUS_OK) {
198 /* ensure_output_dir_exists() logs errors */
199 goto end;
200 }
201
202 fs_sink->traces = g_hash_table_new_full(g_direct_hash, g_direct_equal,
203 NULL, (GDestroyNotify) fs_sink_trace_destroy);
204 if (!fs_sink->traces) {
205 BT_LOGE_STR("Failed to allocate one GHashTable.");
206 status = BT_SELF_COMPONENT_STATUS_NOMEM;
207 goto end;
208 }
209
210 status = bt_self_component_sink_add_input_port(self_comp, in_port_name,
211 NULL, NULL);
212 if (status != BT_SELF_COMPONENT_STATUS_OK) {
213 goto end;
214 }
215
216 bt_self_component_set_data(
217 bt_self_component_sink_as_self_component(self_comp), fs_sink);
218
219end:
220 if (status != BT_SELF_COMPONENT_STATUS_OK) {
221 destroy_fs_sink_comp(fs_sink);
222 }
223
224 return status;
225}
226
227static inline
228struct fs_sink_stream *borrow_stream(struct fs_sink_comp *fs_sink,
229 const bt_stream *ir_stream)
230{
231 const bt_trace *ir_trace = bt_stream_borrow_trace_const(ir_stream);
232 struct fs_sink_trace *trace;
233 struct fs_sink_stream *stream = NULL;
234
235 trace = g_hash_table_lookup(fs_sink->traces, ir_trace);
236 if (unlikely(!trace)) {
237 if (fs_sink->assume_single_trace &&
238 g_hash_table_size(fs_sink->traces) > 0) {
239 BT_LOGE("Single trace mode, but getting more than one trace: "
240 "stream-name=\"%s\"",
241 bt_stream_get_name(ir_stream));
242 goto end;
243 }
244
245 trace = fs_sink_trace_create(fs_sink, ir_trace);
246 if (!trace) {
247 goto end;
248 }
249 }
250
251 stream = g_hash_table_lookup(trace->streams, ir_stream);
252 if (unlikely(!stream)) {
253 stream = fs_sink_stream_create(trace, ir_stream);
254 if (!stream) {
255 goto end;
256 }
257 }
258
259end:
260 return stream;
261}
262
263static inline
264bt_self_component_status handle_event_msg(struct fs_sink_comp *fs_sink,
265 const bt_message *msg)
266{
267 int ret;
268 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
269 const bt_event *ir_event = bt_message_event_borrow_event_const(msg);
270 const bt_stream *ir_stream = bt_event_borrow_stream_const(ir_event);
271 struct fs_sink_stream *stream;
272 struct fs_sink_ctf_event_class *ec = NULL;
273 const bt_clock_snapshot *cs = NULL;
274
275 stream = borrow_stream(fs_sink, ir_stream);
276 if (unlikely(!stream)) {
277 status = BT_SELF_COMPONENT_STATUS_ERROR;
278 goto end;
279 }
280
281 ret = try_translate_event_class_trace_ir_to_ctf_ir(stream->sc,
282 bt_event_borrow_class_const(ir_event), &ec);
283 if (ret) {
284 status = BT_SELF_COMPONENT_STATUS_ERROR;
285 goto end;
286 }
287
288 BT_ASSERT(ec);
289
290 if (stream->sc->default_clock_class) {
291 (void) bt_message_event_borrow_default_clock_snapshot_const(
292 msg, &cs);
293 }
294
295 ret = fs_sink_stream_write_event(stream, cs, ir_event, ec);
296 if (unlikely(ret)) {
297 status = BT_SELF_COMPONENT_STATUS_ERROR;
298 goto end;
299 }
300
301end:
302 return status;
303}
304
305static inline
306bt_self_component_status handle_packet_beginning_msg(
307 struct fs_sink_comp *fs_sink, const bt_message *msg)
308{
309 int ret;
310 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
311 const bt_packet *ir_packet =
312 bt_message_packet_beginning_borrow_packet_const(msg);
313 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
314 struct fs_sink_stream *stream;
315 const bt_clock_snapshot *cs = NULL;
316
317 stream = borrow_stream(fs_sink, ir_stream);
318 if (unlikely(!stream)) {
319 status = BT_SELF_COMPONENT_STATUS_ERROR;
320 goto end;
321 }
322
323 if (stream->sc->default_clock_class) {
324 (void) bt_message_packet_beginning_borrow_default_clock_snapshot_const(
325 msg, &cs);
326 BT_ASSERT(cs);
327 }
328
329 if (stream->discarded_events_state.in_range) {
330 /*
331 * Make sure that the current discarded events range's
332 * beginning time matches what's expected for CTF 1.8.
333 */
334 if (stream->sc->default_clock_class) {
335 uint64_t expected_cs;
336
337 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
338 /* We're opening the first packet */
339 expected_cs = bt_clock_snapshot_get_value(cs);
340 } else {
341 expected_cs = stream->prev_packet_state.end_cs;
342 }
343
344 if (stream->discarded_events_state.beginning_cs !=
345 expected_cs) {
346 BT_LOGE("Incompatible discarded events message: "
347 "unexpected beginning time: "
348 "beginning-cs-val=%" PRIu64 ", "
349 "expected-beginning-cs-val=%" PRIu64 ", "
350 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
351 "trace-name=\"%s\", path=\"%s/%s\"",
352 stream->discarded_events_state.beginning_cs,
353 expected_cs,
354 bt_stream_get_id(ir_stream),
355 bt_stream_get_name(ir_stream),
356 bt_trace_get_name(
357 bt_stream_borrow_trace_const(ir_stream)),
358 stream->trace->path->str, stream->file_name->str);
359 status = BT_SELF_COMPONENT_STATUS_ERROR;
360 goto end;
361 }
362 }
363 }
364
365
366 if (stream->discarded_packets_state.in_range) {
367 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
368 BT_LOGE("Incompatible discarded packets message "
369 "occuring before the stream's first packet: "
370 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
371 "trace-name=\"%s\", path=\"%s/%s\"",
372 bt_stream_get_id(ir_stream),
373 bt_stream_get_name(ir_stream),
374 bt_trace_get_name(
375 bt_stream_borrow_trace_const(ir_stream)),
376 stream->trace->path->str, stream->file_name->str);
377 status = BT_SELF_COMPONENT_STATUS_ERROR;
378 goto end;
379 }
380
381 /*
382 * Make sure that the current discarded packets range's
383 * beginning and end times match what's expected for CTF
384 * 1.8.
385 */
386 if (stream->sc->default_clock_class) {
387 uint64_t expected_end_cs =
388 bt_clock_snapshot_get_value(cs);
389
390 if (stream->discarded_packets_state.beginning_cs !=
391 stream->prev_packet_state.end_cs) {
392 BT_LOGE("Incompatible discarded packets message: "
393 "unexpected beginning time: "
394 "beginning-cs-val=%" PRIu64 ", "
395 "expected-beginning-cs-val=%" PRIu64 ", "
396 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
397 "trace-name=\"%s\", path=\"%s/%s\"",
398 stream->discarded_packets_state.beginning_cs,
399 stream->prev_packet_state.end_cs,
400 bt_stream_get_id(ir_stream),
401 bt_stream_get_name(ir_stream),
402 bt_trace_get_name(
403 bt_stream_borrow_trace_const(ir_stream)),
404 stream->trace->path->str, stream->file_name->str);
405 status = BT_SELF_COMPONENT_STATUS_ERROR;
406 goto end;
407 }
408
409 if (stream->discarded_packets_state.end_cs !=
410 expected_end_cs) {
411 BT_LOGE("Incompatible discarded packets message: "
412 "unexpected end time: "
413 "end-cs-val=%" PRIu64 ", "
414 "expected-end-cs-val=%" PRIu64 ", "
415 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
416 "trace-name=\"%s\", path=\"%s/%s\"",
417 stream->discarded_packets_state.beginning_cs,
418 expected_end_cs,
419 bt_stream_get_id(ir_stream),
420 bt_stream_get_name(ir_stream),
421 bt_trace_get_name(
422 bt_stream_borrow_trace_const(ir_stream)),
423 stream->trace->path->str, stream->file_name->str);
424 status = BT_SELF_COMPONENT_STATUS_ERROR;
425 goto end;
426 }
427 }
428 }
429
430 stream->discarded_packets_state.in_range = false;
431 ret = fs_sink_stream_open_packet(stream, cs, ir_packet);
432 if (ret) {
433 status = BT_SELF_COMPONENT_STATUS_ERROR;
434 goto end;
435 }
436
437end:
438 return status;
439}
440
441static inline
442bt_self_component_status handle_packet_end_msg(
443 struct fs_sink_comp *fs_sink, const bt_message *msg)
444{
445 int ret;
446 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
447 const bt_packet *ir_packet =
448 bt_message_packet_end_borrow_packet_const(msg);
449 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
450 struct fs_sink_stream *stream;
451 const bt_clock_snapshot *cs = NULL;
452
453 stream = borrow_stream(fs_sink, ir_stream);
454 if (unlikely(!stream)) {
455 status = BT_SELF_COMPONENT_STATUS_ERROR;
456 goto end;
457 }
458
459 if (stream->sc->default_clock_class) {
460 (void) bt_message_packet_end_borrow_default_clock_snapshot_const(
461 msg, &cs);
462 BT_ASSERT(cs);
463 }
464
465 if (stream->sc->default_clock_class) {
466 (void) bt_message_packet_end_borrow_default_clock_snapshot_const(
467 msg, &cs);
468 BT_ASSERT(cs);
469 }
470
471 if (stream->discarded_events_state.in_range) {
472 /*
473 * Make sure that the current discarded events range's
474 * end time matches what's expected for CTF 1.8.
475 */
476 if (stream->sc->default_clock_class) {
477 uint64_t expected_cs = bt_clock_snapshot_get_value(cs);
478
479 if (stream->discarded_events_state.end_cs !=
480 expected_cs) {
481 BT_LOGE("Incompatible discarded events message: "
482 "unexpected end time: "
483 "end-cs-val=%" PRIu64 ", "
484 "expected-end-cs-val=%" PRIu64 ", "
485 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
486 "trace-name=\"%s\", path=\"%s/%s\"",
487 stream->discarded_events_state.end_cs,
488 expected_cs,
489 bt_stream_get_id(ir_stream),
490 bt_stream_get_name(ir_stream),
491 bt_trace_get_name(
492 bt_stream_borrow_trace_const(ir_stream)),
493 stream->trace->path->str, stream->file_name->str);
494 status = BT_SELF_COMPONENT_STATUS_ERROR;
495 goto end;
496 }
497 }
498 }
499
500 ret = fs_sink_stream_close_packet(stream, cs);
501 if (ret) {
502 status = BT_SELF_COMPONENT_STATUS_ERROR;
503 goto end;
504 }
505
506 stream->discarded_events_state.in_range = false;
507
508end:
509 return status;
510}
511
512static inline
513bt_self_component_status handle_stream_beginning_msg(
514 struct fs_sink_comp *fs_sink, const bt_message *msg)
515{
516 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
517 const bt_stream *ir_stream =
518 bt_message_stream_beginning_borrow_stream_const(msg);
519 struct fs_sink_stream *stream;
520
521 stream = borrow_stream(fs_sink, ir_stream);
522 if (!stream) {
523 status = BT_SELF_COMPONENT_STATUS_ERROR;
524 goto end;
525 }
526
527 BT_LOGI("Created new, empty stream file: "
528 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
529 "trace-name=\"%s\", path=\"%s/%s\"",
530 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
531 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
532 stream->trace->path->str, stream->file_name->str);
533
534end:
535 return status;
536}
537
538static inline
539bt_self_component_status handle_stream_end_msg(struct fs_sink_comp *fs_sink,
540 const bt_message *msg)
541{
542 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
543 const bt_stream *ir_stream =
544 bt_message_stream_end_borrow_stream_const(msg);
545 struct fs_sink_stream *stream;
546
547 stream = borrow_stream(fs_sink, ir_stream);
548 if (!stream) {
549 status = BT_SELF_COMPONENT_STATUS_ERROR;
550 goto end;
551 }
552
553 BT_LOGI("Closing stream file: "
554 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
555 "trace-name=\"%s\", path=\"%s/%s\"",
556 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
557 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
558 stream->trace->path->str, stream->file_name->str);
559
560 /*
561 * This destroys the stream object and frees all its resources,
562 * closing the stream file.
563 */
564 g_hash_table_remove(stream->trace->streams, ir_stream);
565
566end:
567 return status;
568}
569
570static inline
571bt_self_component_status handle_discarded_events_msg(
572 struct fs_sink_comp *fs_sink, const bt_message *msg)
573{
574 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
575 const bt_stream *ir_stream =
576 bt_message_discarded_events_borrow_stream_const(msg);
577 struct fs_sink_stream *stream;
578 const bt_clock_snapshot *cs = NULL;
579 bt_property_availability avail;
580 uint64_t count;
581
582 stream = borrow_stream(fs_sink, ir_stream);
583 if (!stream) {
584 status = BT_SELF_COMPONENT_STATUS_ERROR;
585 goto end;
586 }
587
588 if (fs_sink->ignore_discarded_events) {
589 BT_LOGI("Ignoring discarded events message: "
590 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
591 "trace-name=\"%s\", path=\"%s/%s\"",
592 bt_stream_get_id(ir_stream),
593 bt_stream_get_name(ir_stream),
594 bt_trace_get_name(
595 bt_stream_borrow_trace_const(ir_stream)),
596 stream->trace->path->str, stream->file_name->str);
597 goto end;
598 }
599
600 if (stream->discarded_events_state.in_range) {
601 BT_LOGE("Unsupported contiguous discarded events message: "
602 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
603 "trace-name=\"%s\", path=\"%s/%s\"",
604 bt_stream_get_id(ir_stream),
605 bt_stream_get_name(ir_stream),
606 bt_trace_get_name(
607 bt_stream_borrow_trace_const(ir_stream)),
608 stream->trace->path->str, stream->file_name->str);
609 status = BT_SELF_COMPONENT_STATUS_ERROR;
610 goto end;
611 }
612
613 if (stream->packet_state.is_open) {
614 BT_LOGE("Unsupported discarded events message occuring "
615 "within a packet: "
616 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
617 "trace-name=\"%s\", path=\"%s/%s\"",
618 bt_stream_get_id(ir_stream),
619 bt_stream_get_name(ir_stream),
620 bt_trace_get_name(
621 bt_stream_borrow_trace_const(ir_stream)),
622 stream->trace->path->str, stream->file_name->str);
623 status = BT_SELF_COMPONENT_STATUS_ERROR;
624 goto end;
625 }
626
627 stream->discarded_events_state.in_range = true;
628
629 if (stream->sc->default_clock_class) {
630 /*
631 * The clock snapshot values will be validated when
632 * handling the next "packet beginning" message.
633 */
634 (void) bt_message_discarded_events_borrow_default_beginning_clock_snapshot_const(
635 msg, &cs);
636 BT_ASSERT(cs);
637 stream->discarded_events_state.beginning_cs =
638 bt_clock_snapshot_get_value(cs);
639 cs = NULL;
640 (void) bt_message_discarded_events_borrow_default_end_clock_snapshot_const(
641 msg, &cs);
642 BT_ASSERT(cs);
643 stream->discarded_events_state.end_cs =
644 bt_clock_snapshot_get_value(cs);
645 } else {
646 stream->discarded_events_state.beginning_cs = UINT64_C(-1);
647 stream->discarded_events_state.end_cs = UINT64_C(-1);
648 }
649
650 avail = bt_message_discarded_events_get_count(msg, &count);
651 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
652 count = 1;
653 }
654
655 stream->packet_state.discarded_events_counter += count;
656
657end:
658 return status;
659}
660
661static inline
662bt_self_component_status handle_discarded_packets_msg(
663 struct fs_sink_comp *fs_sink, const bt_message *msg)
664{
665 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
666 const bt_stream *ir_stream =
667 bt_message_discarded_packets_borrow_stream_const(msg);
668 struct fs_sink_stream *stream;
669 const bt_clock_snapshot *cs = NULL;
670 bt_property_availability avail;
671 uint64_t count;
672
673 stream = borrow_stream(fs_sink, ir_stream);
674 if (!stream) {
675 status = BT_SELF_COMPONENT_STATUS_ERROR;
676 goto end;
677 }
678
679 if (fs_sink->ignore_discarded_packets) {
680 BT_LOGI("Ignoring discarded packets message: "
681 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
682 "trace-name=\"%s\", path=\"%s/%s\"",
683 bt_stream_get_id(ir_stream),
684 bt_stream_get_name(ir_stream),
685 bt_trace_get_name(
686 bt_stream_borrow_trace_const(ir_stream)),
687 stream->trace->path->str, stream->file_name->str);
688 goto end;
689 }
690
691 if (stream->discarded_packets_state.in_range) {
692 BT_LOGE("Unsupported contiguous discarded packets message: "
693 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
694 "trace-name=\"%s\", path=\"%s/%s\"",
695 bt_stream_get_id(ir_stream),
696 bt_stream_get_name(ir_stream),
697 bt_trace_get_name(
698 bt_stream_borrow_trace_const(ir_stream)),
699 stream->trace->path->str, stream->file_name->str);
700 status = BT_SELF_COMPONENT_STATUS_ERROR;
701 goto end;
702 }
703
704 if (stream->packet_state.is_open) {
705 BT_LOGE("Unsupported discarded packets message occuring "
706 "within a packet: "
707 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
708 "trace-name=\"%s\", path=\"%s/%s\"",
709 bt_stream_get_id(ir_stream),
710 bt_stream_get_name(ir_stream),
711 bt_trace_get_name(
712 bt_stream_borrow_trace_const(ir_stream)),
713 stream->trace->path->str, stream->file_name->str);
714 status = BT_SELF_COMPONENT_STATUS_ERROR;
715 goto end;
716 }
717
718 stream->discarded_packets_state.in_range = true;
719
720 if (stream->sc->default_clock_class) {
721 /*
722 * The clock snapshot values will be validated when
723 * handling the next "packet beginning" message.
724 */
725 (void) bt_message_discarded_packets_borrow_default_beginning_clock_snapshot_const(
726 msg, &cs);
727 BT_ASSERT(cs);
728 stream->discarded_packets_state.beginning_cs =
729 bt_clock_snapshot_get_value(cs);
730 cs = NULL;
731 (void) bt_message_discarded_packets_borrow_default_end_clock_snapshot_const(
732 msg, &cs);
733 BT_ASSERT(cs);
734 stream->discarded_packets_state.end_cs =
735 bt_clock_snapshot_get_value(cs);
736 } else {
737 stream->discarded_packets_state.beginning_cs = UINT64_C(-1);
738 stream->discarded_packets_state.end_cs = UINT64_C(-1);
739 }
740
741 avail = bt_message_discarded_packets_get_count(msg, &count);
742 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
743 count = 1;
744 }
745
746 stream->packet_state.seq_num += count;
747
748end:
749 return status;
750}
751
752static inline
753void put_messages(bt_message_array_const msgs, uint64_t count)
754{
755 uint64_t i;
756
757 for (i = 0; i < count; i++) {
758 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
759 }
760}
761
762BT_HIDDEN
763bt_self_component_status ctf_fs_sink_consume(bt_self_component_sink *self_comp)
764{
765 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
766 struct fs_sink_comp *fs_sink;
767 bt_message_iterator_status it_status;
768 uint64_t msg_count = 0;
769 bt_message_array_const msgs;
770
771 fs_sink = bt_self_component_get_data(
772 bt_self_component_sink_as_self_component(self_comp));
773 BT_ASSERT(fs_sink);
774 BT_ASSERT(fs_sink->upstream_iter);
775
776 /* Consume messages */
777 it_status = bt_self_component_port_input_message_iterator_next(
778 fs_sink->upstream_iter, &msgs, &msg_count);
779 if (it_status < 0) {
780 status = BT_SELF_COMPONENT_STATUS_ERROR;
781 goto end;
782 }
783
784 switch (it_status) {
785 case BT_MESSAGE_ITERATOR_STATUS_OK:
786 {
787 uint64_t i;
788
789 for (i = 0; i < msg_count; i++) {
790 const bt_message *msg = msgs[i];
791
792 BT_ASSERT(msg);
793
794 switch (bt_message_get_type(msg)) {
795 case BT_MESSAGE_TYPE_EVENT:
796 status = handle_event_msg(fs_sink, msg);
797 break;
798 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
799 status = handle_packet_beginning_msg(
800 fs_sink, msg);
801 break;
802 case BT_MESSAGE_TYPE_PACKET_END:
803 status = handle_packet_end_msg(
804 fs_sink, msg);
805 break;
806 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
807 /* Ignore */
808 BT_LOGD_STR("Ignoring message iterator inactivity message.");
809 break;
810 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
811 status = handle_stream_beginning_msg(
812 fs_sink, msg);
813 break;
814 case BT_MESSAGE_TYPE_STREAM_END:
815 status = handle_stream_end_msg(
816 fs_sink, msg);
817 break;
818 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
819 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
820 /* Not supported by CTF 1.8 */
821 BT_LOGD_STR("Ignoring stream activity message.");
822 break;
823 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
824 status = handle_discarded_events_msg(
825 fs_sink, msg);
826 break;
827 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
828 status = handle_discarded_packets_msg(
829 fs_sink, msg);
830 break;
831 default:
832 abort();
833 }
834
835 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
836
837 if (status != BT_SELF_COMPONENT_STATUS_OK) {
838 BT_LOGE("Failed to handle message: "
839 "generated CTF traces could be incomplete: "
840 "output-dir-path=\"%s\"",
841 fs_sink->output_dir_path->str);
842 goto error;
843 }
844 }
845
846 break;
847 }
848 case BT_MESSAGE_ITERATOR_STATUS_AGAIN:
849 status = BT_SELF_COMPONENT_STATUS_AGAIN;
850 break;
851 case BT_MESSAGE_ITERATOR_STATUS_END:
852 /* TODO: Finalize all traces (should already be done?) */
853 status = BT_SELF_COMPONENT_STATUS_END;
854 break;
855 case BT_MESSAGE_ITERATOR_STATUS_NOMEM:
856 status = BT_SELF_COMPONENT_STATUS_NOMEM;
857 break;
858 case BT_MESSAGE_ITERATOR_STATUS_ERROR:
859 status = BT_SELF_COMPONENT_STATUS_NOMEM;
860 break;
861 default:
862 break;
863 }
864
865 goto end;
866
867error:
868 BT_ASSERT(status != BT_SELF_COMPONENT_STATUS_OK);
869 put_messages(msgs, msg_count);
870
871end:
872 return status;
873}
874
875BT_HIDDEN
876bt_self_component_status ctf_fs_sink_graph_is_configured(
877 bt_self_component_sink *self_comp)
878{
879 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
880 struct fs_sink_comp *fs_sink = bt_self_component_get_data(
881 bt_self_component_sink_as_self_component(self_comp));
882
883 fs_sink->upstream_iter =
884 bt_self_component_port_input_message_iterator_create(
885 bt_self_component_sink_borrow_input_port_by_name(
886 self_comp, in_port_name));
887 if (!fs_sink->upstream_iter) {
888 status = BT_SELF_COMPONENT_STATUS_NOMEM;
889 goto end;
890 }
891
892end:
893 return status;
894}
895
896BT_HIDDEN
897void ctf_fs_sink_finalize(bt_self_component_sink *self_comp)
898{
899 struct fs_sink_comp *fs_sink = bt_self_component_get_data(
900 bt_self_component_sink_as_self_component(self_comp));
901
902 destroy_fs_sink_comp(fs_sink);
903}
This page took 0.056475 seconds and 4 git commands to generate.