lib: create_packet_message(): make assertion message less convoluted
[babeltrace.git] / plugins / ctf / fs-sink / fs-sink.c
CommitLineData
15fe47e0
PP
1/*
2 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 * SOFTWARE.
21 */
22
23#define BT_LOG_TAG "PLUGIN-CTF-FS-SINK"
24#include "logging.h"
25
26#include <babeltrace/babeltrace.h>
27#include <stdio.h>
28#include <stdbool.h>
29#include <glib.h>
30#include <babeltrace/assert-internal.h>
31#include <babeltrace/ctfser-internal.h>
32
33#include "fs-sink.h"
34#include "fs-sink-trace.h"
35#include "fs-sink-stream.h"
36#include "fs-sink-ctf-meta.h"
37#include "translate-trace-ir-to-ctf-ir.h"
38#include "translate-ctf-ir-to-tsdl.h"
39
/* Name of the input port which this sink component adds and reads from. */
static const char * const in_port_name = "in";
42
43static
44bt_self_component_status ensure_output_dir_exists(
45 struct fs_sink_comp *fs_sink)
46{
47 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
48 int ret;
49
50 ret = g_mkdir_with_parents(fs_sink->output_dir_path->str, 0755);
51 if (ret) {
52 BT_LOGE_ERRNO("Cannot create directories for output directory",
53 ": output-dir-path=\"%s\"",
54 fs_sink->output_dir_path->str);
55 status = BT_SELF_COMPONENT_STATUS_ERROR;
56 goto end;
57 }
58
59end:
60 return status;
61}
62
63static
64bt_self_component_status configure_component(struct fs_sink_comp *fs_sink,
65 const bt_value *params)
66{
67 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
68 const bt_value *value;
69
70 value = bt_value_map_borrow_entry_value_const(params, "path");
71 if (!value) {
72 BT_LOGE_STR("Missing mandatory `path` parameter.");
73 status = BT_SELF_COMPONENT_STATUS_ERROR;
74 goto end;
75 }
76
77 if (!bt_value_is_string(value)) {
78 BT_LOGE_STR("`path` parameter: expecting a string.");
79 status = BT_SELF_COMPONENT_STATUS_ERROR;
80 goto end;
81 }
82
83 g_string_assign(fs_sink->output_dir_path,
84 bt_value_string_get(value));
85 value = bt_value_map_borrow_entry_value_const(params,
86 "assume-single-trace");
87 if (value) {
88 if (!bt_value_is_bool(value)) {
89 BT_LOGE_STR("`assume-single-trace` parameter: expecting a boolean.");
90 status = BT_SELF_COMPONENT_STATUS_ERROR;
91 goto end;
92 }
93
94 fs_sink->assume_single_trace = (bool) bt_value_bool_get(value);
95 }
96
97 value = bt_value_map_borrow_entry_value_const(params,
98 "ignore-discarded-events");
99 if (value) {
100 if (!bt_value_is_bool(value)) {
101 BT_LOGE_STR("`ignore-discarded-events` parameter: expecting a boolean.");
102 status = BT_SELF_COMPONENT_STATUS_ERROR;
103 goto end;
104 }
105
106 fs_sink->ignore_discarded_events =
107 (bool) bt_value_bool_get(value);
108 }
109
110 value = bt_value_map_borrow_entry_value_const(params,
111 "ignore-discarded-packets");
112 if (value) {
113 if (!bt_value_is_bool(value)) {
114 BT_LOGE_STR("`ignore-discarded-packets` parameter: expecting a boolean.");
115 status = BT_SELF_COMPONENT_STATUS_ERROR;
116 goto end;
117 }
118
119 fs_sink->ignore_discarded_packets =
120 (bool) bt_value_bool_get(value);
121 }
122
123 value = bt_value_map_borrow_entry_value_const(params,
124 "quiet");
125 if (value) {
126 if (!bt_value_is_bool(value)) {
127 BT_LOGE_STR("`quiet` parameter: expecting a boolean.");
128 status = BT_SELF_COMPONENT_STATUS_ERROR;
129 goto end;
130 }
131
132 fs_sink->quiet = (bool) bt_value_bool_get(value);
133 }
134
135end:
136 return status;
137}
138
139static
140void destroy_fs_sink_comp(struct fs_sink_comp *fs_sink)
141{
142 if (!fs_sink) {
143 goto end;
144 }
145
146 if (fs_sink->output_dir_path) {
147 g_string_free(fs_sink->output_dir_path, TRUE);
148 fs_sink->output_dir_path = NULL;
149 }
150
151 if (fs_sink->traces) {
152 g_hash_table_destroy(fs_sink->traces);
153 fs_sink->traces = NULL;
154 }
155
156 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(
157 fs_sink->upstream_iter);
158 g_free(fs_sink);
159
160end:
161 return;
162}
163
164BT_HIDDEN
165bt_self_component_status ctf_fs_sink_init(
166 bt_self_component_sink *self_comp, const bt_value *params,
167 void *init_method_data)
168{
169 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
170 struct fs_sink_comp *fs_sink = NULL;
171
172 fs_sink = g_new0(struct fs_sink_comp, 1);
173 if (!fs_sink) {
174 BT_LOGE_STR("Failed to allocate one CTF FS sink structure.");
175 status = BT_SELF_COMPONENT_STATUS_NOMEM;
176 goto end;
177 }
178
179 fs_sink->output_dir_path = g_string_new(NULL);
180 fs_sink->self_comp = self_comp;
181 status = configure_component(fs_sink, params);
182 if (status != BT_SELF_COMPONENT_STATUS_OK) {
183 /* configure_component() logs errors */
184 goto end;
185 }
186
187 if (fs_sink->assume_single_trace &&
188 g_file_test(fs_sink->output_dir_path->str,
189 G_FILE_TEST_EXISTS)) {
190 BT_LOGE("Single trace mode, but output path exists: "
191 "output-path=\"%s\"", fs_sink->output_dir_path->str);
192 status = BT_SELF_COMPONENT_STATUS_ERROR;
193 goto end;
194 }
195
196 status = ensure_output_dir_exists(fs_sink);
197 if (status != BT_SELF_COMPONENT_STATUS_OK) {
198 /* ensure_output_dir_exists() logs errors */
199 goto end;
200 }
201
202 fs_sink->traces = g_hash_table_new_full(g_direct_hash, g_direct_equal,
203 NULL, (GDestroyNotify) fs_sink_trace_destroy);
204 if (!fs_sink->traces) {
205 BT_LOGE_STR("Failed to allocate one GHashTable.");
206 status = BT_SELF_COMPONENT_STATUS_NOMEM;
207 goto end;
208 }
209
210 status = bt_self_component_sink_add_input_port(self_comp, in_port_name,
211 NULL, NULL);
212 if (status != BT_SELF_COMPONENT_STATUS_OK) {
213 goto end;
214 }
215
216 bt_self_component_set_data(
217 bt_self_component_sink_as_self_component(self_comp), fs_sink);
218
219end:
220 if (status != BT_SELF_COMPONENT_STATUS_OK) {
221 destroy_fs_sink_comp(fs_sink);
222 }
223
224 return status;
225}
226
227static inline
228struct fs_sink_stream *borrow_stream(struct fs_sink_comp *fs_sink,
229 const bt_stream *ir_stream)
230{
231 const bt_trace *ir_trace = bt_stream_borrow_trace_const(ir_stream);
232 struct fs_sink_trace *trace;
233 struct fs_sink_stream *stream = NULL;
234
235 trace = g_hash_table_lookup(fs_sink->traces, ir_trace);
236 if (unlikely(!trace)) {
237 if (fs_sink->assume_single_trace &&
238 g_hash_table_size(fs_sink->traces) > 0) {
239 BT_LOGE("Single trace mode, but getting more than one trace: "
240 "stream-name=\"%s\"",
241 bt_stream_get_name(ir_stream));
242 goto end;
243 }
244
245 trace = fs_sink_trace_create(fs_sink, ir_trace);
246 if (!trace) {
247 goto end;
248 }
249 }
250
251 stream = g_hash_table_lookup(trace->streams, ir_stream);
252 if (unlikely(!stream)) {
253 stream = fs_sink_stream_create(trace, ir_stream);
254 if (!stream) {
255 goto end;
256 }
257 }
258
259end:
260 return stream;
261}
262
263static inline
264bt_self_component_status handle_event_msg(struct fs_sink_comp *fs_sink,
265 const bt_message *msg)
266{
267 int ret;
268 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
269 const bt_event *ir_event = bt_message_event_borrow_event_const(msg);
270 const bt_stream *ir_stream = bt_event_borrow_stream_const(ir_event);
271 struct fs_sink_stream *stream;
272 struct fs_sink_ctf_event_class *ec = NULL;
273 const bt_clock_snapshot *cs = NULL;
274
275 stream = borrow_stream(fs_sink, ir_stream);
276 if (unlikely(!stream)) {
277 status = BT_SELF_COMPONENT_STATUS_ERROR;
278 goto end;
279 }
280
281 ret = try_translate_event_class_trace_ir_to_ctf_ir(stream->sc,
282 bt_event_borrow_class_const(ir_event), &ec);
283 if (ret) {
284 status = BT_SELF_COMPONENT_STATUS_ERROR;
285 goto end;
286 }
287
288 BT_ASSERT(ec);
289
290 if (stream->sc->default_clock_class) {
0cbc2c33
PP
291 cs = bt_message_event_borrow_default_clock_snapshot_const(
292 msg);
15fe47e0
PP
293 }
294
295 ret = fs_sink_stream_write_event(stream, cs, ir_event, ec);
296 if (unlikely(ret)) {
297 status = BT_SELF_COMPONENT_STATUS_ERROR;
298 goto end;
299 }
300
301end:
302 return status;
303}
304
305static inline
306bt_self_component_status handle_packet_beginning_msg(
307 struct fs_sink_comp *fs_sink, const bt_message *msg)
308{
309 int ret;
310 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
311 const bt_packet *ir_packet =
312 bt_message_packet_beginning_borrow_packet_const(msg);
313 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
314 struct fs_sink_stream *stream;
315 const bt_clock_snapshot *cs = NULL;
316
317 stream = borrow_stream(fs_sink, ir_stream);
318 if (unlikely(!stream)) {
319 status = BT_SELF_COMPONENT_STATUS_ERROR;
320 goto end;
321 }
322
323 if (stream->sc->default_clock_class) {
0cbc2c33
PP
324 cs = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
325 msg);
15fe47e0
PP
326 BT_ASSERT(cs);
327 }
328
329 if (stream->discarded_events_state.in_range) {
330 /*
331 * Make sure that the current discarded events range's
332 * beginning time matches what's expected for CTF 1.8.
333 */
334 if (stream->sc->default_clock_class) {
335 uint64_t expected_cs;
336
337 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
338 /* We're opening the first packet */
339 expected_cs = bt_clock_snapshot_get_value(cs);
340 } else {
341 expected_cs = stream->prev_packet_state.end_cs;
342 }
343
344 if (stream->discarded_events_state.beginning_cs !=
345 expected_cs) {
346 BT_LOGE("Incompatible discarded events message: "
347 "unexpected beginning time: "
348 "beginning-cs-val=%" PRIu64 ", "
349 "expected-beginning-cs-val=%" PRIu64 ", "
350 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
351 "trace-name=\"%s\", path=\"%s/%s\"",
352 stream->discarded_events_state.beginning_cs,
353 expected_cs,
354 bt_stream_get_id(ir_stream),
355 bt_stream_get_name(ir_stream),
356 bt_trace_get_name(
357 bt_stream_borrow_trace_const(ir_stream)),
358 stream->trace->path->str, stream->file_name->str);
359 status = BT_SELF_COMPONENT_STATUS_ERROR;
360 goto end;
361 }
362 }
363 }
364
365
366 if (stream->discarded_packets_state.in_range) {
367 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
368 BT_LOGE("Incompatible discarded packets message "
369 "occuring before the stream's first packet: "
370 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
371 "trace-name=\"%s\", path=\"%s/%s\"",
372 bt_stream_get_id(ir_stream),
373 bt_stream_get_name(ir_stream),
374 bt_trace_get_name(
375 bt_stream_borrow_trace_const(ir_stream)),
376 stream->trace->path->str, stream->file_name->str);
377 status = BT_SELF_COMPONENT_STATUS_ERROR;
378 goto end;
379 }
380
381 /*
382 * Make sure that the current discarded packets range's
383 * beginning and end times match what's expected for CTF
384 * 1.8.
385 */
386 if (stream->sc->default_clock_class) {
387 uint64_t expected_end_cs =
388 bt_clock_snapshot_get_value(cs);
389
390 if (stream->discarded_packets_state.beginning_cs !=
391 stream->prev_packet_state.end_cs) {
392 BT_LOGE("Incompatible discarded packets message: "
393 "unexpected beginning time: "
394 "beginning-cs-val=%" PRIu64 ", "
395 "expected-beginning-cs-val=%" PRIu64 ", "
396 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
397 "trace-name=\"%s\", path=\"%s/%s\"",
398 stream->discarded_packets_state.beginning_cs,
399 stream->prev_packet_state.end_cs,
400 bt_stream_get_id(ir_stream),
401 bt_stream_get_name(ir_stream),
402 bt_trace_get_name(
403 bt_stream_borrow_trace_const(ir_stream)),
404 stream->trace->path->str, stream->file_name->str);
405 status = BT_SELF_COMPONENT_STATUS_ERROR;
406 goto end;
407 }
408
409 if (stream->discarded_packets_state.end_cs !=
410 expected_end_cs) {
411 BT_LOGE("Incompatible discarded packets message: "
412 "unexpected end time: "
413 "end-cs-val=%" PRIu64 ", "
414 "expected-end-cs-val=%" PRIu64 ", "
415 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
416 "trace-name=\"%s\", path=\"%s/%s\"",
417 stream->discarded_packets_state.beginning_cs,
418 expected_end_cs,
419 bt_stream_get_id(ir_stream),
420 bt_stream_get_name(ir_stream),
421 bt_trace_get_name(
422 bt_stream_borrow_trace_const(ir_stream)),
423 stream->trace->path->str, stream->file_name->str);
424 status = BT_SELF_COMPONENT_STATUS_ERROR;
425 goto end;
426 }
427 }
428 }
429
430 stream->discarded_packets_state.in_range = false;
431 ret = fs_sink_stream_open_packet(stream, cs, ir_packet);
432 if (ret) {
433 status = BT_SELF_COMPONENT_STATUS_ERROR;
434 goto end;
435 }
436
437end:
438 return status;
439}
440
441static inline
442bt_self_component_status handle_packet_end_msg(
443 struct fs_sink_comp *fs_sink, const bt_message *msg)
444{
445 int ret;
446 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
447 const bt_packet *ir_packet =
448 bt_message_packet_end_borrow_packet_const(msg);
449 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
450 struct fs_sink_stream *stream;
451 const bt_clock_snapshot *cs = NULL;
452
453 stream = borrow_stream(fs_sink, ir_stream);
454 if (unlikely(!stream)) {
455 status = BT_SELF_COMPONENT_STATUS_ERROR;
456 goto end;
457 }
458
459 if (stream->sc->default_clock_class) {
0cbc2c33
PP
460 cs = bt_message_packet_end_borrow_default_clock_snapshot_const(
461 msg);
15fe47e0
PP
462 BT_ASSERT(cs);
463 }
464
465 if (stream->sc->default_clock_class) {
0cbc2c33
PP
466 cs = bt_message_packet_end_borrow_default_clock_snapshot_const(
467 msg);
15fe47e0
PP
468 BT_ASSERT(cs);
469 }
470
471 if (stream->discarded_events_state.in_range) {
472 /*
473 * Make sure that the current discarded events range's
474 * end time matches what's expected for CTF 1.8.
475 */
476 if (stream->sc->default_clock_class) {
477 uint64_t expected_cs = bt_clock_snapshot_get_value(cs);
478
479 if (stream->discarded_events_state.end_cs !=
480 expected_cs) {
481 BT_LOGE("Incompatible discarded events message: "
482 "unexpected end time: "
483 "end-cs-val=%" PRIu64 ", "
484 "expected-end-cs-val=%" PRIu64 ", "
485 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
486 "trace-name=\"%s\", path=\"%s/%s\"",
487 stream->discarded_events_state.end_cs,
488 expected_cs,
489 bt_stream_get_id(ir_stream),
490 bt_stream_get_name(ir_stream),
491 bt_trace_get_name(
492 bt_stream_borrow_trace_const(ir_stream)),
493 stream->trace->path->str, stream->file_name->str);
494 status = BT_SELF_COMPONENT_STATUS_ERROR;
495 goto end;
496 }
497 }
498 }
499
500 ret = fs_sink_stream_close_packet(stream, cs);
501 if (ret) {
502 status = BT_SELF_COMPONENT_STATUS_ERROR;
503 goto end;
504 }
505
506 stream->discarded_events_state.in_range = false;
507
508end:
509 return status;
510}
511
512static inline
513bt_self_component_status handle_stream_beginning_msg(
514 struct fs_sink_comp *fs_sink, const bt_message *msg)
515{
516 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
517 const bt_stream *ir_stream =
518 bt_message_stream_beginning_borrow_stream_const(msg);
649934d2
PP
519 const bt_stream_class *ir_sc =
520 bt_stream_borrow_class_const(ir_stream);
15fe47e0
PP
521 struct fs_sink_stream *stream;
522
649934d2
PP
523 /*
524 * Temporary: if the stream's class has a default clock class,
525 * make sure packet beginning and end messages have default
526 * clock snapshots until the support for not having them is
527 * implemented.
528 */
529 if (bt_stream_class_borrow_default_clock_class_const(ir_sc)) {
530 if (!bt_stream_class_packets_have_default_beginning_clock_snapshot(
531 ir_sc)) {
532 BT_LOGE("Unsupported stream: packets have "
533 "no beginning clock snapshot: "
534 "stream-addr=%p, "
535 "stream-id=%" PRIu64 ", "
536 "stream-name=\"%s\"",
537 ir_stream, bt_stream_get_id(ir_stream),
538 bt_stream_get_name(ir_stream));
539 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
540 goto end;
541 }
542
543 if (!bt_stream_class_packets_have_default_end_clock_snapshot(
544 ir_sc)) {
545 BT_LOGE("Unsupported stream: packets have "
546 "no end clock snapshot: "
547 "stream-addr=%p, "
548 "stream-id=%" PRIu64 ", "
549 "stream-name=\"%s\"",
550 ir_stream, bt_stream_get_id(ir_stream),
551 bt_stream_get_name(ir_stream));
552 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
553 goto end;
554 }
555 }
556
15fe47e0
PP
557 stream = borrow_stream(fs_sink, ir_stream);
558 if (!stream) {
559 status = BT_SELF_COMPONENT_STATUS_ERROR;
560 goto end;
561 }
562
563 BT_LOGI("Created new, empty stream file: "
564 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
565 "trace-name=\"%s\", path=\"%s/%s\"",
566 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
567 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
568 stream->trace->path->str, stream->file_name->str);
569
570end:
571 return status;
572}
573
574static inline
575bt_self_component_status handle_stream_end_msg(struct fs_sink_comp *fs_sink,
576 const bt_message *msg)
577{
578 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
579 const bt_stream *ir_stream =
580 bt_message_stream_end_borrow_stream_const(msg);
581 struct fs_sink_stream *stream;
582
583 stream = borrow_stream(fs_sink, ir_stream);
584 if (!stream) {
585 status = BT_SELF_COMPONENT_STATUS_ERROR;
586 goto end;
587 }
588
589 BT_LOGI("Closing stream file: "
590 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
591 "trace-name=\"%s\", path=\"%s/%s\"",
592 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
593 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
594 stream->trace->path->str, stream->file_name->str);
595
596 /*
597 * This destroys the stream object and frees all its resources,
598 * closing the stream file.
599 */
600 g_hash_table_remove(stream->trace->streams, ir_stream);
601
602end:
603 return status;
604}
605
606static inline
607bt_self_component_status handle_discarded_events_msg(
608 struct fs_sink_comp *fs_sink, const bt_message *msg)
609{
610 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
611 const bt_stream *ir_stream =
612 bt_message_discarded_events_borrow_stream_const(msg);
613 struct fs_sink_stream *stream;
614 const bt_clock_snapshot *cs = NULL;
615 bt_property_availability avail;
616 uint64_t count;
617
618 stream = borrow_stream(fs_sink, ir_stream);
619 if (!stream) {
620 status = BT_SELF_COMPONENT_STATUS_ERROR;
621 goto end;
622 }
623
624 if (fs_sink->ignore_discarded_events) {
625 BT_LOGI("Ignoring discarded events message: "
626 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
627 "trace-name=\"%s\", path=\"%s/%s\"",
628 bt_stream_get_id(ir_stream),
629 bt_stream_get_name(ir_stream),
630 bt_trace_get_name(
631 bt_stream_borrow_trace_const(ir_stream)),
632 stream->trace->path->str, stream->file_name->str);
633 goto end;
634 }
635
636 if (stream->discarded_events_state.in_range) {
637 BT_LOGE("Unsupported contiguous discarded events message: "
638 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
639 "trace-name=\"%s\", path=\"%s/%s\"",
640 bt_stream_get_id(ir_stream),
641 bt_stream_get_name(ir_stream),
642 bt_trace_get_name(
643 bt_stream_borrow_trace_const(ir_stream)),
644 stream->trace->path->str, stream->file_name->str);
645 status = BT_SELF_COMPONENT_STATUS_ERROR;
646 goto end;
647 }
648
649 if (stream->packet_state.is_open) {
650 BT_LOGE("Unsupported discarded events message occuring "
651 "within a packet: "
652 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
653 "trace-name=\"%s\", path=\"%s/%s\"",
654 bt_stream_get_id(ir_stream),
655 bt_stream_get_name(ir_stream),
656 bt_trace_get_name(
657 bt_stream_borrow_trace_const(ir_stream)),
658 stream->trace->path->str, stream->file_name->str);
659 status = BT_SELF_COMPONENT_STATUS_ERROR;
660 goto end;
661 }
662
663 stream->discarded_events_state.in_range = true;
664
665 if (stream->sc->default_clock_class) {
666 /*
667 * The clock snapshot values will be validated when
668 * handling the next "packet beginning" message.
669 */
0cbc2c33
PP
670 cs = bt_message_discarded_events_borrow_default_beginning_clock_snapshot_const(
671 msg);
15fe47e0
PP
672 BT_ASSERT(cs);
673 stream->discarded_events_state.beginning_cs =
674 bt_clock_snapshot_get_value(cs);
0cbc2c33
PP
675 cs = bt_message_discarded_events_borrow_default_end_clock_snapshot_const(
676 msg);
15fe47e0
PP
677 BT_ASSERT(cs);
678 stream->discarded_events_state.end_cs =
679 bt_clock_snapshot_get_value(cs);
680 } else {
681 stream->discarded_events_state.beginning_cs = UINT64_C(-1);
682 stream->discarded_events_state.end_cs = UINT64_C(-1);
683 }
684
685 avail = bt_message_discarded_events_get_count(msg, &count);
686 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
687 count = 1;
688 }
689
690 stream->packet_state.discarded_events_counter += count;
691
692end:
693 return status;
694}
695
696static inline
697bt_self_component_status handle_discarded_packets_msg(
698 struct fs_sink_comp *fs_sink, const bt_message *msg)
699{
700 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
701 const bt_stream *ir_stream =
702 bt_message_discarded_packets_borrow_stream_const(msg);
703 struct fs_sink_stream *stream;
704 const bt_clock_snapshot *cs = NULL;
705 bt_property_availability avail;
706 uint64_t count;
707
708 stream = borrow_stream(fs_sink, ir_stream);
709 if (!stream) {
710 status = BT_SELF_COMPONENT_STATUS_ERROR;
711 goto end;
712 }
713
714 if (fs_sink->ignore_discarded_packets) {
715 BT_LOGI("Ignoring discarded packets message: "
716 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
717 "trace-name=\"%s\", path=\"%s/%s\"",
718 bt_stream_get_id(ir_stream),
719 bt_stream_get_name(ir_stream),
720 bt_trace_get_name(
721 bt_stream_borrow_trace_const(ir_stream)),
722 stream->trace->path->str, stream->file_name->str);
723 goto end;
724 }
725
726 if (stream->discarded_packets_state.in_range) {
727 BT_LOGE("Unsupported contiguous discarded packets message: "
728 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
729 "trace-name=\"%s\", path=\"%s/%s\"",
730 bt_stream_get_id(ir_stream),
731 bt_stream_get_name(ir_stream),
732 bt_trace_get_name(
733 bt_stream_borrow_trace_const(ir_stream)),
734 stream->trace->path->str, stream->file_name->str);
735 status = BT_SELF_COMPONENT_STATUS_ERROR;
736 goto end;
737 }
738
739 if (stream->packet_state.is_open) {
740 BT_LOGE("Unsupported discarded packets message occuring "
741 "within a packet: "
742 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
743 "trace-name=\"%s\", path=\"%s/%s\"",
744 bt_stream_get_id(ir_stream),
745 bt_stream_get_name(ir_stream),
746 bt_trace_get_name(
747 bt_stream_borrow_trace_const(ir_stream)),
748 stream->trace->path->str, stream->file_name->str);
749 status = BT_SELF_COMPONENT_STATUS_ERROR;
750 goto end;
751 }
752
753 stream->discarded_packets_state.in_range = true;
754
755 if (stream->sc->default_clock_class) {
756 /*
757 * The clock snapshot values will be validated when
758 * handling the next "packet beginning" message.
759 */
0cbc2c33
PP
760 cs = bt_message_discarded_packets_borrow_default_beginning_clock_snapshot_const(
761 msg);
15fe47e0
PP
762 BT_ASSERT(cs);
763 stream->discarded_packets_state.beginning_cs =
764 bt_clock_snapshot_get_value(cs);
0cbc2c33
PP
765 cs = bt_message_discarded_packets_borrow_default_end_clock_snapshot_const(
766 msg);
15fe47e0
PP
767 BT_ASSERT(cs);
768 stream->discarded_packets_state.end_cs =
769 bt_clock_snapshot_get_value(cs);
770 } else {
771 stream->discarded_packets_state.beginning_cs = UINT64_C(-1);
772 stream->discarded_packets_state.end_cs = UINT64_C(-1);
773 }
774
775 avail = bt_message_discarded_packets_get_count(msg, &count);
776 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
777 count = 1;
778 }
779
780 stream->packet_state.seq_num += count;
781
782end:
783 return status;
784}
785
786static inline
787void put_messages(bt_message_array_const msgs, uint64_t count)
788{
789 uint64_t i;
790
791 for (i = 0; i < count; i++) {
792 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
793 }
794}
795
796BT_HIDDEN
797bt_self_component_status ctf_fs_sink_consume(bt_self_component_sink *self_comp)
798{
799 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
800 struct fs_sink_comp *fs_sink;
801 bt_message_iterator_status it_status;
802 uint64_t msg_count = 0;
803 bt_message_array_const msgs;
804
805 fs_sink = bt_self_component_get_data(
806 bt_self_component_sink_as_self_component(self_comp));
807 BT_ASSERT(fs_sink);
808 BT_ASSERT(fs_sink->upstream_iter);
809
810 /* Consume messages */
811 it_status = bt_self_component_port_input_message_iterator_next(
812 fs_sink->upstream_iter, &msgs, &msg_count);
813 if (it_status < 0) {
814 status = BT_SELF_COMPONENT_STATUS_ERROR;
815 goto end;
816 }
817
818 switch (it_status) {
819 case BT_MESSAGE_ITERATOR_STATUS_OK:
820 {
821 uint64_t i;
822
823 for (i = 0; i < msg_count; i++) {
824 const bt_message *msg = msgs[i];
825
826 BT_ASSERT(msg);
827
828 switch (bt_message_get_type(msg)) {
829 case BT_MESSAGE_TYPE_EVENT:
830 status = handle_event_msg(fs_sink, msg);
831 break;
832 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
833 status = handle_packet_beginning_msg(
834 fs_sink, msg);
835 break;
836 case BT_MESSAGE_TYPE_PACKET_END:
837 status = handle_packet_end_msg(
838 fs_sink, msg);
839 break;
840 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
841 /* Ignore */
842 BT_LOGD_STR("Ignoring message iterator inactivity message.");
843 break;
844 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
845 status = handle_stream_beginning_msg(
846 fs_sink, msg);
847 break;
848 case BT_MESSAGE_TYPE_STREAM_END:
849 status = handle_stream_end_msg(
850 fs_sink, msg);
851 break;
852 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
853 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
854 /* Not supported by CTF 1.8 */
855 BT_LOGD_STR("Ignoring stream activity message.");
856 break;
857 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
858 status = handle_discarded_events_msg(
859 fs_sink, msg);
860 break;
861 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
862 status = handle_discarded_packets_msg(
863 fs_sink, msg);
864 break;
865 default:
866 abort();
867 }
868
869 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
870
871 if (status != BT_SELF_COMPONENT_STATUS_OK) {
872 BT_LOGE("Failed to handle message: "
873 "generated CTF traces could be incomplete: "
874 "output-dir-path=\"%s\"",
875 fs_sink->output_dir_path->str);
876 goto error;
877 }
878 }
879
880 break;
881 }
882 case BT_MESSAGE_ITERATOR_STATUS_AGAIN:
883 status = BT_SELF_COMPONENT_STATUS_AGAIN;
884 break;
885 case BT_MESSAGE_ITERATOR_STATUS_END:
886 /* TODO: Finalize all traces (should already be done?) */
887 status = BT_SELF_COMPONENT_STATUS_END;
888 break;
889 case BT_MESSAGE_ITERATOR_STATUS_NOMEM:
890 status = BT_SELF_COMPONENT_STATUS_NOMEM;
891 break;
892 case BT_MESSAGE_ITERATOR_STATUS_ERROR:
893 status = BT_SELF_COMPONENT_STATUS_NOMEM;
894 break;
895 default:
896 break;
897 }
898
899 goto end;
900
901error:
902 BT_ASSERT(status != BT_SELF_COMPONENT_STATUS_OK);
903 put_messages(msgs, msg_count);
904
905end:
906 return status;
907}
908
909BT_HIDDEN
910bt_self_component_status ctf_fs_sink_graph_is_configured(
911 bt_self_component_sink *self_comp)
912{
913 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
914 struct fs_sink_comp *fs_sink = bt_self_component_get_data(
915 bt_self_component_sink_as_self_component(self_comp));
916
917 fs_sink->upstream_iter =
918 bt_self_component_port_input_message_iterator_create(
919 bt_self_component_sink_borrow_input_port_by_name(
920 self_comp, in_port_name));
921 if (!fs_sink->upstream_iter) {
922 status = BT_SELF_COMPONENT_STATUS_NOMEM;
923 goto end;
924 }
925
926end:
927 return status;
928}
929
930BT_HIDDEN
931void ctf_fs_sink_finalize(bt_self_component_sink *self_comp)
932{
933 struct fs_sink_comp *fs_sink = bt_self_component_get_data(
934 bt_self_component_sink_as_self_component(self_comp));
935
936 destroy_fs_sink_comp(fs_sink);
937}
This page took 0.060183 seconds and 4 git commands to generate.