Rename "default beginning/end CS" -> "beginning/end default CS"
[babeltrace.git] / plugins / ctf / fs-sink / fs-sink.c
CommitLineData
15fe47e0
PP
1/*
2 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 * SOFTWARE.
21 */
22
23#define BT_LOG_TAG "PLUGIN-CTF-FS-SINK"
24#include "logging.h"
25
26#include <babeltrace/babeltrace.h>
27#include <stdio.h>
28#include <stdbool.h>
29#include <glib.h>
30#include <babeltrace/assert-internal.h>
31#include <babeltrace/ctfser-internal.h>
32
33#include "fs-sink.h"
34#include "fs-sink-trace.h"
35#include "fs-sink-stream.h"
36#include "fs-sink-ctf-meta.h"
37#include "translate-trace-ir-to-ctf-ir.h"
38#include "translate-ctf-ir-to-tsdl.h"
39
/* Name of the input port which ctf_fs_sink_init() adds to the component. */
static const char * const in_port_name = "in";
42
43static
44bt_self_component_status ensure_output_dir_exists(
45 struct fs_sink_comp *fs_sink)
46{
47 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
48 int ret;
49
50 ret = g_mkdir_with_parents(fs_sink->output_dir_path->str, 0755);
51 if (ret) {
52 BT_LOGE_ERRNO("Cannot create directories for output directory",
53 ": output-dir-path=\"%s\"",
54 fs_sink->output_dir_path->str);
55 status = BT_SELF_COMPONENT_STATUS_ERROR;
56 goto end;
57 }
58
59end:
60 return status;
61}
62
63static
64bt_self_component_status configure_component(struct fs_sink_comp *fs_sink,
65 const bt_value *params)
66{
67 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
68 const bt_value *value;
69
70 value = bt_value_map_borrow_entry_value_const(params, "path");
71 if (!value) {
72 BT_LOGE_STR("Missing mandatory `path` parameter.");
73 status = BT_SELF_COMPONENT_STATUS_ERROR;
74 goto end;
75 }
76
77 if (!bt_value_is_string(value)) {
78 BT_LOGE_STR("`path` parameter: expecting a string.");
79 status = BT_SELF_COMPONENT_STATUS_ERROR;
80 goto end;
81 }
82
83 g_string_assign(fs_sink->output_dir_path,
84 bt_value_string_get(value));
85 value = bt_value_map_borrow_entry_value_const(params,
86 "assume-single-trace");
87 if (value) {
88 if (!bt_value_is_bool(value)) {
89 BT_LOGE_STR("`assume-single-trace` parameter: expecting a boolean.");
90 status = BT_SELF_COMPONENT_STATUS_ERROR;
91 goto end;
92 }
93
94 fs_sink->assume_single_trace = (bool) bt_value_bool_get(value);
95 }
96
97 value = bt_value_map_borrow_entry_value_const(params,
98 "ignore-discarded-events");
99 if (value) {
100 if (!bt_value_is_bool(value)) {
101 BT_LOGE_STR("`ignore-discarded-events` parameter: expecting a boolean.");
102 status = BT_SELF_COMPONENT_STATUS_ERROR;
103 goto end;
104 }
105
106 fs_sink->ignore_discarded_events =
107 (bool) bt_value_bool_get(value);
108 }
109
110 value = bt_value_map_borrow_entry_value_const(params,
111 "ignore-discarded-packets");
112 if (value) {
113 if (!bt_value_is_bool(value)) {
114 BT_LOGE_STR("`ignore-discarded-packets` parameter: expecting a boolean.");
115 status = BT_SELF_COMPONENT_STATUS_ERROR;
116 goto end;
117 }
118
119 fs_sink->ignore_discarded_packets =
120 (bool) bt_value_bool_get(value);
121 }
122
123 value = bt_value_map_borrow_entry_value_const(params,
124 "quiet");
125 if (value) {
126 if (!bt_value_is_bool(value)) {
127 BT_LOGE_STR("`quiet` parameter: expecting a boolean.");
128 status = BT_SELF_COMPONENT_STATUS_ERROR;
129 goto end;
130 }
131
132 fs_sink->quiet = (bool) bt_value_bool_get(value);
133 }
134
135end:
136 return status;
137}
138
139static
140void destroy_fs_sink_comp(struct fs_sink_comp *fs_sink)
141{
142 if (!fs_sink) {
143 goto end;
144 }
145
146 if (fs_sink->output_dir_path) {
147 g_string_free(fs_sink->output_dir_path, TRUE);
148 fs_sink->output_dir_path = NULL;
149 }
150
151 if (fs_sink->traces) {
152 g_hash_table_destroy(fs_sink->traces);
153 fs_sink->traces = NULL;
154 }
155
156 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(
157 fs_sink->upstream_iter);
158 g_free(fs_sink);
159
160end:
161 return;
162}
163
164BT_HIDDEN
165bt_self_component_status ctf_fs_sink_init(
166 bt_self_component_sink *self_comp, const bt_value *params,
167 void *init_method_data)
168{
169 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
170 struct fs_sink_comp *fs_sink = NULL;
171
172 fs_sink = g_new0(struct fs_sink_comp, 1);
173 if (!fs_sink) {
174 BT_LOGE_STR("Failed to allocate one CTF FS sink structure.");
175 status = BT_SELF_COMPONENT_STATUS_NOMEM;
176 goto end;
177 }
178
179 fs_sink->output_dir_path = g_string_new(NULL);
180 fs_sink->self_comp = self_comp;
181 status = configure_component(fs_sink, params);
182 if (status != BT_SELF_COMPONENT_STATUS_OK) {
183 /* configure_component() logs errors */
184 goto end;
185 }
186
187 if (fs_sink->assume_single_trace &&
188 g_file_test(fs_sink->output_dir_path->str,
189 G_FILE_TEST_EXISTS)) {
190 BT_LOGE("Single trace mode, but output path exists: "
191 "output-path=\"%s\"", fs_sink->output_dir_path->str);
192 status = BT_SELF_COMPONENT_STATUS_ERROR;
193 goto end;
194 }
195
196 status = ensure_output_dir_exists(fs_sink);
197 if (status != BT_SELF_COMPONENT_STATUS_OK) {
198 /* ensure_output_dir_exists() logs errors */
199 goto end;
200 }
201
202 fs_sink->traces = g_hash_table_new_full(g_direct_hash, g_direct_equal,
203 NULL, (GDestroyNotify) fs_sink_trace_destroy);
204 if (!fs_sink->traces) {
205 BT_LOGE_STR("Failed to allocate one GHashTable.");
206 status = BT_SELF_COMPONENT_STATUS_NOMEM;
207 goto end;
208 }
209
210 status = bt_self_component_sink_add_input_port(self_comp, in_port_name,
211 NULL, NULL);
212 if (status != BT_SELF_COMPONENT_STATUS_OK) {
213 goto end;
214 }
215
216 bt_self_component_set_data(
217 bt_self_component_sink_as_self_component(self_comp), fs_sink);
218
219end:
220 if (status != BT_SELF_COMPONENT_STATUS_OK) {
221 destroy_fs_sink_comp(fs_sink);
222 }
223
224 return status;
225}
226
227static inline
228struct fs_sink_stream *borrow_stream(struct fs_sink_comp *fs_sink,
229 const bt_stream *ir_stream)
230{
231 const bt_trace *ir_trace = bt_stream_borrow_trace_const(ir_stream);
232 struct fs_sink_trace *trace;
233 struct fs_sink_stream *stream = NULL;
234
235 trace = g_hash_table_lookup(fs_sink->traces, ir_trace);
236 if (unlikely(!trace)) {
237 if (fs_sink->assume_single_trace &&
238 g_hash_table_size(fs_sink->traces) > 0) {
239 BT_LOGE("Single trace mode, but getting more than one trace: "
240 "stream-name=\"%s\"",
241 bt_stream_get_name(ir_stream));
242 goto end;
243 }
244
245 trace = fs_sink_trace_create(fs_sink, ir_trace);
246 if (!trace) {
247 goto end;
248 }
249 }
250
251 stream = g_hash_table_lookup(trace->streams, ir_stream);
252 if (unlikely(!stream)) {
253 stream = fs_sink_stream_create(trace, ir_stream);
254 if (!stream) {
255 goto end;
256 }
257 }
258
259end:
260 return stream;
261}
262
263static inline
264bt_self_component_status handle_event_msg(struct fs_sink_comp *fs_sink,
265 const bt_message *msg)
266{
267 int ret;
268 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
269 const bt_event *ir_event = bt_message_event_borrow_event_const(msg);
270 const bt_stream *ir_stream = bt_event_borrow_stream_const(ir_event);
271 struct fs_sink_stream *stream;
272 struct fs_sink_ctf_event_class *ec = NULL;
273 const bt_clock_snapshot *cs = NULL;
274
275 stream = borrow_stream(fs_sink, ir_stream);
276 if (unlikely(!stream)) {
277 status = BT_SELF_COMPONENT_STATUS_ERROR;
278 goto end;
279 }
280
281 ret = try_translate_event_class_trace_ir_to_ctf_ir(stream->sc,
282 bt_event_borrow_class_const(ir_event), &ec);
283 if (ret) {
284 status = BT_SELF_COMPONENT_STATUS_ERROR;
285 goto end;
286 }
287
288 BT_ASSERT(ec);
289
290 if (stream->sc->default_clock_class) {
0cbc2c33
PP
291 cs = bt_message_event_borrow_default_clock_snapshot_const(
292 msg);
15fe47e0
PP
293 }
294
295 ret = fs_sink_stream_write_event(stream, cs, ir_event, ec);
296 if (unlikely(ret)) {
297 status = BT_SELF_COMPONENT_STATUS_ERROR;
298 goto end;
299 }
300
301end:
302 return status;
303}
304
305static inline
306bt_self_component_status handle_packet_beginning_msg(
307 struct fs_sink_comp *fs_sink, const bt_message *msg)
308{
309 int ret;
310 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
311 const bt_packet *ir_packet =
312 bt_message_packet_beginning_borrow_packet_const(msg);
313 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
314 struct fs_sink_stream *stream;
315 const bt_clock_snapshot *cs = NULL;
316
317 stream = borrow_stream(fs_sink, ir_stream);
318 if (unlikely(!stream)) {
319 status = BT_SELF_COMPONENT_STATUS_ERROR;
320 goto end;
321 }
322
ffb5c13c 323 if (stream->sc->packets_have_ts_begin) {
0cbc2c33
PP
324 cs = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
325 msg);
15fe47e0
PP
326 BT_ASSERT(cs);
327 }
328
329 if (stream->discarded_events_state.in_range) {
ffb5c13c
PP
330 uint64_t expected_cs;
331
332 BT_ASSERT(stream->sc->discarded_events_has_ts);
333 BT_ASSERT(stream->sc->packets_have_ts_begin);
334 BT_ASSERT(stream->sc->packets_have_ts_end);
335
15fe47e0
PP
336 /*
337 * Make sure that the current discarded events range's
338 * beginning time matches what's expected for CTF 1.8.
339 */
ffb5c13c
PP
340 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
341 /* We're opening the first packet */
342 expected_cs = bt_clock_snapshot_get_value(cs);
343 } else {
344 expected_cs = stream->prev_packet_state.end_cs;
15fe47e0 345 }
15fe47e0 346
ffb5c13c
PP
347 if (stream->discarded_events_state.beginning_cs !=
348 expected_cs) {
349 BT_LOGE("Incompatible discarded events message: "
350 "unexpected beginning time: "
351 "beginning-cs-val=%" PRIu64 ", "
352 "expected-beginning-cs-val=%" PRIu64 ", "
15fe47e0
PP
353 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
354 "trace-name=\"%s\", path=\"%s/%s\"",
ffb5c13c
PP
355 stream->discarded_events_state.beginning_cs,
356 expected_cs,
15fe47e0
PP
357 bt_stream_get_id(ir_stream),
358 bt_stream_get_name(ir_stream),
359 bt_trace_get_name(
360 bt_stream_borrow_trace_const(ir_stream)),
361 stream->trace->path->str, stream->file_name->str);
362 status = BT_SELF_COMPONENT_STATUS_ERROR;
363 goto end;
364 }
ffb5c13c
PP
365 }
366
367 if (stream->discarded_packets_state.in_range) {
368 uint64_t expected_end_cs;
369
370 BT_ASSERT(stream->sc->discarded_packets_has_ts);
371 BT_ASSERT(stream->sc->packets_have_ts_begin);
372 BT_ASSERT(stream->sc->packets_have_ts_end);
15fe47e0
PP
373
374 /*
375 * Make sure that the current discarded packets range's
376 * beginning and end times match what's expected for CTF
377 * 1.8.
378 */
ffb5c13c
PP
379 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
380 BT_LOGE("Incompatible discarded packets message "
381 "occuring before the stream's first packet: "
382 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
383 "trace-name=\"%s\", path=\"%s/%s\"",
384 bt_stream_get_id(ir_stream),
385 bt_stream_get_name(ir_stream),
386 bt_trace_get_name(
387 bt_stream_borrow_trace_const(ir_stream)),
388 stream->trace->path->str, stream->file_name->str);
389 status = BT_SELF_COMPONENT_STATUS_ERROR;
390 goto end;
391 }
392
393 if (stream->discarded_packets_state.beginning_cs !=
394 stream->prev_packet_state.end_cs) {
395 BT_LOGE("Incompatible discarded packets message: "
396 "unexpected beginning time: "
397 "beginning-cs-val=%" PRIu64 ", "
398 "expected-beginning-cs-val=%" PRIu64 ", "
399 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
400 "trace-name=\"%s\", path=\"%s/%s\"",
401 stream->discarded_packets_state.beginning_cs,
402 stream->prev_packet_state.end_cs,
403 bt_stream_get_id(ir_stream),
404 bt_stream_get_name(ir_stream),
405 bt_trace_get_name(
406 bt_stream_borrow_trace_const(ir_stream)),
407 stream->trace->path->str, stream->file_name->str);
408 status = BT_SELF_COMPONENT_STATUS_ERROR;
409 goto end;
15fe47e0 410 }
ffb5c13c
PP
411
412 expected_end_cs = bt_clock_snapshot_get_value(cs);
413
414 if (stream->discarded_packets_state.end_cs !=
415 expected_end_cs) {
416 BT_LOGE("Incompatible discarded packets message: "
417 "unexpected end time: "
418 "end-cs-val=%" PRIu64 ", "
419 "expected-end-cs-val=%" PRIu64 ", "
420 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
421 "trace-name=\"%s\", path=\"%s/%s\"",
422 stream->discarded_packets_state.end_cs,
423 expected_end_cs,
424 bt_stream_get_id(ir_stream),
425 bt_stream_get_name(ir_stream),
426 bt_trace_get_name(
427 bt_stream_borrow_trace_const(ir_stream)),
428 stream->trace->path->str, stream->file_name->str);
429 status = BT_SELF_COMPONENT_STATUS_ERROR;
430 goto end;
431 }
432 }
433
434 if (stream->sc->discarded_packets_has_ts) {
435 stream->discarded_packets_state.in_range = false;
15fe47e0
PP
436 }
437
15fe47e0
PP
438 ret = fs_sink_stream_open_packet(stream, cs, ir_packet);
439 if (ret) {
440 status = BT_SELF_COMPONENT_STATUS_ERROR;
441 goto end;
442 }
443
444end:
445 return status;
446}
447
448static inline
449bt_self_component_status handle_packet_end_msg(
450 struct fs_sink_comp *fs_sink, const bt_message *msg)
451{
452 int ret;
453 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
454 const bt_packet *ir_packet =
455 bt_message_packet_end_borrow_packet_const(msg);
456 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
457 struct fs_sink_stream *stream;
458 const bt_clock_snapshot *cs = NULL;
459
460 stream = borrow_stream(fs_sink, ir_stream);
461 if (unlikely(!stream)) {
462 status = BT_SELF_COMPONENT_STATUS_ERROR;
463 goto end;
464 }
465
ffb5c13c 466 if (stream->sc->packets_have_ts_end) {
0cbc2c33
PP
467 cs = bt_message_packet_end_borrow_default_clock_snapshot_const(
468 msg);
15fe47e0
PP
469 BT_ASSERT(cs);
470 }
471
472 if (stream->discarded_events_state.in_range) {
ffb5c13c
PP
473 uint64_t expected_cs;
474
475 BT_ASSERT(stream->sc->discarded_events_has_ts);
476 BT_ASSERT(stream->sc->packets_have_ts_begin);
477 BT_ASSERT(stream->sc->packets_have_ts_end);
478
15fe47e0
PP
479 /*
480 * Make sure that the current discarded events range's
481 * end time matches what's expected for CTF 1.8.
482 */
ffb5c13c
PP
483 expected_cs = bt_clock_snapshot_get_value(cs);
484
485 if (stream->discarded_events_state.end_cs != expected_cs) {
486 BT_LOGE("Incompatible discarded events message: "
487 "unexpected end time: "
488 "end-cs-val=%" PRIu64 ", "
489 "expected-end-cs-val=%" PRIu64 ", "
490 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
491 "trace-name=\"%s\", path=\"%s/%s\"",
492 stream->discarded_events_state.end_cs,
493 expected_cs,
494 bt_stream_get_id(ir_stream),
495 bt_stream_get_name(ir_stream),
496 bt_trace_get_name(
497 bt_stream_borrow_trace_const(ir_stream)),
498 stream->trace->path->str, stream->file_name->str);
499 status = BT_SELF_COMPONENT_STATUS_ERROR;
500 goto end;
15fe47e0
PP
501 }
502 }
503
504 ret = fs_sink_stream_close_packet(stream, cs);
505 if (ret) {
506 status = BT_SELF_COMPONENT_STATUS_ERROR;
507 goto end;
508 }
509
ffb5c13c
PP
510 if (stream->sc->discarded_events_has_ts) {
511 stream->discarded_events_state.in_range = false;
512 }
15fe47e0
PP
513
514end:
515 return status;
516}
517
518static inline
519bt_self_component_status handle_stream_beginning_msg(
520 struct fs_sink_comp *fs_sink, const bt_message *msg)
521{
522 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
523 const bt_stream *ir_stream =
524 bt_message_stream_beginning_borrow_stream_const(msg);
649934d2
PP
525 const bt_stream_class *ir_sc =
526 bt_stream_borrow_class_const(ir_stream);
15fe47e0 527 struct fs_sink_stream *stream;
ffb5c13c 528 bool packets_have_beginning_end_cs =
9b24b6aa
PP
529 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc) &&
530 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc);
15fe47e0 531
649934d2 532 /*
ffb5c13c
PP
533 * Not supported: discarded events with default clock snapshots,
534 * but packet beginning/end without default clock snapshot.
649934d2 535 */
ffb5c13c
PP
536 if (!fs_sink->ignore_discarded_events &&
537 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc) &&
538 !packets_have_beginning_end_cs) {
539 BT_LOGE("Unsupported stream: discarded events have "
540 "default clock snapshots, but packets have no "
541 "beginning and/or end default clock snapshots: "
542 "stream-addr=%p, "
543 "stream-id=%" PRIu64 ", "
544 "stream-name=\"%s\"",
545 ir_stream, bt_stream_get_id(ir_stream),
546 bt_stream_get_name(ir_stream));
547 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
548 goto end;
549 }
2e90378a 550
ffb5c13c
PP
551 /*
552 * Not supported: discarded packets with default clock
553 * snapshots, but packet beginning/end without default clock
554 * snapshot.
555 */
556 if (!fs_sink->ignore_discarded_packets &&
557 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc) &&
558 !packets_have_beginning_end_cs) {
559 BT_LOGE("Unsupported stream: discarded packets have "
560 "default clock snapshots, but packets have no "
561 "beginning and/or end default clock snapshots: "
562 "stream-addr=%p, "
563 "stream-id=%" PRIu64 ", "
564 "stream-name=\"%s\"",
565 ir_stream, bt_stream_get_id(ir_stream),
566 bt_stream_get_name(ir_stream));
567 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
568 goto end;
649934d2
PP
569 }
570
15fe47e0
PP
571 stream = borrow_stream(fs_sink, ir_stream);
572 if (!stream) {
573 status = BT_SELF_COMPONENT_STATUS_ERROR;
574 goto end;
575 }
576
577 BT_LOGI("Created new, empty stream file: "
578 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
579 "trace-name=\"%s\", path=\"%s/%s\"",
580 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
581 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
582 stream->trace->path->str, stream->file_name->str);
583
584end:
585 return status;
586}
587
588static inline
589bt_self_component_status handle_stream_end_msg(struct fs_sink_comp *fs_sink,
590 const bt_message *msg)
591{
592 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
593 const bt_stream *ir_stream =
594 bt_message_stream_end_borrow_stream_const(msg);
595 struct fs_sink_stream *stream;
596
597 stream = borrow_stream(fs_sink, ir_stream);
598 if (!stream) {
599 status = BT_SELF_COMPONENT_STATUS_ERROR;
600 goto end;
601 }
602
603 BT_LOGI("Closing stream file: "
604 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
605 "trace-name=\"%s\", path=\"%s/%s\"",
606 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
607 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
608 stream->trace->path->str, stream->file_name->str);
609
610 /*
611 * This destroys the stream object and frees all its resources,
612 * closing the stream file.
613 */
614 g_hash_table_remove(stream->trace->streams, ir_stream);
615
616end:
617 return status;
618}
619
620static inline
621bt_self_component_status handle_discarded_events_msg(
622 struct fs_sink_comp *fs_sink, const bt_message *msg)
623{
624 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
625 const bt_stream *ir_stream =
626 bt_message_discarded_events_borrow_stream_const(msg);
627 struct fs_sink_stream *stream;
628 const bt_clock_snapshot *cs = NULL;
629 bt_property_availability avail;
630 uint64_t count;
631
632 stream = borrow_stream(fs_sink, ir_stream);
633 if (!stream) {
634 status = BT_SELF_COMPONENT_STATUS_ERROR;
635 goto end;
636 }
637
638 if (fs_sink->ignore_discarded_events) {
639 BT_LOGI("Ignoring discarded events message: "
640 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
641 "trace-name=\"%s\", path=\"%s/%s\"",
642 bt_stream_get_id(ir_stream),
643 bt_stream_get_name(ir_stream),
644 bt_trace_get_name(
645 bt_stream_borrow_trace_const(ir_stream)),
646 stream->trace->path->str, stream->file_name->str);
647 goto end;
648 }
649
650 if (stream->discarded_events_state.in_range) {
651 BT_LOGE("Unsupported contiguous discarded events message: "
652 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
653 "trace-name=\"%s\", path=\"%s/%s\"",
654 bt_stream_get_id(ir_stream),
655 bt_stream_get_name(ir_stream),
656 bt_trace_get_name(
657 bt_stream_borrow_trace_const(ir_stream)),
658 stream->trace->path->str, stream->file_name->str);
659 status = BT_SELF_COMPONENT_STATUS_ERROR;
660 goto end;
661 }
662
ffb5c13c
PP
663 if (stream->packet_state.is_open &&
664 stream->sc->discarded_events_has_ts) {
665 BT_LOGE("Unsupported discarded events message with "
666 "default clock snapshots occuring within a packet: "
15fe47e0
PP
667 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
668 "trace-name=\"%s\", path=\"%s/%s\"",
669 bt_stream_get_id(ir_stream),
670 bt_stream_get_name(ir_stream),
671 bt_trace_get_name(
672 bt_stream_borrow_trace_const(ir_stream)),
673 stream->trace->path->str, stream->file_name->str);
674 status = BT_SELF_COMPONENT_STATUS_ERROR;
675 goto end;
676 }
677
ffb5c13c
PP
678 if (stream->sc->discarded_events_has_ts) {
679 stream->discarded_events_state.in_range = true;
15fe47e0 680
15fe47e0
PP
681 /*
682 * The clock snapshot values will be validated when
683 * handling the next "packet beginning" message.
684 */
9b24b6aa 685 cs = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
0cbc2c33 686 msg);
15fe47e0
PP
687 BT_ASSERT(cs);
688 stream->discarded_events_state.beginning_cs =
689 bt_clock_snapshot_get_value(cs);
9b24b6aa 690 cs = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(
0cbc2c33 691 msg);
15fe47e0 692 BT_ASSERT(cs);
ffb5c13c 693 stream->discarded_events_state.end_cs = bt_clock_snapshot_get_value(cs);
15fe47e0
PP
694 }
695
696 avail = bt_message_discarded_events_get_count(msg, &count);
697 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
698 count = 1;
699 }
700
701 stream->packet_state.discarded_events_counter += count;
702
703end:
704 return status;
705}
706
707static inline
708bt_self_component_status handle_discarded_packets_msg(
709 struct fs_sink_comp *fs_sink, const bt_message *msg)
710{
711 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
712 const bt_stream *ir_stream =
713 bt_message_discarded_packets_borrow_stream_const(msg);
714 struct fs_sink_stream *stream;
715 const bt_clock_snapshot *cs = NULL;
716 bt_property_availability avail;
717 uint64_t count;
718
719 stream = borrow_stream(fs_sink, ir_stream);
720 if (!stream) {
721 status = BT_SELF_COMPONENT_STATUS_ERROR;
722 goto end;
723 }
724
725 if (fs_sink->ignore_discarded_packets) {
726 BT_LOGI("Ignoring discarded packets message: "
727 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
728 "trace-name=\"%s\", path=\"%s/%s\"",
729 bt_stream_get_id(ir_stream),
730 bt_stream_get_name(ir_stream),
731 bt_trace_get_name(
732 bt_stream_borrow_trace_const(ir_stream)),
733 stream->trace->path->str, stream->file_name->str);
734 goto end;
735 }
736
737 if (stream->discarded_packets_state.in_range) {
738 BT_LOGE("Unsupported contiguous discarded packets message: "
739 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
740 "trace-name=\"%s\", path=\"%s/%s\"",
741 bt_stream_get_id(ir_stream),
742 bt_stream_get_name(ir_stream),
743 bt_trace_get_name(
744 bt_stream_borrow_trace_const(ir_stream)),
745 stream->trace->path->str, stream->file_name->str);
746 status = BT_SELF_COMPONENT_STATUS_ERROR;
747 goto end;
748 }
749
ffb5c13c 750 BT_ASSERT(!stream->packet_state.is_open);
15fe47e0 751
ffb5c13c
PP
752 if (stream->sc->discarded_packets_has_ts) {
753 stream->discarded_packets_state.in_range = true;
15fe47e0 754
15fe47e0
PP
755 /*
756 * The clock snapshot values will be validated when
757 * handling the next "packet beginning" message.
758 */
9b24b6aa 759 cs = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
0cbc2c33 760 msg);
15fe47e0
PP
761 BT_ASSERT(cs);
762 stream->discarded_packets_state.beginning_cs =
763 bt_clock_snapshot_get_value(cs);
9b24b6aa 764 cs = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(
0cbc2c33 765 msg);
15fe47e0
PP
766 BT_ASSERT(cs);
767 stream->discarded_packets_state.end_cs =
768 bt_clock_snapshot_get_value(cs);
15fe47e0
PP
769 }
770
771 avail = bt_message_discarded_packets_get_count(msg, &count);
772 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
773 count = 1;
774 }
775
776 stream->packet_state.seq_num += count;
777
778end:
779 return status;
780}
781
782static inline
783void put_messages(bt_message_array_const msgs, uint64_t count)
784{
785 uint64_t i;
786
787 for (i = 0; i < count; i++) {
788 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
789 }
790}
791
792BT_HIDDEN
793bt_self_component_status ctf_fs_sink_consume(bt_self_component_sink *self_comp)
794{
795 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
796 struct fs_sink_comp *fs_sink;
797 bt_message_iterator_status it_status;
798 uint64_t msg_count = 0;
799 bt_message_array_const msgs;
800
801 fs_sink = bt_self_component_get_data(
802 bt_self_component_sink_as_self_component(self_comp));
803 BT_ASSERT(fs_sink);
804 BT_ASSERT(fs_sink->upstream_iter);
805
806 /* Consume messages */
807 it_status = bt_self_component_port_input_message_iterator_next(
808 fs_sink->upstream_iter, &msgs, &msg_count);
809 if (it_status < 0) {
810 status = BT_SELF_COMPONENT_STATUS_ERROR;
811 goto end;
812 }
813
814 switch (it_status) {
815 case BT_MESSAGE_ITERATOR_STATUS_OK:
816 {
817 uint64_t i;
818
819 for (i = 0; i < msg_count; i++) {
820 const bt_message *msg = msgs[i];
821
822 BT_ASSERT(msg);
823
824 switch (bt_message_get_type(msg)) {
825 case BT_MESSAGE_TYPE_EVENT:
826 status = handle_event_msg(fs_sink, msg);
827 break;
828 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
829 status = handle_packet_beginning_msg(
830 fs_sink, msg);
831 break;
832 case BT_MESSAGE_TYPE_PACKET_END:
833 status = handle_packet_end_msg(
834 fs_sink, msg);
835 break;
836 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
837 /* Ignore */
838 BT_LOGD_STR("Ignoring message iterator inactivity message.");
839 break;
840 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
841 status = handle_stream_beginning_msg(
842 fs_sink, msg);
843 break;
844 case BT_MESSAGE_TYPE_STREAM_END:
845 status = handle_stream_end_msg(
846 fs_sink, msg);
847 break;
848 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
849 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
850 /* Not supported by CTF 1.8 */
851 BT_LOGD_STR("Ignoring stream activity message.");
852 break;
853 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
854 status = handle_discarded_events_msg(
855 fs_sink, msg);
856 break;
857 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
858 status = handle_discarded_packets_msg(
859 fs_sink, msg);
860 break;
861 default:
862 abort();
863 }
864
865 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
866
867 if (status != BT_SELF_COMPONENT_STATUS_OK) {
868 BT_LOGE("Failed to handle message: "
869 "generated CTF traces could be incomplete: "
870 "output-dir-path=\"%s\"",
871 fs_sink->output_dir_path->str);
872 goto error;
873 }
874 }
875
876 break;
877 }
878 case BT_MESSAGE_ITERATOR_STATUS_AGAIN:
879 status = BT_SELF_COMPONENT_STATUS_AGAIN;
880 break;
881 case BT_MESSAGE_ITERATOR_STATUS_END:
882 /* TODO: Finalize all traces (should already be done?) */
883 status = BT_SELF_COMPONENT_STATUS_END;
884 break;
885 case BT_MESSAGE_ITERATOR_STATUS_NOMEM:
886 status = BT_SELF_COMPONENT_STATUS_NOMEM;
887 break;
888 case BT_MESSAGE_ITERATOR_STATUS_ERROR:
889 status = BT_SELF_COMPONENT_STATUS_NOMEM;
890 break;
891 default:
892 break;
893 }
894
895 goto end;
896
897error:
898 BT_ASSERT(status != BT_SELF_COMPONENT_STATUS_OK);
899 put_messages(msgs, msg_count);
900
901end:
902 return status;
903}
904
905BT_HIDDEN
906bt_self_component_status ctf_fs_sink_graph_is_configured(
907 bt_self_component_sink *self_comp)
908{
909 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
910 struct fs_sink_comp *fs_sink = bt_self_component_get_data(
911 bt_self_component_sink_as_self_component(self_comp));
912
913 fs_sink->upstream_iter =
914 bt_self_component_port_input_message_iterator_create(
915 bt_self_component_sink_borrow_input_port_by_name(
916 self_comp, in_port_name));
917 if (!fs_sink->upstream_iter) {
918 status = BT_SELF_COMPONENT_STATUS_NOMEM;
919 goto end;
920 }
921
922end:
923 return status;
924}
925
926BT_HIDDEN
927void ctf_fs_sink_finalize(bt_self_component_sink *self_comp)
928{
929 struct fs_sink_comp *fs_sink = bt_self_component_get_data(
930 bt_self_component_sink_as_self_component(self_comp));
931
932 destroy_fs_sink_comp(fs_sink);
933}
This page took 0.060412 seconds and 4 git commands to generate.