lib: make discarded events/packets support and clock snapshots optional
[babeltrace.git] / plugins / ctf / fs-sink / fs-sink.c
CommitLineData
15fe47e0
PP
1/*
2 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 * SOFTWARE.
21 */
22
23#define BT_LOG_TAG "PLUGIN-CTF-FS-SINK"
24#include "logging.h"
25
26#include <babeltrace/babeltrace.h>
27#include <stdio.h>
28#include <stdbool.h>
29#include <glib.h>
30#include <babeltrace/assert-internal.h>
31#include <babeltrace/ctfser-internal.h>
32
33#include "fs-sink.h"
34#include "fs-sink-trace.h"
35#include "fs-sink-stream.h"
36#include "fs-sink-ctf-meta.h"
37#include "translate-trace-ir-to-ctf-ir.h"
38#include "translate-ctf-ir-to-tsdl.h"
39
/* Name of the input port which this sink component adds and reads from. */
static const char * const in_port_name = "in";
42
43static
44bt_self_component_status ensure_output_dir_exists(
45 struct fs_sink_comp *fs_sink)
46{
47 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
48 int ret;
49
50 ret = g_mkdir_with_parents(fs_sink->output_dir_path->str, 0755);
51 if (ret) {
52 BT_LOGE_ERRNO("Cannot create directories for output directory",
53 ": output-dir-path=\"%s\"",
54 fs_sink->output_dir_path->str);
55 status = BT_SELF_COMPONENT_STATUS_ERROR;
56 goto end;
57 }
58
59end:
60 return status;
61}
62
63static
64bt_self_component_status configure_component(struct fs_sink_comp *fs_sink,
65 const bt_value *params)
66{
67 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
68 const bt_value *value;
69
70 value = bt_value_map_borrow_entry_value_const(params, "path");
71 if (!value) {
72 BT_LOGE_STR("Missing mandatory `path` parameter.");
73 status = BT_SELF_COMPONENT_STATUS_ERROR;
74 goto end;
75 }
76
77 if (!bt_value_is_string(value)) {
78 BT_LOGE_STR("`path` parameter: expecting a string.");
79 status = BT_SELF_COMPONENT_STATUS_ERROR;
80 goto end;
81 }
82
83 g_string_assign(fs_sink->output_dir_path,
84 bt_value_string_get(value));
85 value = bt_value_map_borrow_entry_value_const(params,
86 "assume-single-trace");
87 if (value) {
88 if (!bt_value_is_bool(value)) {
89 BT_LOGE_STR("`assume-single-trace` parameter: expecting a boolean.");
90 status = BT_SELF_COMPONENT_STATUS_ERROR;
91 goto end;
92 }
93
94 fs_sink->assume_single_trace = (bool) bt_value_bool_get(value);
95 }
96
97 value = bt_value_map_borrow_entry_value_const(params,
98 "ignore-discarded-events");
99 if (value) {
100 if (!bt_value_is_bool(value)) {
101 BT_LOGE_STR("`ignore-discarded-events` parameter: expecting a boolean.");
102 status = BT_SELF_COMPONENT_STATUS_ERROR;
103 goto end;
104 }
105
106 fs_sink->ignore_discarded_events =
107 (bool) bt_value_bool_get(value);
108 }
109
110 value = bt_value_map_borrow_entry_value_const(params,
111 "ignore-discarded-packets");
112 if (value) {
113 if (!bt_value_is_bool(value)) {
114 BT_LOGE_STR("`ignore-discarded-packets` parameter: expecting a boolean.");
115 status = BT_SELF_COMPONENT_STATUS_ERROR;
116 goto end;
117 }
118
119 fs_sink->ignore_discarded_packets =
120 (bool) bt_value_bool_get(value);
121 }
122
123 value = bt_value_map_borrow_entry_value_const(params,
124 "quiet");
125 if (value) {
126 if (!bt_value_is_bool(value)) {
127 BT_LOGE_STR("`quiet` parameter: expecting a boolean.");
128 status = BT_SELF_COMPONENT_STATUS_ERROR;
129 goto end;
130 }
131
132 fs_sink->quiet = (bool) bt_value_bool_get(value);
133 }
134
135end:
136 return status;
137}
138
139static
140void destroy_fs_sink_comp(struct fs_sink_comp *fs_sink)
141{
142 if (!fs_sink) {
143 goto end;
144 }
145
146 if (fs_sink->output_dir_path) {
147 g_string_free(fs_sink->output_dir_path, TRUE);
148 fs_sink->output_dir_path = NULL;
149 }
150
151 if (fs_sink->traces) {
152 g_hash_table_destroy(fs_sink->traces);
153 fs_sink->traces = NULL;
154 }
155
156 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(
157 fs_sink->upstream_iter);
158 g_free(fs_sink);
159
160end:
161 return;
162}
163
164BT_HIDDEN
165bt_self_component_status ctf_fs_sink_init(
166 bt_self_component_sink *self_comp, const bt_value *params,
167 void *init_method_data)
168{
169 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
170 struct fs_sink_comp *fs_sink = NULL;
171
172 fs_sink = g_new0(struct fs_sink_comp, 1);
173 if (!fs_sink) {
174 BT_LOGE_STR("Failed to allocate one CTF FS sink structure.");
175 status = BT_SELF_COMPONENT_STATUS_NOMEM;
176 goto end;
177 }
178
179 fs_sink->output_dir_path = g_string_new(NULL);
180 fs_sink->self_comp = self_comp;
181 status = configure_component(fs_sink, params);
182 if (status != BT_SELF_COMPONENT_STATUS_OK) {
183 /* configure_component() logs errors */
184 goto end;
185 }
186
187 if (fs_sink->assume_single_trace &&
188 g_file_test(fs_sink->output_dir_path->str,
189 G_FILE_TEST_EXISTS)) {
190 BT_LOGE("Single trace mode, but output path exists: "
191 "output-path=\"%s\"", fs_sink->output_dir_path->str);
192 status = BT_SELF_COMPONENT_STATUS_ERROR;
193 goto end;
194 }
195
196 status = ensure_output_dir_exists(fs_sink);
197 if (status != BT_SELF_COMPONENT_STATUS_OK) {
198 /* ensure_output_dir_exists() logs errors */
199 goto end;
200 }
201
202 fs_sink->traces = g_hash_table_new_full(g_direct_hash, g_direct_equal,
203 NULL, (GDestroyNotify) fs_sink_trace_destroy);
204 if (!fs_sink->traces) {
205 BT_LOGE_STR("Failed to allocate one GHashTable.");
206 status = BT_SELF_COMPONENT_STATUS_NOMEM;
207 goto end;
208 }
209
210 status = bt_self_component_sink_add_input_port(self_comp, in_port_name,
211 NULL, NULL);
212 if (status != BT_SELF_COMPONENT_STATUS_OK) {
213 goto end;
214 }
215
216 bt_self_component_set_data(
217 bt_self_component_sink_as_self_component(self_comp), fs_sink);
218
219end:
220 if (status != BT_SELF_COMPONENT_STATUS_OK) {
221 destroy_fs_sink_comp(fs_sink);
222 }
223
224 return status;
225}
226
227static inline
228struct fs_sink_stream *borrow_stream(struct fs_sink_comp *fs_sink,
229 const bt_stream *ir_stream)
230{
231 const bt_trace *ir_trace = bt_stream_borrow_trace_const(ir_stream);
232 struct fs_sink_trace *trace;
233 struct fs_sink_stream *stream = NULL;
234
235 trace = g_hash_table_lookup(fs_sink->traces, ir_trace);
236 if (unlikely(!trace)) {
237 if (fs_sink->assume_single_trace &&
238 g_hash_table_size(fs_sink->traces) > 0) {
239 BT_LOGE("Single trace mode, but getting more than one trace: "
240 "stream-name=\"%s\"",
241 bt_stream_get_name(ir_stream));
242 goto end;
243 }
244
245 trace = fs_sink_trace_create(fs_sink, ir_trace);
246 if (!trace) {
247 goto end;
248 }
249 }
250
251 stream = g_hash_table_lookup(trace->streams, ir_stream);
252 if (unlikely(!stream)) {
253 stream = fs_sink_stream_create(trace, ir_stream);
254 if (!stream) {
255 goto end;
256 }
257 }
258
259end:
260 return stream;
261}
262
263static inline
264bt_self_component_status handle_event_msg(struct fs_sink_comp *fs_sink,
265 const bt_message *msg)
266{
267 int ret;
268 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
269 const bt_event *ir_event = bt_message_event_borrow_event_const(msg);
270 const bt_stream *ir_stream = bt_event_borrow_stream_const(ir_event);
271 struct fs_sink_stream *stream;
272 struct fs_sink_ctf_event_class *ec = NULL;
273 const bt_clock_snapshot *cs = NULL;
274
275 stream = borrow_stream(fs_sink, ir_stream);
276 if (unlikely(!stream)) {
277 status = BT_SELF_COMPONENT_STATUS_ERROR;
278 goto end;
279 }
280
281 ret = try_translate_event_class_trace_ir_to_ctf_ir(stream->sc,
282 bt_event_borrow_class_const(ir_event), &ec);
283 if (ret) {
284 status = BT_SELF_COMPONENT_STATUS_ERROR;
285 goto end;
286 }
287
288 BT_ASSERT(ec);
289
290 if (stream->sc->default_clock_class) {
0cbc2c33
PP
291 cs = bt_message_event_borrow_default_clock_snapshot_const(
292 msg);
15fe47e0
PP
293 }
294
295 ret = fs_sink_stream_write_event(stream, cs, ir_event, ec);
296 if (unlikely(ret)) {
297 status = BT_SELF_COMPONENT_STATUS_ERROR;
298 goto end;
299 }
300
301end:
302 return status;
303}
304
305static inline
306bt_self_component_status handle_packet_beginning_msg(
307 struct fs_sink_comp *fs_sink, const bt_message *msg)
308{
309 int ret;
310 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
311 const bt_packet *ir_packet =
312 bt_message_packet_beginning_borrow_packet_const(msg);
313 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
314 struct fs_sink_stream *stream;
315 const bt_clock_snapshot *cs = NULL;
316
317 stream = borrow_stream(fs_sink, ir_stream);
318 if (unlikely(!stream)) {
319 status = BT_SELF_COMPONENT_STATUS_ERROR;
320 goto end;
321 }
322
323 if (stream->sc->default_clock_class) {
0cbc2c33
PP
324 cs = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
325 msg);
15fe47e0
PP
326 BT_ASSERT(cs);
327 }
328
329 if (stream->discarded_events_state.in_range) {
330 /*
331 * Make sure that the current discarded events range's
332 * beginning time matches what's expected for CTF 1.8.
333 */
334 if (stream->sc->default_clock_class) {
335 uint64_t expected_cs;
336
337 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
338 /* We're opening the first packet */
339 expected_cs = bt_clock_snapshot_get_value(cs);
340 } else {
341 expected_cs = stream->prev_packet_state.end_cs;
342 }
343
344 if (stream->discarded_events_state.beginning_cs !=
345 expected_cs) {
346 BT_LOGE("Incompatible discarded events message: "
347 "unexpected beginning time: "
348 "beginning-cs-val=%" PRIu64 ", "
349 "expected-beginning-cs-val=%" PRIu64 ", "
350 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
351 "trace-name=\"%s\", path=\"%s/%s\"",
352 stream->discarded_events_state.beginning_cs,
353 expected_cs,
354 bt_stream_get_id(ir_stream),
355 bt_stream_get_name(ir_stream),
356 bt_trace_get_name(
357 bt_stream_borrow_trace_const(ir_stream)),
358 stream->trace->path->str, stream->file_name->str);
359 status = BT_SELF_COMPONENT_STATUS_ERROR;
360 goto end;
361 }
362 }
363 }
364
365
366 if (stream->discarded_packets_state.in_range) {
367 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
368 BT_LOGE("Incompatible discarded packets message "
369 "occuring before the stream's first packet: "
370 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
371 "trace-name=\"%s\", path=\"%s/%s\"",
372 bt_stream_get_id(ir_stream),
373 bt_stream_get_name(ir_stream),
374 bt_trace_get_name(
375 bt_stream_borrow_trace_const(ir_stream)),
376 stream->trace->path->str, stream->file_name->str);
377 status = BT_SELF_COMPONENT_STATUS_ERROR;
378 goto end;
379 }
380
381 /*
382 * Make sure that the current discarded packets range's
383 * beginning and end times match what's expected for CTF
384 * 1.8.
385 */
386 if (stream->sc->default_clock_class) {
387 uint64_t expected_end_cs =
388 bt_clock_snapshot_get_value(cs);
389
390 if (stream->discarded_packets_state.beginning_cs !=
391 stream->prev_packet_state.end_cs) {
392 BT_LOGE("Incompatible discarded packets message: "
393 "unexpected beginning time: "
394 "beginning-cs-val=%" PRIu64 ", "
395 "expected-beginning-cs-val=%" PRIu64 ", "
396 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
397 "trace-name=\"%s\", path=\"%s/%s\"",
398 stream->discarded_packets_state.beginning_cs,
399 stream->prev_packet_state.end_cs,
400 bt_stream_get_id(ir_stream),
401 bt_stream_get_name(ir_stream),
402 bt_trace_get_name(
403 bt_stream_borrow_trace_const(ir_stream)),
404 stream->trace->path->str, stream->file_name->str);
405 status = BT_SELF_COMPONENT_STATUS_ERROR;
406 goto end;
407 }
408
409 if (stream->discarded_packets_state.end_cs !=
410 expected_end_cs) {
411 BT_LOGE("Incompatible discarded packets message: "
412 "unexpected end time: "
413 "end-cs-val=%" PRIu64 ", "
414 "expected-end-cs-val=%" PRIu64 ", "
415 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
416 "trace-name=\"%s\", path=\"%s/%s\"",
417 stream->discarded_packets_state.beginning_cs,
418 expected_end_cs,
419 bt_stream_get_id(ir_stream),
420 bt_stream_get_name(ir_stream),
421 bt_trace_get_name(
422 bt_stream_borrow_trace_const(ir_stream)),
423 stream->trace->path->str, stream->file_name->str);
424 status = BT_SELF_COMPONENT_STATUS_ERROR;
425 goto end;
426 }
427 }
428 }
429
430 stream->discarded_packets_state.in_range = false;
431 ret = fs_sink_stream_open_packet(stream, cs, ir_packet);
432 if (ret) {
433 status = BT_SELF_COMPONENT_STATUS_ERROR;
434 goto end;
435 }
436
437end:
438 return status;
439}
440
441static inline
442bt_self_component_status handle_packet_end_msg(
443 struct fs_sink_comp *fs_sink, const bt_message *msg)
444{
445 int ret;
446 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
447 const bt_packet *ir_packet =
448 bt_message_packet_end_borrow_packet_const(msg);
449 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
450 struct fs_sink_stream *stream;
451 const bt_clock_snapshot *cs = NULL;
452
453 stream = borrow_stream(fs_sink, ir_stream);
454 if (unlikely(!stream)) {
455 status = BT_SELF_COMPONENT_STATUS_ERROR;
456 goto end;
457 }
458
459 if (stream->sc->default_clock_class) {
0cbc2c33
PP
460 cs = bt_message_packet_end_borrow_default_clock_snapshot_const(
461 msg);
15fe47e0
PP
462 BT_ASSERT(cs);
463 }
464
465 if (stream->sc->default_clock_class) {
0cbc2c33
PP
466 cs = bt_message_packet_end_borrow_default_clock_snapshot_const(
467 msg);
15fe47e0
PP
468 BT_ASSERT(cs);
469 }
470
471 if (stream->discarded_events_state.in_range) {
472 /*
473 * Make sure that the current discarded events range's
474 * end time matches what's expected for CTF 1.8.
475 */
476 if (stream->sc->default_clock_class) {
477 uint64_t expected_cs = bt_clock_snapshot_get_value(cs);
478
479 if (stream->discarded_events_state.end_cs !=
480 expected_cs) {
481 BT_LOGE("Incompatible discarded events message: "
482 "unexpected end time: "
483 "end-cs-val=%" PRIu64 ", "
484 "expected-end-cs-val=%" PRIu64 ", "
485 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
486 "trace-name=\"%s\", path=\"%s/%s\"",
487 stream->discarded_events_state.end_cs,
488 expected_cs,
489 bt_stream_get_id(ir_stream),
490 bt_stream_get_name(ir_stream),
491 bt_trace_get_name(
492 bt_stream_borrow_trace_const(ir_stream)),
493 stream->trace->path->str, stream->file_name->str);
494 status = BT_SELF_COMPONENT_STATUS_ERROR;
495 goto end;
496 }
497 }
498 }
499
500 ret = fs_sink_stream_close_packet(stream, cs);
501 if (ret) {
502 status = BT_SELF_COMPONENT_STATUS_ERROR;
503 goto end;
504 }
505
506 stream->discarded_events_state.in_range = false;
507
508end:
509 return status;
510}
511
512static inline
513bt_self_component_status handle_stream_beginning_msg(
514 struct fs_sink_comp *fs_sink, const bt_message *msg)
515{
516 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
517 const bt_stream *ir_stream =
518 bt_message_stream_beginning_borrow_stream_const(msg);
649934d2
PP
519 const bt_stream_class *ir_sc =
520 bt_stream_borrow_class_const(ir_stream);
15fe47e0
PP
521 struct fs_sink_stream *stream;
522
649934d2
PP
523 /*
524 * Temporary: if the stream's class has a default clock class,
525 * make sure packet beginning and end messages have default
526 * clock snapshots until the support for not having them is
527 * implemented.
528 */
529 if (bt_stream_class_borrow_default_clock_class_const(ir_sc)) {
530 if (!bt_stream_class_packets_have_default_beginning_clock_snapshot(
531 ir_sc)) {
532 BT_LOGE("Unsupported stream: packets have "
533 "no beginning clock snapshot: "
534 "stream-addr=%p, "
535 "stream-id=%" PRIu64 ", "
536 "stream-name=\"%s\"",
537 ir_stream, bt_stream_get_id(ir_stream),
538 bt_stream_get_name(ir_stream));
539 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
540 goto end;
541 }
542
543 if (!bt_stream_class_packets_have_default_end_clock_snapshot(
544 ir_sc)) {
545 BT_LOGE("Unsupported stream: packets have "
546 "no end clock snapshot: "
547 "stream-addr=%p, "
548 "stream-id=%" PRIu64 ", "
549 "stream-name=\"%s\"",
550 ir_stream, bt_stream_get_id(ir_stream),
551 bt_stream_get_name(ir_stream));
552 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
553 goto end;
554 }
2e90378a
PP
555
556 if (!fs_sink->ignore_discarded_events &&
557 bt_stream_class_supports_discarded_events(ir_sc) &&
558 !bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc)) {
559 BT_LOGE("Unsupported stream: discarded events "
560 "have no clock snapshots: "
561 "stream-addr=%p, "
562 "stream-id=%" PRIu64 ", "
563 "stream-name=\"%s\"",
564 ir_stream, bt_stream_get_id(ir_stream),
565 bt_stream_get_name(ir_stream));
566 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
567 goto end;
568 }
569
570 if (!fs_sink->ignore_discarded_packets &&
571 bt_stream_class_supports_discarded_packets(ir_sc) &&
572 !bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc)) {
573 BT_LOGE("Unsupported stream: discarded packets "
574 "have no clock snapshots: "
575 "stream-addr=%p, "
576 "stream-id=%" PRIu64 ", "
577 "stream-name=\"%s\"",
578 ir_stream, bt_stream_get_id(ir_stream),
579 bt_stream_get_name(ir_stream));
580 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
581 goto end;
582 }
649934d2
PP
583 }
584
15fe47e0
PP
585 stream = borrow_stream(fs_sink, ir_stream);
586 if (!stream) {
587 status = BT_SELF_COMPONENT_STATUS_ERROR;
588 goto end;
589 }
590
591 BT_LOGI("Created new, empty stream file: "
592 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
593 "trace-name=\"%s\", path=\"%s/%s\"",
594 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
595 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
596 stream->trace->path->str, stream->file_name->str);
597
598end:
599 return status;
600}
601
602static inline
603bt_self_component_status handle_stream_end_msg(struct fs_sink_comp *fs_sink,
604 const bt_message *msg)
605{
606 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
607 const bt_stream *ir_stream =
608 bt_message_stream_end_borrow_stream_const(msg);
609 struct fs_sink_stream *stream;
610
611 stream = borrow_stream(fs_sink, ir_stream);
612 if (!stream) {
613 status = BT_SELF_COMPONENT_STATUS_ERROR;
614 goto end;
615 }
616
617 BT_LOGI("Closing stream file: "
618 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
619 "trace-name=\"%s\", path=\"%s/%s\"",
620 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
621 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
622 stream->trace->path->str, stream->file_name->str);
623
624 /*
625 * This destroys the stream object and frees all its resources,
626 * closing the stream file.
627 */
628 g_hash_table_remove(stream->trace->streams, ir_stream);
629
630end:
631 return status;
632}
633
634static inline
635bt_self_component_status handle_discarded_events_msg(
636 struct fs_sink_comp *fs_sink, const bt_message *msg)
637{
638 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
639 const bt_stream *ir_stream =
640 bt_message_discarded_events_borrow_stream_const(msg);
641 struct fs_sink_stream *stream;
642 const bt_clock_snapshot *cs = NULL;
643 bt_property_availability avail;
644 uint64_t count;
645
646 stream = borrow_stream(fs_sink, ir_stream);
647 if (!stream) {
648 status = BT_SELF_COMPONENT_STATUS_ERROR;
649 goto end;
650 }
651
652 if (fs_sink->ignore_discarded_events) {
653 BT_LOGI("Ignoring discarded events message: "
654 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
655 "trace-name=\"%s\", path=\"%s/%s\"",
656 bt_stream_get_id(ir_stream),
657 bt_stream_get_name(ir_stream),
658 bt_trace_get_name(
659 bt_stream_borrow_trace_const(ir_stream)),
660 stream->trace->path->str, stream->file_name->str);
661 goto end;
662 }
663
664 if (stream->discarded_events_state.in_range) {
665 BT_LOGE("Unsupported contiguous discarded events message: "
666 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
667 "trace-name=\"%s\", path=\"%s/%s\"",
668 bt_stream_get_id(ir_stream),
669 bt_stream_get_name(ir_stream),
670 bt_trace_get_name(
671 bt_stream_borrow_trace_const(ir_stream)),
672 stream->trace->path->str, stream->file_name->str);
673 status = BT_SELF_COMPONENT_STATUS_ERROR;
674 goto end;
675 }
676
677 if (stream->packet_state.is_open) {
678 BT_LOGE("Unsupported discarded events message occuring "
679 "within a packet: "
680 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
681 "trace-name=\"%s\", path=\"%s/%s\"",
682 bt_stream_get_id(ir_stream),
683 bt_stream_get_name(ir_stream),
684 bt_trace_get_name(
685 bt_stream_borrow_trace_const(ir_stream)),
686 stream->trace->path->str, stream->file_name->str);
687 status = BT_SELF_COMPONENT_STATUS_ERROR;
688 goto end;
689 }
690
691 stream->discarded_events_state.in_range = true;
692
693 if (stream->sc->default_clock_class) {
694 /*
695 * The clock snapshot values will be validated when
696 * handling the next "packet beginning" message.
697 */
0cbc2c33
PP
698 cs = bt_message_discarded_events_borrow_default_beginning_clock_snapshot_const(
699 msg);
15fe47e0
PP
700 BT_ASSERT(cs);
701 stream->discarded_events_state.beginning_cs =
702 bt_clock_snapshot_get_value(cs);
0cbc2c33
PP
703 cs = bt_message_discarded_events_borrow_default_end_clock_snapshot_const(
704 msg);
15fe47e0
PP
705 BT_ASSERT(cs);
706 stream->discarded_events_state.end_cs =
707 bt_clock_snapshot_get_value(cs);
708 } else {
709 stream->discarded_events_state.beginning_cs = UINT64_C(-1);
710 stream->discarded_events_state.end_cs = UINT64_C(-1);
711 }
712
713 avail = bt_message_discarded_events_get_count(msg, &count);
714 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
715 count = 1;
716 }
717
718 stream->packet_state.discarded_events_counter += count;
719
720end:
721 return status;
722}
723
724static inline
725bt_self_component_status handle_discarded_packets_msg(
726 struct fs_sink_comp *fs_sink, const bt_message *msg)
727{
728 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
729 const bt_stream *ir_stream =
730 bt_message_discarded_packets_borrow_stream_const(msg);
731 struct fs_sink_stream *stream;
732 const bt_clock_snapshot *cs = NULL;
733 bt_property_availability avail;
734 uint64_t count;
735
736 stream = borrow_stream(fs_sink, ir_stream);
737 if (!stream) {
738 status = BT_SELF_COMPONENT_STATUS_ERROR;
739 goto end;
740 }
741
742 if (fs_sink->ignore_discarded_packets) {
743 BT_LOGI("Ignoring discarded packets message: "
744 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
745 "trace-name=\"%s\", path=\"%s/%s\"",
746 bt_stream_get_id(ir_stream),
747 bt_stream_get_name(ir_stream),
748 bt_trace_get_name(
749 bt_stream_borrow_trace_const(ir_stream)),
750 stream->trace->path->str, stream->file_name->str);
751 goto end;
752 }
753
754 if (stream->discarded_packets_state.in_range) {
755 BT_LOGE("Unsupported contiguous discarded packets message: "
756 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
757 "trace-name=\"%s\", path=\"%s/%s\"",
758 bt_stream_get_id(ir_stream),
759 bt_stream_get_name(ir_stream),
760 bt_trace_get_name(
761 bt_stream_borrow_trace_const(ir_stream)),
762 stream->trace->path->str, stream->file_name->str);
763 status = BT_SELF_COMPONENT_STATUS_ERROR;
764 goto end;
765 }
766
767 if (stream->packet_state.is_open) {
768 BT_LOGE("Unsupported discarded packets message occuring "
769 "within a packet: "
770 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
771 "trace-name=\"%s\", path=\"%s/%s\"",
772 bt_stream_get_id(ir_stream),
773 bt_stream_get_name(ir_stream),
774 bt_trace_get_name(
775 bt_stream_borrow_trace_const(ir_stream)),
776 stream->trace->path->str, stream->file_name->str);
777 status = BT_SELF_COMPONENT_STATUS_ERROR;
778 goto end;
779 }
780
781 stream->discarded_packets_state.in_range = true;
782
783 if (stream->sc->default_clock_class) {
784 /*
785 * The clock snapshot values will be validated when
786 * handling the next "packet beginning" message.
787 */
0cbc2c33
PP
788 cs = bt_message_discarded_packets_borrow_default_beginning_clock_snapshot_const(
789 msg);
15fe47e0
PP
790 BT_ASSERT(cs);
791 stream->discarded_packets_state.beginning_cs =
792 bt_clock_snapshot_get_value(cs);
0cbc2c33
PP
793 cs = bt_message_discarded_packets_borrow_default_end_clock_snapshot_const(
794 msg);
15fe47e0
PP
795 BT_ASSERT(cs);
796 stream->discarded_packets_state.end_cs =
797 bt_clock_snapshot_get_value(cs);
798 } else {
799 stream->discarded_packets_state.beginning_cs = UINT64_C(-1);
800 stream->discarded_packets_state.end_cs = UINT64_C(-1);
801 }
802
803 avail = bt_message_discarded_packets_get_count(msg, &count);
804 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
805 count = 1;
806 }
807
808 stream->packet_state.seq_num += count;
809
810end:
811 return status;
812}
813
814static inline
815void put_messages(bt_message_array_const msgs, uint64_t count)
816{
817 uint64_t i;
818
819 for (i = 0; i < count; i++) {
820 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
821 }
822}
823
824BT_HIDDEN
825bt_self_component_status ctf_fs_sink_consume(bt_self_component_sink *self_comp)
826{
827 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
828 struct fs_sink_comp *fs_sink;
829 bt_message_iterator_status it_status;
830 uint64_t msg_count = 0;
831 bt_message_array_const msgs;
832
833 fs_sink = bt_self_component_get_data(
834 bt_self_component_sink_as_self_component(self_comp));
835 BT_ASSERT(fs_sink);
836 BT_ASSERT(fs_sink->upstream_iter);
837
838 /* Consume messages */
839 it_status = bt_self_component_port_input_message_iterator_next(
840 fs_sink->upstream_iter, &msgs, &msg_count);
841 if (it_status < 0) {
842 status = BT_SELF_COMPONENT_STATUS_ERROR;
843 goto end;
844 }
845
846 switch (it_status) {
847 case BT_MESSAGE_ITERATOR_STATUS_OK:
848 {
849 uint64_t i;
850
851 for (i = 0; i < msg_count; i++) {
852 const bt_message *msg = msgs[i];
853
854 BT_ASSERT(msg);
855
856 switch (bt_message_get_type(msg)) {
857 case BT_MESSAGE_TYPE_EVENT:
858 status = handle_event_msg(fs_sink, msg);
859 break;
860 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
861 status = handle_packet_beginning_msg(
862 fs_sink, msg);
863 break;
864 case BT_MESSAGE_TYPE_PACKET_END:
865 status = handle_packet_end_msg(
866 fs_sink, msg);
867 break;
868 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
869 /* Ignore */
870 BT_LOGD_STR("Ignoring message iterator inactivity message.");
871 break;
872 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
873 status = handle_stream_beginning_msg(
874 fs_sink, msg);
875 break;
876 case BT_MESSAGE_TYPE_STREAM_END:
877 status = handle_stream_end_msg(
878 fs_sink, msg);
879 break;
880 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
881 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
882 /* Not supported by CTF 1.8 */
883 BT_LOGD_STR("Ignoring stream activity message.");
884 break;
885 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
886 status = handle_discarded_events_msg(
887 fs_sink, msg);
888 break;
889 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
890 status = handle_discarded_packets_msg(
891 fs_sink, msg);
892 break;
893 default:
894 abort();
895 }
896
897 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
898
899 if (status != BT_SELF_COMPONENT_STATUS_OK) {
900 BT_LOGE("Failed to handle message: "
901 "generated CTF traces could be incomplete: "
902 "output-dir-path=\"%s\"",
903 fs_sink->output_dir_path->str);
904 goto error;
905 }
906 }
907
908 break;
909 }
910 case BT_MESSAGE_ITERATOR_STATUS_AGAIN:
911 status = BT_SELF_COMPONENT_STATUS_AGAIN;
912 break;
913 case BT_MESSAGE_ITERATOR_STATUS_END:
914 /* TODO: Finalize all traces (should already be done?) */
915 status = BT_SELF_COMPONENT_STATUS_END;
916 break;
917 case BT_MESSAGE_ITERATOR_STATUS_NOMEM:
918 status = BT_SELF_COMPONENT_STATUS_NOMEM;
919 break;
920 case BT_MESSAGE_ITERATOR_STATUS_ERROR:
921 status = BT_SELF_COMPONENT_STATUS_NOMEM;
922 break;
923 default:
924 break;
925 }
926
927 goto end;
928
929error:
930 BT_ASSERT(status != BT_SELF_COMPONENT_STATUS_OK);
931 put_messages(msgs, msg_count);
932
933end:
934 return status;
935}
936
937BT_HIDDEN
938bt_self_component_status ctf_fs_sink_graph_is_configured(
939 bt_self_component_sink *self_comp)
940{
941 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
942 struct fs_sink_comp *fs_sink = bt_self_component_get_data(
943 bt_self_component_sink_as_self_component(self_comp));
944
945 fs_sink->upstream_iter =
946 bt_self_component_port_input_message_iterator_create(
947 bt_self_component_sink_borrow_input_port_by_name(
948 self_comp, in_port_name));
949 if (!fs_sink->upstream_iter) {
950 status = BT_SELF_COMPONENT_STATUS_NOMEM;
951 goto end;
952 }
953
954end:
955 return status;
956}
957
958BT_HIDDEN
959void ctf_fs_sink_finalize(bt_self_component_sink *self_comp)
960{
961 struct fs_sink_comp *fs_sink = bt_self_component_get_data(
962 bt_self_component_sink_as_self_component(self_comp));
963
964 destroy_fs_sink_comp(fs_sink);
965}
This page took 0.06257 seconds and 4 git commands to generate.