lib: strictly type function return status enumerations
[babeltrace.git] / src / plugins / ctf / fs-sink / fs-sink.c
1 /*
2 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 * SOFTWARE.
21 */
22
23 #define BT_COMP_LOG_SELF_COMP (fs_sink->self_comp)
24 #define BT_LOG_OUTPUT_LEVEL (fs_sink->log_level)
25 #define BT_LOG_TAG "PLUGIN/SINK.CTF.FS"
26 #include "plugins/comp-logging.h"
27
28 #include <babeltrace2/babeltrace.h>
29 #include <stdio.h>
30 #include <stdbool.h>
31 #include <glib.h>
32 #include "common/assert.h"
33 #include "ctfser/ctfser.h"
34
35 #include "fs-sink.h"
36 #include "fs-sink-trace.h"
37 #include "fs-sink-stream.h"
38 #include "fs-sink-ctf-meta.h"
39 #include "translate-trace-ir-to-ctf-ir.h"
40 #include "translate-ctf-ir-to-tsdl.h"
41
/*
 * Name of the input port which ctf_fs_sink_init() adds and which
 * ctf_fs_sink_graph_is_configured() looks up.
 */
static const char * const in_port_name = "in";
44
45 static
46 bt_component_class_init_method_status ensure_output_dir_exists(
47 struct fs_sink_comp *fs_sink)
48 {
49 bt_component_class_init_method_status status =
50 BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK;
51 int ret;
52
53 ret = g_mkdir_with_parents(fs_sink->output_dir_path->str, 0755);
54 if (ret) {
55 BT_COMP_LOGE_ERRNO(
56 "Cannot create directories for output directory",
57 ": output-dir-path=\"%s\"",
58 fs_sink->output_dir_path->str);
59 status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR;
60 goto end;
61 }
62
63 end:
64 return status;
65 }
66
67 static
68 bt_component_class_init_method_status
69 configure_component(struct fs_sink_comp *fs_sink,
70 const bt_value *params)
71 {
72 bt_component_class_init_method_status status =
73 BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK;
74 const bt_value *value;
75
76 value = bt_value_map_borrow_entry_value_const(params, "path");
77 if (!value) {
78 BT_COMP_LOGE_STR("Missing mandatory `path` parameter.");
79 status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR;
80 goto end;
81 }
82
83 if (!bt_value_is_string(value)) {
84 BT_COMP_LOGE_STR("`path` parameter: expecting a string.");
85 status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR;
86 goto end;
87 }
88
89 g_string_assign(fs_sink->output_dir_path,
90 bt_value_string_get(value));
91 value = bt_value_map_borrow_entry_value_const(params,
92 "assume-single-trace");
93 if (value) {
94 if (!bt_value_is_bool(value)) {
95 BT_COMP_LOGE_STR("`assume-single-trace` parameter: expecting a boolean.");
96 status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR;
97 goto end;
98 }
99
100 fs_sink->assume_single_trace = (bool) bt_value_bool_get(value);
101 }
102
103 value = bt_value_map_borrow_entry_value_const(params,
104 "ignore-discarded-events");
105 if (value) {
106 if (!bt_value_is_bool(value)) {
107 BT_COMP_LOGE_STR("`ignore-discarded-events` parameter: expecting a boolean.");
108 status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR;
109 goto end;
110 }
111
112 fs_sink->ignore_discarded_events =
113 (bool) bt_value_bool_get(value);
114 }
115
116 value = bt_value_map_borrow_entry_value_const(params,
117 "ignore-discarded-packets");
118 if (value) {
119 if (!bt_value_is_bool(value)) {
120 BT_COMP_LOGE_STR("`ignore-discarded-packets` parameter: expecting a boolean.");
121 status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR;
122 goto end;
123 }
124
125 fs_sink->ignore_discarded_packets =
126 (bool) bt_value_bool_get(value);
127 }
128
129 value = bt_value_map_borrow_entry_value_const(params,
130 "quiet");
131 if (value) {
132 if (!bt_value_is_bool(value)) {
133 BT_COMP_LOGE_STR("`quiet` parameter: expecting a boolean.");
134 status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR;
135 goto end;
136 }
137
138 fs_sink->quiet = (bool) bt_value_bool_get(value);
139 }
140
141 end:
142 return status;
143 }
144
145 static
146 void destroy_fs_sink_comp(struct fs_sink_comp *fs_sink)
147 {
148 if (!fs_sink) {
149 goto end;
150 }
151
152 if (fs_sink->output_dir_path) {
153 g_string_free(fs_sink->output_dir_path, TRUE);
154 fs_sink->output_dir_path = NULL;
155 }
156
157 if (fs_sink->traces) {
158 g_hash_table_destroy(fs_sink->traces);
159 fs_sink->traces = NULL;
160 }
161
162 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(
163 fs_sink->upstream_iter);
164 g_free(fs_sink);
165
166 end:
167 return;
168 }
169
170 BT_HIDDEN
171 bt_component_class_init_method_status ctf_fs_sink_init(
172 bt_self_component_sink *self_comp_sink, const bt_value *params,
173 void *init_method_data)
174 {
175 bt_component_class_init_method_status status =
176 BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK;
177 bt_self_component_add_port_status add_port_status;
178 struct fs_sink_comp *fs_sink = NULL;
179 bt_self_component *self_comp =
180 bt_self_component_sink_as_self_component(self_comp_sink);
181 bt_logging_level log_level = bt_component_get_logging_level(
182 bt_self_component_as_component(self_comp));
183
184 fs_sink = g_new0(struct fs_sink_comp, 1);
185 if (!fs_sink) {
186 BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR, log_level, self_comp,
187 "Failed to allocate one CTF FS sink structure.");
188 status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_MEMORY_ERROR;
189 goto end;
190 }
191
192 fs_sink->log_level = log_level;
193 fs_sink->self_comp = self_comp;
194 fs_sink->output_dir_path = g_string_new(NULL);
195 status = configure_component(fs_sink, params);
196 if (status != BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK) {
197 /* configure_component() logs errors */
198 goto end;
199 }
200
201 if (fs_sink->assume_single_trace &&
202 g_file_test(fs_sink->output_dir_path->str,
203 G_FILE_TEST_EXISTS)) {
204 BT_COMP_LOGE("Single trace mode, but output path exists: "
205 "output-path=\"%s\"", fs_sink->output_dir_path->str);
206 status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR;
207 goto end;
208 }
209
210 status = ensure_output_dir_exists(fs_sink);
211 if (status != BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK) {
212 /* ensure_output_dir_exists() logs errors */
213 goto end;
214 }
215
216 fs_sink->traces = g_hash_table_new_full(g_direct_hash, g_direct_equal,
217 NULL, (GDestroyNotify) fs_sink_trace_destroy);
218 if (!fs_sink->traces) {
219 BT_COMP_LOGE_STR("Failed to allocate one GHashTable.");
220 status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_MEMORY_ERROR;
221 goto end;
222 }
223
224 add_port_status = bt_self_component_sink_add_input_port(
225 self_comp_sink, in_port_name, NULL, NULL);
226 switch (add_port_status) {
227 case BT_SELF_COMPONENT_ADD_PORT_STATUS_ERROR:
228 status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR;
229 goto end;
230 case BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR:
231 status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_MEMORY_ERROR;
232 goto end;
233 default:
234 break;
235 }
236
237 bt_self_component_set_data(self_comp, fs_sink);
238
239 end:
240 if (status != BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK) {
241 destroy_fs_sink_comp(fs_sink);
242 }
243
244 return status;
245 }
246
247 static inline
248 struct fs_sink_stream *borrow_stream(struct fs_sink_comp *fs_sink,
249 const bt_stream *ir_stream)
250 {
251 const bt_trace *ir_trace = bt_stream_borrow_trace_const(ir_stream);
252 struct fs_sink_trace *trace;
253 struct fs_sink_stream *stream = NULL;
254
255 trace = g_hash_table_lookup(fs_sink->traces, ir_trace);
256 if (G_UNLIKELY(!trace)) {
257 if (fs_sink->assume_single_trace &&
258 g_hash_table_size(fs_sink->traces) > 0) {
259 BT_COMP_LOGE("Single trace mode, but getting more than one trace: "
260 "stream-name=\"%s\"",
261 bt_stream_get_name(ir_stream));
262 goto end;
263 }
264
265 trace = fs_sink_trace_create(fs_sink, ir_trace);
266 if (!trace) {
267 goto end;
268 }
269 }
270
271 stream = g_hash_table_lookup(trace->streams, ir_stream);
272 if (G_UNLIKELY(!stream)) {
273 stream = fs_sink_stream_create(trace, ir_stream);
274 if (!stream) {
275 goto end;
276 }
277 }
278
279 end:
280 return stream;
281 }
282
283 static inline
284 bt_component_class_sink_consume_method_status handle_event_msg(
285 struct fs_sink_comp *fs_sink, const bt_message *msg)
286 {
287 int ret;
288 bt_component_class_sink_consume_method_status status =
289 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
290 const bt_event *ir_event = bt_message_event_borrow_event_const(msg);
291 const bt_stream *ir_stream = bt_event_borrow_stream_const(ir_event);
292 struct fs_sink_stream *stream;
293 struct fs_sink_ctf_event_class *ec = NULL;
294 const bt_clock_snapshot *cs = NULL;
295
296 stream = borrow_stream(fs_sink, ir_stream);
297 if (G_UNLIKELY(!stream)) {
298 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
299 goto end;
300 }
301
302 ret = try_translate_event_class_trace_ir_to_ctf_ir(fs_sink,
303 stream->sc, bt_event_borrow_class_const(ir_event), &ec);
304 if (ret) {
305 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
306 goto end;
307 }
308
309 BT_ASSERT(ec);
310
311 if (stream->sc->default_clock_class) {
312 cs = bt_message_event_borrow_default_clock_snapshot_const(
313 msg);
314 }
315
316 ret = fs_sink_stream_write_event(stream, cs, ir_event, ec);
317 if (G_UNLIKELY(ret)) {
318 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
319 goto end;
320 }
321
322 end:
323 return status;
324 }
325
326 static inline
327 bt_component_class_sink_consume_method_status handle_packet_beginning_msg(
328 struct fs_sink_comp *fs_sink, const bt_message *msg)
329 {
330 int ret;
331 bt_component_class_sink_consume_method_status status =
332 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
333 const bt_packet *ir_packet =
334 bt_message_packet_beginning_borrow_packet_const(msg);
335 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
336 struct fs_sink_stream *stream;
337 const bt_clock_snapshot *cs = NULL;
338
339 stream = borrow_stream(fs_sink, ir_stream);
340 if (G_UNLIKELY(!stream)) {
341 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
342 goto end;
343 }
344
345 if (stream->sc->packets_have_ts_begin) {
346 cs = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
347 msg);
348 BT_ASSERT(cs);
349 }
350
351 /*
352 * If we previously received a discarded events message with
353 * a time range, make sure that its beginning time matches what's
354 * expected for CTF 1.8, that is:
355 *
356 * * Its beginning time is the previous packet's end
357 * time (or the current packet's beginning time if
358 * this is the first packet).
359 *
360 * We check this here instead of in handle_packet_end_msg()
361 * because we want to catch any incompatible message as early as
362 * possible to report the error.
363 *
364 * Validation of the discarded events message's end time is
365 * performed in handle_packet_end_msg().
366 */
367 if (stream->discarded_events_state.in_range) {
368 uint64_t expected_cs;
369
370 /*
371 * `stream->discarded_events_state.in_range` is only set
372 * when the stream class's discarded events have a time
373 * range.
374 *
375 * It is required that the packet beginning and end
376 * messages for this stream class have times when
377 * discarded events have a time range.
378 */
379 BT_ASSERT(stream->sc->discarded_events_has_ts);
380 BT_ASSERT(stream->sc->packets_have_ts_begin);
381 BT_ASSERT(stream->sc->packets_have_ts_end);
382
383 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
384 /* We're opening the first packet */
385 expected_cs = bt_clock_snapshot_get_value(cs);
386 } else {
387 expected_cs = stream->prev_packet_state.end_cs;
388 }
389
390 if (stream->discarded_events_state.beginning_cs !=
391 expected_cs) {
392 BT_COMP_LOGE("Incompatible discarded events message: "
393 "unexpected beginning time: "
394 "beginning-cs-val=%" PRIu64 ", "
395 "expected-beginning-cs-val=%" PRIu64 ", "
396 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
397 "trace-name=\"%s\", path=\"%s/%s\"",
398 stream->discarded_events_state.beginning_cs,
399 expected_cs,
400 bt_stream_get_id(ir_stream),
401 bt_stream_get_name(ir_stream),
402 bt_trace_get_name(
403 bt_stream_borrow_trace_const(ir_stream)),
404 stream->trace->path->str, stream->file_name->str);
405 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
406 goto end;
407 }
408 }
409
410 /*
411 * If we previously received a discarded packets message with a
412 * time range, make sure that its beginning and end times match
413 * what's expected for CTF 1.8, that is:
414 *
415 * * Its beginning time is the previous packet's end time.
416 *
417 * * Its end time is the current packet's beginning time.
418 */
419 if (stream->discarded_packets_state.in_range) {
420 uint64_t expected_end_cs;
421
422 /*
423 * `stream->discarded_packets_state.in_range` is only
424 * set when the stream class's discarded packets have a
425 * time range.
426 *
427 * It is required that the packet beginning and end
428 * messages for this stream class have times when
429 * discarded packets have a time range.
430 */
431 BT_ASSERT(stream->sc->discarded_packets_has_ts);
432 BT_ASSERT(stream->sc->packets_have_ts_begin);
433 BT_ASSERT(stream->sc->packets_have_ts_end);
434
435 /*
436 * It is not supported to have a discarded packets
437 * message _before_ the first packet: we cannot validate
438 * that its beginning time is compatible with CTF 1.8 in
439 * this case.
440 */
441 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
442 BT_COMP_LOGE("Incompatible discarded packets message "
443 "occuring before the stream's first packet: "
444 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
445 "trace-name=\"%s\", path=\"%s/%s\"",
446 bt_stream_get_id(ir_stream),
447 bt_stream_get_name(ir_stream),
448 bt_trace_get_name(
449 bt_stream_borrow_trace_const(ir_stream)),
450 stream->trace->path->str, stream->file_name->str);
451 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
452 goto end;
453 }
454
455 if (stream->discarded_packets_state.beginning_cs !=
456 stream->prev_packet_state.end_cs) {
457 BT_COMP_LOGE("Incompatible discarded packets message: "
458 "unexpected beginning time: "
459 "beginning-cs-val=%" PRIu64 ", "
460 "expected-beginning-cs-val=%" PRIu64 ", "
461 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
462 "trace-name=\"%s\", path=\"%s/%s\"",
463 stream->discarded_packets_state.beginning_cs,
464 stream->prev_packet_state.end_cs,
465 bt_stream_get_id(ir_stream),
466 bt_stream_get_name(ir_stream),
467 bt_trace_get_name(
468 bt_stream_borrow_trace_const(ir_stream)),
469 stream->trace->path->str, stream->file_name->str);
470 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
471 goto end;
472 }
473
474 expected_end_cs = bt_clock_snapshot_get_value(cs);
475
476 if (stream->discarded_packets_state.end_cs !=
477 expected_end_cs) {
478 BT_COMP_LOGE("Incompatible discarded packets message: "
479 "unexpected end time: "
480 "end-cs-val=%" PRIu64 ", "
481 "expected-end-cs-val=%" PRIu64 ", "
482 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
483 "trace-name=\"%s\", path=\"%s/%s\"",
484 stream->discarded_packets_state.end_cs,
485 expected_end_cs,
486 bt_stream_get_id(ir_stream),
487 bt_stream_get_name(ir_stream),
488 bt_trace_get_name(
489 bt_stream_borrow_trace_const(ir_stream)),
490 stream->trace->path->str, stream->file_name->str);
491 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
492 goto end;
493 }
494 }
495
496 /*
497 * We're not in a discarded packets time range anymore since we
498 * require that the discarded packets time ranges go from one
499 * packet's end time to the next packet's beginning time, and
500 * we're handling a packet beginning message here.
501 */
502 stream->discarded_packets_state.in_range = false;
503
504 ret = fs_sink_stream_open_packet(stream, cs, ir_packet);
505 if (ret) {
506 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
507 goto end;
508 }
509
510 end:
511 return status;
512 }
513
514 static inline
515 bt_component_class_sink_consume_method_status handle_packet_end_msg(
516 struct fs_sink_comp *fs_sink, const bt_message *msg)
517 {
518 int ret;
519 bt_component_class_sink_consume_method_status status =
520 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
521 const bt_packet *ir_packet =
522 bt_message_packet_end_borrow_packet_const(msg);
523 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
524 struct fs_sink_stream *stream;
525 const bt_clock_snapshot *cs = NULL;
526
527 stream = borrow_stream(fs_sink, ir_stream);
528 if (G_UNLIKELY(!stream)) {
529 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
530 goto end;
531 }
532
533 if (stream->sc->packets_have_ts_end) {
534 cs = bt_message_packet_end_borrow_default_clock_snapshot_const(
535 msg);
536 BT_ASSERT(cs);
537 }
538
539 /*
540 * If we previously received a discarded events message with
541 * a time range, make sure that its end time matches what's
542 * expected for CTF 1.8, that is:
543 *
544 * * Its end time is the current packet's end time.
545 *
546 * Validation of the discarded events message's beginning time
547 * is performed in handle_packet_beginning_msg().
548 */
549 if (stream->discarded_events_state.in_range) {
550 uint64_t expected_cs;
551
552 /*
553 * `stream->discarded_events_state.in_range` is only set
554 * when the stream class's discarded events have a time
555 * range.
556 *
557 * It is required that the packet beginning and end
558 * messages for this stream class have times when
559 * discarded events have a time range.
560 */
561 BT_ASSERT(stream->sc->discarded_events_has_ts);
562 BT_ASSERT(stream->sc->packets_have_ts_begin);
563 BT_ASSERT(stream->sc->packets_have_ts_end);
564
565 expected_cs = bt_clock_snapshot_get_value(cs);
566
567 if (stream->discarded_events_state.end_cs != expected_cs) {
568 BT_COMP_LOGE("Incompatible discarded events message: "
569 "unexpected end time: "
570 "end-cs-val=%" PRIu64 ", "
571 "expected-end-cs-val=%" PRIu64 ", "
572 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
573 "trace-name=\"%s\", path=\"%s/%s\"",
574 stream->discarded_events_state.end_cs,
575 expected_cs,
576 bt_stream_get_id(ir_stream),
577 bt_stream_get_name(ir_stream),
578 bt_trace_get_name(
579 bt_stream_borrow_trace_const(ir_stream)),
580 stream->trace->path->str, stream->file_name->str);
581 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
582 goto end;
583 }
584 }
585
586 ret = fs_sink_stream_close_packet(stream, cs);
587 if (ret) {
588 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
589 goto end;
590 }
591
592 /*
593 * We're not in a discarded events time range anymore since we
594 * require that the discarded events time ranges go from one
595 * packet's end time to the next packet's end time, and we're
596 * handling a packet end message here.
597 */
598 stream->discarded_events_state.in_range = false;
599
600 end:
601 return status;
602 }
603
604 static inline
605 bt_component_class_sink_consume_method_status handle_stream_beginning_msg(
606 struct fs_sink_comp *fs_sink, const bt_message *msg)
607 {
608 bt_component_class_sink_consume_method_status status =
609 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
610 const bt_stream *ir_stream =
611 bt_message_stream_beginning_borrow_stream_const(msg);
612 const bt_stream_class *ir_sc =
613 bt_stream_borrow_class_const(ir_stream);
614 struct fs_sink_stream *stream;
615 bool packets_have_beginning_end_cs =
616 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc) &&
617 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc);
618
619 /*
620 * Not supported: discarded events with default clock snapshots,
621 * but packet beginning/end without default clock snapshot.
622 */
623 if (!fs_sink->ignore_discarded_events &&
624 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc) &&
625 !packets_have_beginning_end_cs) {
626 BT_COMP_LOGE("Unsupported stream: discarded events have "
627 "default clock snapshots, but packets have no "
628 "beginning and/or end default clock snapshots: "
629 "stream-addr=%p, "
630 "stream-id=%" PRIu64 ", "
631 "stream-name=\"%s\"",
632 ir_stream, bt_stream_get_id(ir_stream),
633 bt_stream_get_name(ir_stream));
634 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
635 goto end;
636 }
637
638 /*
639 * Not supported: discarded packets with default clock
640 * snapshots, but packet beginning/end without default clock
641 * snapshot.
642 */
643 if (!fs_sink->ignore_discarded_packets &&
644 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc) &&
645 !packets_have_beginning_end_cs) {
646 BT_COMP_LOGE("Unsupported stream: discarded packets have "
647 "default clock snapshots, but packets have no "
648 "beginning and/or end default clock snapshots: "
649 "stream-addr=%p, "
650 "stream-id=%" PRIu64 ", "
651 "stream-name=\"%s\"",
652 ir_stream, bt_stream_get_id(ir_stream),
653 bt_stream_get_name(ir_stream));
654 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
655 goto end;
656 }
657
658 stream = borrow_stream(fs_sink, ir_stream);
659 if (!stream) {
660 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
661 goto end;
662 }
663
664 BT_COMP_LOGI("Created new, empty stream file: "
665 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
666 "trace-name=\"%s\", path=\"%s/%s\"",
667 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
668 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
669 stream->trace->path->str, stream->file_name->str);
670
671 end:
672 return status;
673 }
674
675 static inline
676 bt_component_class_sink_consume_method_status handle_stream_end_msg(
677 struct fs_sink_comp *fs_sink, const bt_message *msg)
678 {
679 bt_component_class_sink_consume_method_status status =
680 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
681 const bt_stream *ir_stream =
682 bt_message_stream_end_borrow_stream_const(msg);
683 struct fs_sink_stream *stream;
684
685 stream = borrow_stream(fs_sink, ir_stream);
686 if (!stream) {
687 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
688 goto end;
689 }
690
691 BT_COMP_LOGI("Closing stream file: "
692 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
693 "trace-name=\"%s\", path=\"%s/%s\"",
694 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
695 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
696 stream->trace->path->str, stream->file_name->str);
697
698 /*
699 * This destroys the stream object and frees all its resources,
700 * closing the stream file.
701 */
702 g_hash_table_remove(stream->trace->streams, ir_stream);
703
704 end:
705 return status;
706 }
707
708 static inline
709 bt_component_class_sink_consume_method_status handle_discarded_events_msg(
710 struct fs_sink_comp *fs_sink, const bt_message *msg)
711 {
712 bt_component_class_sink_consume_method_status status =
713 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
714 const bt_stream *ir_stream =
715 bt_message_discarded_events_borrow_stream_const(msg);
716 struct fs_sink_stream *stream;
717 const bt_clock_snapshot *cs = NULL;
718 bt_property_availability avail;
719 uint64_t count;
720
721 stream = borrow_stream(fs_sink, ir_stream);
722 if (!stream) {
723 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
724 goto end;
725 }
726
727 if (fs_sink->ignore_discarded_events) {
728 BT_COMP_LOGI("Ignoring discarded events message: "
729 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
730 "trace-name=\"%s\", path=\"%s/%s\"",
731 bt_stream_get_id(ir_stream),
732 bt_stream_get_name(ir_stream),
733 bt_trace_get_name(
734 bt_stream_borrow_trace_const(ir_stream)),
735 stream->trace->path->str, stream->file_name->str);
736 goto end;
737 }
738
739 if (stream->discarded_events_state.in_range) {
740 BT_COMP_LOGE("Unsupported contiguous discarded events message: "
741 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
742 "trace-name=\"%s\", path=\"%s/%s\"",
743 bt_stream_get_id(ir_stream),
744 bt_stream_get_name(ir_stream),
745 bt_trace_get_name(
746 bt_stream_borrow_trace_const(ir_stream)),
747 stream->trace->path->str, stream->file_name->str);
748 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
749 goto end;
750 }
751
752 /*
753 * If we're currently in an opened packet (got a packet
754 * beginning message, but no packet end message yet), we do not
755 * support having a discarded events message with a time range
756 * because we require that the discarded events message's time
757 * range go from a packet's end time to the next packet's end
758 * time.
759 */
760 if (stream->packet_state.is_open &&
761 stream->sc->discarded_events_has_ts) {
762 BT_COMP_LOGE("Unsupported discarded events message with "
763 "default clock snapshots occuring within a packet: "
764 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
765 "trace-name=\"%s\", path=\"%s/%s\"",
766 bt_stream_get_id(ir_stream),
767 bt_stream_get_name(ir_stream),
768 bt_trace_get_name(
769 bt_stream_borrow_trace_const(ir_stream)),
770 stream->trace->path->str, stream->file_name->str);
771 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
772 goto end;
773 }
774
775 if (stream->sc->discarded_events_has_ts) {
776 /*
777 * Make the stream's state be in the time range of a
778 * discarded events message since we have the message's
779 * time range (`stream->sc->discarded_events_has_ts`).
780 */
781 stream->discarded_events_state.in_range = true;
782
783 /*
784 * The clock snapshot values will be validated when
785 * handling the next packet beginning and end messages
786 * (next calls to handle_packet_beginning_msg() and
787 * handle_packet_end_msg()).
788 */
789 cs = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
790 msg);
791 BT_ASSERT(cs);
792 stream->discarded_events_state.beginning_cs =
793 bt_clock_snapshot_get_value(cs);
794 cs = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(
795 msg);
796 BT_ASSERT(cs);
797 stream->discarded_events_state.end_cs = bt_clock_snapshot_get_value(cs);
798 }
799
800 avail = bt_message_discarded_events_get_count(msg, &count);
801 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
802 /*
803 * There's no specific count of discarded events: set it
804 * to 1 so that we know that we at least discarded
805 * something.
806 */
807 count = 1;
808 }
809
810 stream->packet_state.discarded_events_counter += count;
811
812 end:
813 return status;
814 }
815
816 static inline
817 bt_component_class_sink_consume_method_status handle_discarded_packets_msg(
818 struct fs_sink_comp *fs_sink, const bt_message *msg)
819 {
820 bt_component_class_sink_consume_method_status status =
821 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
822 const bt_stream *ir_stream =
823 bt_message_discarded_packets_borrow_stream_const(msg);
824 struct fs_sink_stream *stream;
825 const bt_clock_snapshot *cs = NULL;
826 bt_property_availability avail;
827 uint64_t count;
828
829 stream = borrow_stream(fs_sink, ir_stream);
830 if (!stream) {
831 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
832 goto end;
833 }
834
835 if (fs_sink->ignore_discarded_packets) {
836 BT_COMP_LOGI("Ignoring discarded packets message: "
837 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
838 "trace-name=\"%s\", path=\"%s/%s\"",
839 bt_stream_get_id(ir_stream),
840 bt_stream_get_name(ir_stream),
841 bt_trace_get_name(
842 bt_stream_borrow_trace_const(ir_stream)),
843 stream->trace->path->str, stream->file_name->str);
844 goto end;
845 }
846
847 if (stream->discarded_packets_state.in_range) {
848 BT_COMP_LOGE("Unsupported contiguous discarded packets message: "
849 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
850 "trace-name=\"%s\", path=\"%s/%s\"",
851 bt_stream_get_id(ir_stream),
852 bt_stream_get_name(ir_stream),
853 bt_trace_get_name(
854 bt_stream_borrow_trace_const(ir_stream)),
855 stream->trace->path->str, stream->file_name->str);
856 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
857 goto end;
858 }
859
860 /*
861 * Discarded packets messages are guaranteed to occur between
862 * packets.
863 */
864 BT_ASSERT(!stream->packet_state.is_open);
865
866 if (stream->sc->discarded_packets_has_ts) {
867 /*
868 * Make the stream's state be in the time range of a
869 * discarded packets message since we have the message's
870 * time range (`stream->sc->discarded_packets_has_ts`).
871 */
872 stream->discarded_packets_state.in_range = true;
873
874 /*
875 * The clock snapshot values will be validated when
876 * handling the next packet beginning message (next call
877 * to handle_packet_beginning_msg()).
878 */
879 cs = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
880 msg);
881 BT_ASSERT(cs);
882 stream->discarded_packets_state.beginning_cs =
883 bt_clock_snapshot_get_value(cs);
884 cs = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(
885 msg);
886 BT_ASSERT(cs);
887 stream->discarded_packets_state.end_cs =
888 bt_clock_snapshot_get_value(cs);
889 }
890
891 avail = bt_message_discarded_packets_get_count(msg, &count);
892 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
893 /*
894 * There's no specific count of discarded packets: set
895 * it to 1 so that we know that we at least discarded
896 * something.
897 */
898 count = 1;
899 }
900
901 stream->packet_state.seq_num += count;
902
903 end:
904 return status;
905 }
906
907 static inline
908 void put_messages(bt_message_array_const msgs, uint64_t count)
909 {
910 uint64_t i;
911
912 for (i = 0; i < count; i++) {
913 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
914 }
915 }
916
917 BT_HIDDEN
918 bt_component_class_sink_consume_method_status ctf_fs_sink_consume(
919 bt_self_component_sink *self_comp)
920 {
921 bt_component_class_sink_consume_method_status status =
922 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
923 struct fs_sink_comp *fs_sink;
924 bt_message_iterator_next_status next_status;
925 uint64_t msg_count = 0;
926 bt_message_array_const msgs;
927
928 fs_sink = bt_self_component_get_data(
929 bt_self_component_sink_as_self_component(self_comp));
930 BT_ASSERT(fs_sink);
931 BT_ASSERT(fs_sink->upstream_iter);
932
933 /* Consume messages */
934 next_status = bt_self_component_port_input_message_iterator_next(
935 fs_sink->upstream_iter, &msgs, &msg_count);
936 if (next_status < 0) {
937 status = (int) next_status;
938 goto end;
939 }
940
941 switch (next_status) {
942 case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK:
943 {
944 uint64_t i;
945
946 for (i = 0; i < msg_count; i++) {
947 const bt_message *msg = msgs[i];
948
949 BT_ASSERT(msg);
950
951 switch (bt_message_get_type(msg)) {
952 case BT_MESSAGE_TYPE_EVENT:
953 status = handle_event_msg(fs_sink, msg);
954 break;
955 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
956 status = handle_packet_beginning_msg(
957 fs_sink, msg);
958 break;
959 case BT_MESSAGE_TYPE_PACKET_END:
960 status = handle_packet_end_msg(
961 fs_sink, msg);
962 break;
963 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
964 /* Ignore */
965 BT_COMP_LOGD_STR("Ignoring message iterator inactivity message.");
966 break;
967 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
968 status = handle_stream_beginning_msg(
969 fs_sink, msg);
970 break;
971 case BT_MESSAGE_TYPE_STREAM_END:
972 status = handle_stream_end_msg(
973 fs_sink, msg);
974 break;
975 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
976 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
977 /* Not supported by CTF 1.8 */
978 BT_COMP_LOGD_STR("Ignoring stream activity message.");
979 break;
980 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
981 status = handle_discarded_events_msg(
982 fs_sink, msg);
983 break;
984 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
985 status = handle_discarded_packets_msg(
986 fs_sink, msg);
987 break;
988 default:
989 abort();
990 }
991
992 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
993
994 if (status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK) {
995 BT_COMP_LOGE("Failed to handle message: "
996 "generated CTF traces could be incomplete: "
997 "output-dir-path=\"%s\"",
998 fs_sink->output_dir_path->str);
999 goto error;
1000 }
1001 }
1002
1003 break;
1004 }
1005 case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN:
1006 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN;
1007 break;
1008 case BT_MESSAGE_ITERATOR_NEXT_STATUS_END:
1009 /* TODO: Finalize all traces (should already be done?) */
1010 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END;
1011 break;
1012 case BT_MESSAGE_ITERATOR_NEXT_STATUS_MEMORY_ERROR:
1013 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_MEMORY_ERROR;
1014 break;
1015 case BT_MESSAGE_ITERATOR_NEXT_STATUS_ERROR:
1016 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_MEMORY_ERROR;
1017 break;
1018 default:
1019 break;
1020 }
1021
1022 goto end;
1023
1024 error:
1025 BT_ASSERT(status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK);
1026 put_messages(msgs, msg_count);
1027
1028 end:
1029 return status;
1030 }
1031
1032 BT_HIDDEN
1033 bt_component_class_sink_graph_is_configured_method_status ctf_fs_sink_graph_is_configured(
1034 bt_self_component_sink *self_comp)
1035 {
1036 bt_component_class_sink_graph_is_configured_method_status status =
1037 BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK;
1038 struct fs_sink_comp *fs_sink = bt_self_component_get_data(
1039 bt_self_component_sink_as_self_component(self_comp));
1040
1041 fs_sink->upstream_iter =
1042 bt_self_component_port_input_message_iterator_create(
1043 bt_self_component_sink_borrow_input_port_by_name(
1044 self_comp, in_port_name));
1045 if (!fs_sink->upstream_iter) {
1046 status = BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_ERROR;
1047 goto end;
1048 }
1049
1050 end:
1051 return status;
1052 }
1053
1054 BT_HIDDEN
1055 void ctf_fs_sink_finalize(bt_self_component_sink *self_comp)
1056 {
1057 struct fs_sink_comp *fs_sink = bt_self_component_get_data(
1058 bt_self_component_sink_as_self_component(self_comp));
1059
1060 destroy_fs_sink_comp(fs_sink);
1061 }
This page took 0.050353 seconds and 4 git commands to generate.