Move to kernel style SPDX license identifiers
[babeltrace.git] / src / plugins / ctf / fs-sink / fs-sink.c
1 /*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
5 */
6
7 #define BT_COMP_LOG_SELF_COMP (fs_sink->self_comp)
8 #define BT_LOG_OUTPUT_LEVEL (fs_sink->log_level)
9 #define BT_LOG_TAG "PLUGIN/SINK.CTF.FS"
10 #include "logging/comp-logging.h"
11
12 #include <babeltrace2/babeltrace.h>
13 #include <stdio.h>
14 #include <stdbool.h>
15 #include <glib.h>
16 #include "common/assert.h"
17 #include "ctfser/ctfser.h"
18 #include "plugins/common/param-validation/param-validation.h"
19
20 #include "fs-sink.h"
21 #include "fs-sink-trace.h"
22 #include "fs-sink-stream.h"
23 #include "fs-sink-ctf-meta.h"
24 #include "translate-trace-ir-to-ctf-ir.h"
25 #include "translate-ctf-ir-to-tsdl.h"
26
/* Name given to the sink component's single input port. */
static const char * const in_port_name = "in";
29
30 static
31 bt_component_class_initialize_method_status ensure_output_dir_exists(
32 struct fs_sink_comp *fs_sink)
33 {
34 bt_component_class_initialize_method_status status =
35 BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
36 int ret;
37
38 ret = g_mkdir_with_parents(fs_sink->output_dir_path->str, 0755);
39 if (ret) {
40 BT_COMP_LOGE_ERRNO(
41 "Cannot create directories for output directory",
42 ": output-dir-path=\"%s\"",
43 fs_sink->output_dir_path->str);
44 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
45 goto end;
46 }
47
48 end:
49 return status;
50 }
51
52 static struct bt_param_validation_map_value_entry_descr fs_sink_params_descr[] = {
53 { "path", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY, { .type = BT_VALUE_TYPE_STRING } },
54 { "assume-single-trace", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { .type = BT_VALUE_TYPE_BOOL } },
55 { "ignore-discarded-events", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { .type = BT_VALUE_TYPE_BOOL } },
56 { "ignore-discarded-packets", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { .type = BT_VALUE_TYPE_BOOL } },
57 { "quiet", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { .type = BT_VALUE_TYPE_BOOL } },
58 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END
59 };
60
61 static
62 bt_component_class_initialize_method_status
63 configure_component(struct fs_sink_comp *fs_sink, const bt_value *params)
64 {
65 bt_component_class_initialize_method_status status;
66 const bt_value *value;
67 enum bt_param_validation_status validation_status;
68 gchar *validation_error;
69
70 validation_status = bt_param_validation_validate(params,
71 fs_sink_params_descr, &validation_error);
72 if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
73 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
74 goto end;
75 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
76 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
77 goto end;
78 }
79
80 value = bt_value_map_borrow_entry_value_const(params, "path");
81 g_string_assign(fs_sink->output_dir_path,
82 bt_value_string_get(value));
83
84 value = bt_value_map_borrow_entry_value_const(params,
85 "assume-single-trace");
86 if (value) {
87 fs_sink->assume_single_trace = (bool) bt_value_bool_get(value);
88 }
89
90 value = bt_value_map_borrow_entry_value_const(params,
91 "ignore-discarded-events");
92 if (value) {
93 fs_sink->ignore_discarded_events =
94 (bool) bt_value_bool_get(value);
95 }
96
97 value = bt_value_map_borrow_entry_value_const(params,
98 "ignore-discarded-packets");
99 if (value) {
100 fs_sink->ignore_discarded_packets =
101 (bool) bt_value_bool_get(value);
102 }
103
104 value = bt_value_map_borrow_entry_value_const(params,
105 "quiet");
106 if (value) {
107 fs_sink->quiet = (bool) bt_value_bool_get(value);
108 }
109
110 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
111
112 end:
113 g_free(validation_error);
114 return status;
115 }
116
117 static
118 void destroy_fs_sink_comp(struct fs_sink_comp *fs_sink)
119 {
120 if (!fs_sink) {
121 goto end;
122 }
123
124 if (fs_sink->output_dir_path) {
125 g_string_free(fs_sink->output_dir_path, TRUE);
126 fs_sink->output_dir_path = NULL;
127 }
128
129 if (fs_sink->traces) {
130 g_hash_table_destroy(fs_sink->traces);
131 fs_sink->traces = NULL;
132 }
133
134 BT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(
135 fs_sink->upstream_iter);
136 g_free(fs_sink);
137
138 end:
139 return;
140 }
141
142 BT_HIDDEN
143 bt_component_class_initialize_method_status ctf_fs_sink_init(
144 bt_self_component_sink *self_comp_sink,
145 bt_self_component_sink_configuration *config,
146 const bt_value *params,
147 void *init_method_data)
148 {
149 bt_component_class_initialize_method_status status =
150 BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
151 bt_self_component_add_port_status add_port_status;
152 struct fs_sink_comp *fs_sink = NULL;
153 bt_self_component *self_comp =
154 bt_self_component_sink_as_self_component(self_comp_sink);
155 bt_logging_level log_level = bt_component_get_logging_level(
156 bt_self_component_as_component(self_comp));
157
158 fs_sink = g_new0(struct fs_sink_comp, 1);
159 if (!fs_sink) {
160 BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR, log_level, self_comp,
161 "Failed to allocate one CTF FS sink structure.");
162 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
163 goto end;
164 }
165
166 fs_sink->log_level = log_level;
167 fs_sink->self_comp = self_comp;
168 fs_sink->output_dir_path = g_string_new(NULL);
169 status = configure_component(fs_sink, params);
170 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
171 /* configure_component() logs errors */
172 goto end;
173 }
174
175 if (fs_sink->assume_single_trace &&
176 g_file_test(fs_sink->output_dir_path->str,
177 G_FILE_TEST_EXISTS)) {
178 BT_COMP_LOGE("Single trace mode, but output path exists: "
179 "output-path=\"%s\"", fs_sink->output_dir_path->str);
180 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
181 goto end;
182 }
183
184 status = ensure_output_dir_exists(fs_sink);
185 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
186 /* ensure_output_dir_exists() logs errors */
187 goto end;
188 }
189
190 fs_sink->traces = g_hash_table_new_full(g_direct_hash, g_direct_equal,
191 NULL, (GDestroyNotify) fs_sink_trace_destroy);
192 if (!fs_sink->traces) {
193 BT_COMP_LOGE_STR("Failed to allocate one GHashTable.");
194 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
195 goto end;
196 }
197
198 add_port_status = bt_self_component_sink_add_input_port(
199 self_comp_sink, in_port_name, NULL, NULL);
200 switch (add_port_status) {
201 case BT_SELF_COMPONENT_ADD_PORT_STATUS_ERROR:
202 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
203 goto end;
204 case BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR:
205 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
206 goto end;
207 default:
208 break;
209 }
210
211 bt_self_component_set_data(self_comp, fs_sink);
212
213 end:
214 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
215 destroy_fs_sink_comp(fs_sink);
216 }
217
218 return status;
219 }
220
221 static inline
222 struct fs_sink_stream *borrow_stream(struct fs_sink_comp *fs_sink,
223 const bt_stream *ir_stream)
224 {
225 const bt_trace *ir_trace = bt_stream_borrow_trace_const(ir_stream);
226 struct fs_sink_trace *trace;
227 struct fs_sink_stream *stream = NULL;
228
229 trace = g_hash_table_lookup(fs_sink->traces, ir_trace);
230 if (G_UNLIKELY(!trace)) {
231 if (fs_sink->assume_single_trace &&
232 g_hash_table_size(fs_sink->traces) > 0) {
233 BT_COMP_LOGE("Single trace mode, but getting more than one trace: "
234 "stream-name=\"%s\"",
235 bt_stream_get_name(ir_stream));
236 goto end;
237 }
238
239 trace = fs_sink_trace_create(fs_sink, ir_trace);
240 if (!trace) {
241 goto end;
242 }
243 }
244
245 stream = g_hash_table_lookup(trace->streams, ir_stream);
246 if (G_UNLIKELY(!stream)) {
247 stream = fs_sink_stream_create(trace, ir_stream);
248 if (!stream) {
249 goto end;
250 }
251 }
252
253 end:
254 return stream;
255 }
256
257 static inline
258 bt_component_class_sink_consume_method_status handle_event_msg(
259 struct fs_sink_comp *fs_sink, const bt_message *msg)
260 {
261 int ret;
262 bt_component_class_sink_consume_method_status status =
263 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
264 const bt_event *ir_event = bt_message_event_borrow_event_const(msg);
265 const bt_stream *ir_stream = bt_event_borrow_stream_const(ir_event);
266 struct fs_sink_stream *stream;
267 struct fs_sink_ctf_event_class *ec = NULL;
268 const bt_clock_snapshot *cs = NULL;
269
270 stream = borrow_stream(fs_sink, ir_stream);
271 if (G_UNLIKELY(!stream)) {
272 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
273 goto end;
274 }
275
276 ret = try_translate_event_class_trace_ir_to_ctf_ir(fs_sink,
277 stream->sc, bt_event_borrow_class_const(ir_event), &ec);
278 if (ret) {
279 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
280 goto end;
281 }
282
283 BT_ASSERT_DBG(ec);
284
285 if (stream->sc->default_clock_class) {
286 cs = bt_message_event_borrow_default_clock_snapshot_const(
287 msg);
288 }
289
290 /*
291 * If this event's stream does not support packets, then we
292 * lazily create artificial packets.
293 *
294 * The size of an artificial packet is arbitrarily at least
295 * 4 MiB (it usually is greater because we close it when
296 * comes the time to write a new event and the packet's content
297 * size is >= 4 MiB), except the last one which can be smaller.
298 */
299 if (G_UNLIKELY(!stream->sc->has_packets)) {
300 if (stream->packet_state.is_open &&
301 bt_ctfser_get_offset_in_current_packet_bits(&stream->ctfser) / 8 >=
302 4 * 1024 * 1024) {
303 /*
304 * Stream's current packet is larger than 4 MiB:
305 * close it. A new packet will be opened just
306 * below.
307 */
308 ret = fs_sink_stream_close_packet(stream, NULL);
309 if (ret) {
310 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
311 goto end;
312 }
313 }
314
315 if (!stream->packet_state.is_open) {
316 /* Stream's packet is not currently opened: open it */
317 ret = fs_sink_stream_open_packet(stream, NULL, NULL);
318 if (ret) {
319 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
320 goto end;
321 }
322 }
323 }
324
325 BT_ASSERT_DBG(stream->packet_state.is_open);
326 ret = fs_sink_stream_write_event(stream, cs, ir_event, ec);
327 if (G_UNLIKELY(ret)) {
328 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
329 goto end;
330 }
331
332 end:
333 return status;
334 }
335
336 static inline
337 bt_component_class_sink_consume_method_status handle_packet_beginning_msg(
338 struct fs_sink_comp *fs_sink, const bt_message *msg)
339 {
340 int ret;
341 bt_component_class_sink_consume_method_status status =
342 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
343 const bt_packet *ir_packet =
344 bt_message_packet_beginning_borrow_packet_const(msg);
345 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
346 struct fs_sink_stream *stream;
347 const bt_clock_snapshot *cs = NULL;
348
349 stream = borrow_stream(fs_sink, ir_stream);
350 if (G_UNLIKELY(!stream)) {
351 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
352 goto end;
353 }
354
355 if (stream->sc->packets_have_ts_begin) {
356 cs = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
357 msg);
358 BT_ASSERT(cs);
359 }
360
361 /*
362 * If we previously received a discarded events message with
363 * a time range, make sure that its beginning time matches what's
364 * expected for CTF 1.8, that is:
365 *
366 * * Its beginning time is the previous packet's end
367 * time (or the current packet's beginning time if
368 * this is the first packet).
369 *
370 * We check this here instead of in handle_packet_end_msg()
371 * because we want to catch any incompatible message as early as
372 * possible to report the error.
373 *
374 * Validation of the discarded events message's end time is
375 * performed in handle_packet_end_msg().
376 */
377 if (stream->discarded_events_state.in_range) {
378 uint64_t expected_cs;
379
380 /*
381 * `stream->discarded_events_state.in_range` is only set
382 * when the stream class's discarded events have a time
383 * range.
384 *
385 * It is required that the packet beginning and end
386 * messages for this stream class have times when
387 * discarded events have a time range.
388 */
389 BT_ASSERT(stream->sc->discarded_events_has_ts);
390 BT_ASSERT(stream->sc->packets_have_ts_begin);
391 BT_ASSERT(stream->sc->packets_have_ts_end);
392
393 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
394 /* We're opening the first packet */
395 expected_cs = bt_clock_snapshot_get_value(cs);
396 } else {
397 expected_cs = stream->prev_packet_state.end_cs;
398 }
399
400 if (stream->discarded_events_state.beginning_cs !=
401 expected_cs) {
402 BT_COMP_LOGE("Incompatible discarded events message: "
403 "unexpected beginning time: "
404 "beginning-cs-val=%" PRIu64 ", "
405 "expected-beginning-cs-val=%" PRIu64 ", "
406 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
407 "trace-name=\"%s\", path=\"%s/%s\"",
408 stream->discarded_events_state.beginning_cs,
409 expected_cs,
410 bt_stream_get_id(ir_stream),
411 bt_stream_get_name(ir_stream),
412 bt_trace_get_name(
413 bt_stream_borrow_trace_const(ir_stream)),
414 stream->trace->path->str, stream->file_name->str);
415 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
416 goto end;
417 }
418 }
419
420 /*
421 * If we previously received a discarded packets message with a
422 * time range, make sure that its beginning and end times match
423 * what's expected for CTF 1.8, that is:
424 *
425 * * Its beginning time is the previous packet's end time.
426 *
427 * * Its end time is the current packet's beginning time.
428 */
429 if (stream->discarded_packets_state.in_range) {
430 uint64_t expected_end_cs;
431
432 /*
433 * `stream->discarded_packets_state.in_range` is only
434 * set when the stream class's discarded packets have a
435 * time range.
436 *
437 * It is required that the packet beginning and end
438 * messages for this stream class have times when
439 * discarded packets have a time range.
440 */
441 BT_ASSERT(stream->sc->discarded_packets_has_ts);
442 BT_ASSERT(stream->sc->packets_have_ts_begin);
443 BT_ASSERT(stream->sc->packets_have_ts_end);
444
445 /*
446 * It is not supported to have a discarded packets
447 * message _before_ the first packet: we cannot validate
448 * that its beginning time is compatible with CTF 1.8 in
449 * this case.
450 */
451 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
452 BT_COMP_LOGE("Incompatible discarded packets message "
453 "occurring before the stream's first packet: "
454 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
455 "trace-name=\"%s\", path=\"%s/%s\"",
456 bt_stream_get_id(ir_stream),
457 bt_stream_get_name(ir_stream),
458 bt_trace_get_name(
459 bt_stream_borrow_trace_const(ir_stream)),
460 stream->trace->path->str, stream->file_name->str);
461 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
462 goto end;
463 }
464
465 if (stream->discarded_packets_state.beginning_cs !=
466 stream->prev_packet_state.end_cs) {
467 BT_COMP_LOGE("Incompatible discarded packets message: "
468 "unexpected beginning time: "
469 "beginning-cs-val=%" PRIu64 ", "
470 "expected-beginning-cs-val=%" PRIu64 ", "
471 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
472 "trace-name=\"%s\", path=\"%s/%s\"",
473 stream->discarded_packets_state.beginning_cs,
474 stream->prev_packet_state.end_cs,
475 bt_stream_get_id(ir_stream),
476 bt_stream_get_name(ir_stream),
477 bt_trace_get_name(
478 bt_stream_borrow_trace_const(ir_stream)),
479 stream->trace->path->str, stream->file_name->str);
480 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
481 goto end;
482 }
483
484 expected_end_cs = bt_clock_snapshot_get_value(cs);
485
486 if (stream->discarded_packets_state.end_cs !=
487 expected_end_cs) {
488 BT_COMP_LOGE("Incompatible discarded packets message: "
489 "unexpected end time: "
490 "end-cs-val=%" PRIu64 ", "
491 "expected-end-cs-val=%" PRIu64 ", "
492 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
493 "trace-name=\"%s\", path=\"%s/%s\"",
494 stream->discarded_packets_state.end_cs,
495 expected_end_cs,
496 bt_stream_get_id(ir_stream),
497 bt_stream_get_name(ir_stream),
498 bt_trace_get_name(
499 bt_stream_borrow_trace_const(ir_stream)),
500 stream->trace->path->str, stream->file_name->str);
501 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
502 goto end;
503 }
504 }
505
506 /*
507 * We're not in a discarded packets time range anymore since we
508 * require that the discarded packets time ranges go from one
509 * packet's end time to the next packet's beginning time, and
510 * we're handling a packet beginning message here.
511 */
512 stream->discarded_packets_state.in_range = false;
513
514 ret = fs_sink_stream_open_packet(stream, cs, ir_packet);
515 if (ret) {
516 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
517 goto end;
518 }
519
520 end:
521 return status;
522 }
523
524 static inline
525 bt_component_class_sink_consume_method_status handle_packet_end_msg(
526 struct fs_sink_comp *fs_sink, const bt_message *msg)
527 {
528 int ret;
529 bt_component_class_sink_consume_method_status status =
530 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
531 const bt_packet *ir_packet =
532 bt_message_packet_end_borrow_packet_const(msg);
533 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
534 struct fs_sink_stream *stream;
535 const bt_clock_snapshot *cs = NULL;
536
537 stream = borrow_stream(fs_sink, ir_stream);
538 if (G_UNLIKELY(!stream)) {
539 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
540 goto end;
541 }
542
543 if (stream->sc->packets_have_ts_end) {
544 cs = bt_message_packet_end_borrow_default_clock_snapshot_const(
545 msg);
546 BT_ASSERT(cs);
547 }
548
549 /*
550 * If we previously received a discarded events message with
551 * a time range, make sure that its end time matches what's
552 * expected for CTF 1.8, that is:
553 *
554 * * Its end time is the current packet's end time.
555 *
556 * Validation of the discarded events message's beginning time
557 * is performed in handle_packet_beginning_msg().
558 */
559 if (stream->discarded_events_state.in_range) {
560 uint64_t expected_cs;
561
562 /*
563 * `stream->discarded_events_state.in_range` is only set
564 * when the stream class's discarded events have a time
565 * range.
566 *
567 * It is required that the packet beginning and end
568 * messages for this stream class have times when
569 * discarded events have a time range.
570 */
571 BT_ASSERT(stream->sc->discarded_events_has_ts);
572 BT_ASSERT(stream->sc->packets_have_ts_begin);
573 BT_ASSERT(stream->sc->packets_have_ts_end);
574
575 expected_cs = bt_clock_snapshot_get_value(cs);
576
577 if (stream->discarded_events_state.end_cs != expected_cs) {
578 BT_COMP_LOGE("Incompatible discarded events message: "
579 "unexpected end time: "
580 "end-cs-val=%" PRIu64 ", "
581 "expected-end-cs-val=%" PRIu64 ", "
582 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
583 "trace-name=\"%s\", path=\"%s/%s\"",
584 stream->discarded_events_state.end_cs,
585 expected_cs,
586 bt_stream_get_id(ir_stream),
587 bt_stream_get_name(ir_stream),
588 bt_trace_get_name(
589 bt_stream_borrow_trace_const(ir_stream)),
590 stream->trace->path->str, stream->file_name->str);
591 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
592 goto end;
593 }
594 }
595
596 ret = fs_sink_stream_close_packet(stream, cs);
597 if (ret) {
598 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
599 goto end;
600 }
601
602 /*
603 * We're not in a discarded events time range anymore since we
604 * require that the discarded events time ranges go from one
605 * packet's end time to the next packet's end time, and we're
606 * handling a packet end message here.
607 */
608 stream->discarded_events_state.in_range = false;
609
610 end:
611 return status;
612 }
613
614 static inline
615 bt_component_class_sink_consume_method_status handle_stream_beginning_msg(
616 struct fs_sink_comp *fs_sink, const bt_message *msg)
617 {
618 bt_component_class_sink_consume_method_status status =
619 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
620 const bt_stream *ir_stream =
621 bt_message_stream_beginning_borrow_stream_const(msg);
622 const bt_stream_class *ir_sc =
623 bt_stream_borrow_class_const(ir_stream);
624 struct fs_sink_stream *stream;
625 bool packets_have_beginning_end_cs =
626 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc) &&
627 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc);
628
629 /*
630 * Not supported: discarded events or discarded packets support
631 * without packets support. Packets are the way to know where
632 * discarded events/packets occurred in CTF 1.8.
633 */
634 if (!bt_stream_class_supports_packets(ir_sc)) {
635 BT_ASSERT(!bt_stream_class_supports_discarded_packets(ir_sc));
636
637 if (!fs_sink->ignore_discarded_events &&
638 bt_stream_class_supports_discarded_events(ir_sc)) {
639 BT_COMP_LOGE("Unsupported stream: "
640 "stream does not support packets, "
641 "but supports discarded events: "
642 "stream-addr=%p, "
643 "stream-id=%" PRIu64 ", "
644 "stream-name=\"%s\"",
645 ir_stream, bt_stream_get_id(ir_stream),
646 bt_stream_get_name(ir_stream));
647 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
648 goto end;
649 }
650 }
651
652 /*
653 * Not supported: discarded events with default clock snapshots,
654 * but packet beginning/end without default clock snapshot.
655 */
656 if (!fs_sink->ignore_discarded_events &&
657 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc) &&
658 !packets_have_beginning_end_cs) {
659 BT_COMP_LOGE("Unsupported stream: discarded events have "
660 "default clock snapshots, but packets have no "
661 "beginning and/or end default clock snapshots: "
662 "stream-addr=%p, "
663 "stream-id=%" PRIu64 ", "
664 "stream-name=\"%s\"",
665 ir_stream, bt_stream_get_id(ir_stream),
666 bt_stream_get_name(ir_stream));
667 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
668 goto end;
669 }
670
671 /*
672 * Not supported: discarded packets with default clock
673 * snapshots, but packet beginning/end without default clock
674 * snapshot.
675 */
676 if (!fs_sink->ignore_discarded_packets &&
677 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc) &&
678 !packets_have_beginning_end_cs) {
679 BT_COMP_LOGE("Unsupported stream: discarded packets have "
680 "default clock snapshots, but packets have no "
681 "beginning and/or end default clock snapshots: "
682 "stream-addr=%p, "
683 "stream-id=%" PRIu64 ", "
684 "stream-name=\"%s\"",
685 ir_stream, bt_stream_get_id(ir_stream),
686 bt_stream_get_name(ir_stream));
687 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
688 goto end;
689 }
690
691 stream = borrow_stream(fs_sink, ir_stream);
692 if (!stream) {
693 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
694 goto end;
695 }
696
697 BT_COMP_LOGI("Created new, empty stream file: "
698 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
699 "trace-name=\"%s\", path=\"%s/%s\"",
700 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
701 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
702 stream->trace->path->str, stream->file_name->str);
703
704 end:
705 return status;
706 }
707
708 static inline
709 bt_component_class_sink_consume_method_status handle_stream_end_msg(
710 struct fs_sink_comp *fs_sink, const bt_message *msg)
711 {
712 bt_component_class_sink_consume_method_status status =
713 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
714 const bt_stream *ir_stream =
715 bt_message_stream_end_borrow_stream_const(msg);
716 struct fs_sink_stream *stream;
717
718 stream = borrow_stream(fs_sink, ir_stream);
719 if (!stream) {
720 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
721 goto end;
722 }
723
724 if (G_UNLIKELY(!stream->sc->has_packets &&
725 stream->packet_state.is_open)) {
726 /* Close stream's current artificial packet */
727 int ret = fs_sink_stream_close_packet(stream, NULL);
728
729 if (ret) {
730 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
731 goto end;
732 }
733 }
734
735 BT_COMP_LOGI("Closing stream file: "
736 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
737 "trace-name=\"%s\", path=\"%s/%s\"",
738 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
739 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
740 stream->trace->path->str, stream->file_name->str);
741
742 /*
743 * This destroys the stream object and frees all its resources,
744 * closing the stream file.
745 */
746 g_hash_table_remove(stream->trace->streams, ir_stream);
747
748 end:
749 return status;
750 }
751
752 static inline
753 bt_component_class_sink_consume_method_status handle_discarded_events_msg(
754 struct fs_sink_comp *fs_sink, const bt_message *msg)
755 {
756 bt_component_class_sink_consume_method_status status =
757 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
758 const bt_stream *ir_stream =
759 bt_message_discarded_events_borrow_stream_const(msg);
760 struct fs_sink_stream *stream;
761 const bt_clock_snapshot *cs = NULL;
762 bt_property_availability avail;
763 uint64_t count;
764
765 stream = borrow_stream(fs_sink, ir_stream);
766 if (!stream) {
767 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
768 goto end;
769 }
770
771 if (fs_sink->ignore_discarded_events) {
772 BT_COMP_LOGI("Ignoring discarded events message: "
773 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
774 "trace-name=\"%s\", path=\"%s/%s\"",
775 bt_stream_get_id(ir_stream),
776 bt_stream_get_name(ir_stream),
777 bt_trace_get_name(
778 bt_stream_borrow_trace_const(ir_stream)),
779 stream->trace->path->str, stream->file_name->str);
780 goto end;
781 }
782
783 if (stream->discarded_events_state.in_range) {
784 BT_COMP_LOGE("Unsupported contiguous discarded events message: "
785 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
786 "trace-name=\"%s\", path=\"%s/%s\"",
787 bt_stream_get_id(ir_stream),
788 bt_stream_get_name(ir_stream),
789 bt_trace_get_name(
790 bt_stream_borrow_trace_const(ir_stream)),
791 stream->trace->path->str, stream->file_name->str);
792 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
793 goto end;
794 }
795
796 /*
797 * If we're currently in an opened packet (got a packet
798 * beginning message, but no packet end message yet), we do not
799 * support having a discarded events message with a time range
800 * because we require that the discarded events message's time
801 * range go from a packet's end time to the next packet's end
802 * time.
803 */
804 if (stream->packet_state.is_open &&
805 stream->sc->discarded_events_has_ts) {
806 BT_COMP_LOGE("Unsupported discarded events message with "
807 "default clock snapshots occurring within a packet: "
808 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
809 "trace-name=\"%s\", path=\"%s/%s\"",
810 bt_stream_get_id(ir_stream),
811 bt_stream_get_name(ir_stream),
812 bt_trace_get_name(
813 bt_stream_borrow_trace_const(ir_stream)),
814 stream->trace->path->str, stream->file_name->str);
815 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
816 goto end;
817 }
818
819 if (stream->sc->discarded_events_has_ts) {
820 /*
821 * Make the stream's state be in the time range of a
822 * discarded events message since we have the message's
823 * time range (`stream->sc->discarded_events_has_ts`).
824 */
825 stream->discarded_events_state.in_range = true;
826
827 /*
828 * The clock snapshot values will be validated when
829 * handling the next packet beginning and end messages
830 * (next calls to handle_packet_beginning_msg() and
831 * handle_packet_end_msg()).
832 */
833 cs = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
834 msg);
835 BT_ASSERT(cs);
836 stream->discarded_events_state.beginning_cs =
837 bt_clock_snapshot_get_value(cs);
838 cs = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(
839 msg);
840 BT_ASSERT(cs);
841 stream->discarded_events_state.end_cs = bt_clock_snapshot_get_value(cs);
842 }
843
844 avail = bt_message_discarded_events_get_count(msg, &count);
845 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
846 /*
847 * There's no specific count of discarded events: set it
848 * to 1 so that we know that we at least discarded
849 * something.
850 */
851 count = 1;
852 }
853
854 stream->packet_state.discarded_events_counter += count;
855
856 end:
857 return status;
858 }
859
860 static inline
861 bt_component_class_sink_consume_method_status handle_discarded_packets_msg(
862 struct fs_sink_comp *fs_sink, const bt_message *msg)
863 {
864 bt_component_class_sink_consume_method_status status =
865 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
866 const bt_stream *ir_stream =
867 bt_message_discarded_packets_borrow_stream_const(msg);
868 struct fs_sink_stream *stream;
869 const bt_clock_snapshot *cs = NULL;
870 bt_property_availability avail;
871 uint64_t count;
872
873 stream = borrow_stream(fs_sink, ir_stream);
874 if (!stream) {
875 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
876 goto end;
877 }
878
879 if (fs_sink->ignore_discarded_packets) {
880 BT_COMP_LOGI("Ignoring discarded packets message: "
881 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
882 "trace-name=\"%s\", path=\"%s/%s\"",
883 bt_stream_get_id(ir_stream),
884 bt_stream_get_name(ir_stream),
885 bt_trace_get_name(
886 bt_stream_borrow_trace_const(ir_stream)),
887 stream->trace->path->str, stream->file_name->str);
888 goto end;
889 }
890
891 if (stream->discarded_packets_state.in_range) {
892 BT_COMP_LOGE("Unsupported contiguous discarded packets message: "
893 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
894 "trace-name=\"%s\", path=\"%s/%s\"",
895 bt_stream_get_id(ir_stream),
896 bt_stream_get_name(ir_stream),
897 bt_trace_get_name(
898 bt_stream_borrow_trace_const(ir_stream)),
899 stream->trace->path->str, stream->file_name->str);
900 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
901 goto end;
902 }
903
904 /*
905 * Discarded packets messages are guaranteed to occur between
906 * packets.
907 */
908 BT_ASSERT(!stream->packet_state.is_open);
909
910 if (stream->sc->discarded_packets_has_ts) {
911 /*
912 * Make the stream's state be in the time range of a
913 * discarded packets message since we have the message's
914 * time range (`stream->sc->discarded_packets_has_ts`).
915 */
916 stream->discarded_packets_state.in_range = true;
917
918 /*
919 * The clock snapshot values will be validated when
920 * handling the next packet beginning message (next call
921 * to handle_packet_beginning_msg()).
922 */
923 cs = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
924 msg);
925 BT_ASSERT(cs);
926 stream->discarded_packets_state.beginning_cs =
927 bt_clock_snapshot_get_value(cs);
928 cs = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(
929 msg);
930 BT_ASSERT(cs);
931 stream->discarded_packets_state.end_cs =
932 bt_clock_snapshot_get_value(cs);
933 }
934
935 avail = bt_message_discarded_packets_get_count(msg, &count);
936 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
937 /*
938 * There's no specific count of discarded packets: set
939 * it to 1 so that we know that we at least discarded
940 * something.
941 */
942 count = 1;
943 }
944
945 stream->packet_state.seq_num += count;
946
947 end:
948 return status;
949 }
950
951 static inline
952 void put_messages(bt_message_array_const msgs, uint64_t count)
953 {
954 uint64_t i;
955
956 for (i = 0; i < count; i++) {
957 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
958 }
959 }
960
961 BT_HIDDEN
962 bt_component_class_sink_consume_method_status ctf_fs_sink_consume(
963 bt_self_component_sink *self_comp)
964 {
965 bt_component_class_sink_consume_method_status status =
966 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
967 struct fs_sink_comp *fs_sink;
968 bt_message_iterator_next_status next_status;
969 uint64_t msg_count = 0;
970 bt_message_array_const msgs;
971
972 fs_sink = bt_self_component_get_data(
973 bt_self_component_sink_as_self_component(self_comp));
974 BT_ASSERT_DBG(fs_sink);
975 BT_ASSERT_DBG(fs_sink->upstream_iter);
976
977 /* Consume messages */
978 next_status = bt_message_iterator_next(
979 fs_sink->upstream_iter, &msgs, &msg_count);
980 if (next_status < 0) {
981 status = (int) next_status;
982 goto end;
983 }
984
985 switch (next_status) {
986 case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK:
987 {
988 uint64_t i;
989
990 for (i = 0; i < msg_count; i++) {
991 const bt_message *msg = msgs[i];
992
993 BT_ASSERT_DBG(msg);
994
995 switch (bt_message_get_type(msg)) {
996 case BT_MESSAGE_TYPE_EVENT:
997 status = handle_event_msg(fs_sink, msg);
998 break;
999 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
1000 status = handle_packet_beginning_msg(
1001 fs_sink, msg);
1002 break;
1003 case BT_MESSAGE_TYPE_PACKET_END:
1004 status = handle_packet_end_msg(
1005 fs_sink, msg);
1006 break;
1007 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
1008 /* Ignore */
1009 BT_COMP_LOGD_STR("Ignoring message iterator inactivity message.");
1010 break;
1011 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
1012 status = handle_stream_beginning_msg(
1013 fs_sink, msg);
1014 break;
1015 case BT_MESSAGE_TYPE_STREAM_END:
1016 status = handle_stream_end_msg(
1017 fs_sink, msg);
1018 break;
1019 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1020 status = handle_discarded_events_msg(
1021 fs_sink, msg);
1022 break;
1023 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1024 status = handle_discarded_packets_msg(
1025 fs_sink, msg);
1026 break;
1027 default:
1028 bt_common_abort();
1029 }
1030
1031 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
1032
1033 if (status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK) {
1034 BT_COMP_LOGE("Failed to handle message: "
1035 "generated CTF traces could be incomplete: "
1036 "output-dir-path=\"%s\"",
1037 fs_sink->output_dir_path->str);
1038 goto error;
1039 }
1040 }
1041
1042 break;
1043 }
1044 case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN:
1045 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN;
1046 break;
1047 case BT_MESSAGE_ITERATOR_NEXT_STATUS_END:
1048 /* TODO: Finalize all traces (should already be done?) */
1049 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END;
1050 break;
1051 case BT_MESSAGE_ITERATOR_NEXT_STATUS_MEMORY_ERROR:
1052 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_MEMORY_ERROR;
1053 break;
1054 case BT_MESSAGE_ITERATOR_NEXT_STATUS_ERROR:
1055 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_MEMORY_ERROR;
1056 break;
1057 default:
1058 break;
1059 }
1060
1061 goto end;
1062
1063 error:
1064 BT_ASSERT(status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK);
1065 put_messages(msgs, msg_count);
1066
1067 end:
1068 return status;
1069 }
1070
1071 BT_HIDDEN
1072 bt_component_class_sink_graph_is_configured_method_status
1073 ctf_fs_sink_graph_is_configured(
1074 bt_self_component_sink *self_comp)
1075 {
1076 bt_component_class_sink_graph_is_configured_method_status status;
1077 bt_message_iterator_create_from_sink_component_status
1078 msg_iter_status;
1079 struct fs_sink_comp *fs_sink = bt_self_component_get_data(
1080 bt_self_component_sink_as_self_component(self_comp));
1081
1082 msg_iter_status =
1083 bt_message_iterator_create_from_sink_component(
1084 self_comp,
1085 bt_self_component_sink_borrow_input_port_by_name(
1086 self_comp, in_port_name), &fs_sink->upstream_iter);
1087 if (msg_iter_status != BT_MESSAGE_ITERATOR_CREATE_FROM_SINK_COMPONENT_STATUS_OK) {
1088 status = (int) msg_iter_status;
1089 goto end;
1090 }
1091
1092 status = BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK;
1093 end:
1094 return status;
1095 }
1096
1097 BT_HIDDEN
1098 void ctf_fs_sink_finalize(bt_self_component_sink *self_comp)
1099 {
1100 struct fs_sink_comp *fs_sink = bt_self_component_get_data(
1101 bt_self_component_sink_as_self_component(self_comp));
1102
1103 destroy_fs_sink_comp(fs_sink);
1104 }
This page took 0.088121 seconds and 4 git commands to generate.