42a616c9b3f4a3f8bb55b0a92b5b671069557108
[babeltrace.git] / src / plugins / ctf / fs-sink / fs-sink.cpp
1 /*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
5 */
6
7 #define BT_COMP_LOG_SELF_COMP (fs_sink->self_comp)
8 #define BT_LOG_OUTPUT_LEVEL (fs_sink->log_level)
9 #define BT_LOG_TAG "PLUGIN/SINK.CTF.FS"
10 #include "logging/comp-logging.h"
11
12 #include <babeltrace2/babeltrace.h>
13 #include <stdio.h>
14 #include <stdbool.h>
15 #include <glib.h>
16 #include "common/assert.h"
17 #include "ctfser/ctfser.h"
18 #include "plugins/common/param-validation/param-validation.h"
19
20 #include "fs-sink.hpp"
21 #include "fs-sink-trace.hpp"
22 #include "fs-sink-stream.hpp"
23 #include "fs-sink-ctf-meta.hpp"
24 #include "translate-trace-ir-to-ctf-ir.hpp"
25 #include "translate-ctf-ir-to-tsdl.hpp"
26
/* Name given to the input port which ctf_fs_sink_init() adds. */
static const char * const in_port_name = "in";
29
30 static
31 bt_component_class_initialize_method_status ensure_output_dir_exists(
32 struct fs_sink_comp *fs_sink)
33 {
34 bt_component_class_initialize_method_status status =
35 BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
36 int ret;
37
38 ret = g_mkdir_with_parents(fs_sink->output_dir_path->str, 0755);
39 if (ret) {
40 BT_COMP_LOGE_APPEND_CAUSE_ERRNO(fs_sink->self_comp,
41 "Cannot create directories for output directory",
42 ": output-dir-path=\"%s\"",
43 fs_sink->output_dir_path->str);
44 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
45 goto end;
46 }
47
48 end:
49 return status;
50 }
51
52 static bt_param_validation_map_value_entry_descr fs_sink_params_descr[] = {
53 { "path", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
54 { bt_param_validation_value_descr::string_t } },
55 { "assume-single-trace", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
56 { bt_param_validation_value_descr::bool_t } },
57 { "ignore-discarded-events", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
58 { bt_param_validation_value_descr::bool_t } },
59 { "ignore-discarded-packets", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
60 { bt_param_validation_value_descr::bool_t } },
61 { "quiet", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
62 { bt_param_validation_value_descr::bool_t } },
63 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END
64 };
65
66 static
67 bt_component_class_initialize_method_status
68 configure_component(struct fs_sink_comp *fs_sink, const bt_value *params)
69 {
70 bt_component_class_initialize_method_status status;
71 const bt_value *value;
72 enum bt_param_validation_status validation_status;
73 gchar *validation_error;
74
75 validation_status = bt_param_validation_validate(params,
76 fs_sink_params_descr, &validation_error);
77 if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
78 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
79 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
80 "%s", validation_error);
81 goto end;
82 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
83 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
84 goto end;
85 }
86
87 value = bt_value_map_borrow_entry_value_const(params, "path");
88 g_string_assign(fs_sink->output_dir_path,
89 bt_value_string_get(value));
90
91 value = bt_value_map_borrow_entry_value_const(params,
92 "assume-single-trace");
93 if (value) {
94 fs_sink->assume_single_trace = (bool) bt_value_bool_get(value);
95 }
96
97 value = bt_value_map_borrow_entry_value_const(params,
98 "ignore-discarded-events");
99 if (value) {
100 fs_sink->ignore_discarded_events =
101 (bool) bt_value_bool_get(value);
102 }
103
104 value = bt_value_map_borrow_entry_value_const(params,
105 "ignore-discarded-packets");
106 if (value) {
107 fs_sink->ignore_discarded_packets =
108 (bool) bt_value_bool_get(value);
109 }
110
111 value = bt_value_map_borrow_entry_value_const(params,
112 "quiet");
113 if (value) {
114 fs_sink->quiet = (bool) bt_value_bool_get(value);
115 }
116
117 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
118
119 end:
120 g_free(validation_error);
121 return status;
122 }
123
124 static
125 void destroy_fs_sink_comp(struct fs_sink_comp *fs_sink)
126 {
127 if (!fs_sink) {
128 goto end;
129 }
130
131 if (fs_sink->output_dir_path) {
132 g_string_free(fs_sink->output_dir_path, TRUE);
133 fs_sink->output_dir_path = NULL;
134 }
135
136 if (fs_sink->traces) {
137 g_hash_table_destroy(fs_sink->traces);
138 fs_sink->traces = NULL;
139 }
140
141 BT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(
142 fs_sink->upstream_iter);
143 g_free(fs_sink);
144
145 end:
146 return;
147 }
148
149 BT_HIDDEN
150 bt_component_class_initialize_method_status ctf_fs_sink_init(
151 bt_self_component_sink *self_comp_sink,
152 bt_self_component_sink_configuration *config,
153 const bt_value *params,
154 void *init_method_data)
155 {
156 bt_component_class_initialize_method_status status;
157 bt_self_component_add_port_status add_port_status;
158 struct fs_sink_comp *fs_sink = NULL;
159 bt_self_component *self_comp =
160 bt_self_component_sink_as_self_component(self_comp_sink);
161 bt_logging_level log_level = bt_component_get_logging_level(
162 bt_self_component_as_component(self_comp));
163
164 fs_sink = g_new0(struct fs_sink_comp, 1);
165 if (!fs_sink) {
166 BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR, log_level, self_comp,
167 "Failed to allocate one CTF FS sink structure.");
168 BT_CURRENT_THREAD_ERROR_APPEND_CAUSE_FROM_COMPONENT(
169 self_comp, "Failed to allocate one CTF FS sink structure.");
170 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
171 goto end;
172 }
173
174 fs_sink->log_level = log_level;
175 fs_sink->self_comp = self_comp;
176 fs_sink->output_dir_path = g_string_new(NULL);
177 status = configure_component(fs_sink, params);
178 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
179 /* configure_component() logs errors */
180 goto end;
181 }
182
183 if (fs_sink->assume_single_trace &&
184 g_file_test(fs_sink->output_dir_path->str,
185 G_FILE_TEST_EXISTS)) {
186 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
187 "Single trace mode, but output path exists: output-path=\"%s\"",
188 fs_sink->output_dir_path->str);
189 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
190 goto end;
191 }
192
193 status = ensure_output_dir_exists(fs_sink);
194 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
195 /* ensure_output_dir_exists() logs errors */
196 goto end;
197 }
198
199 fs_sink->traces = g_hash_table_new_full(g_direct_hash, g_direct_equal,
200 NULL, (GDestroyNotify) fs_sink_trace_destroy);
201 if (!fs_sink->traces) {
202 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate one GHashTable.");
203 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
204 goto end;
205 }
206
207 add_port_status = bt_self_component_sink_add_input_port(
208 self_comp_sink, in_port_name, NULL, NULL);
209 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
210 status = (bt_component_class_initialize_method_status) add_port_status;
211 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to add input port.");
212 goto end;
213 }
214
215 bt_self_component_set_data(self_comp, fs_sink);
216
217 end:
218 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
219 destroy_fs_sink_comp(fs_sink);
220 }
221
222 return status;
223 }
224
225 static inline
226 struct fs_sink_stream *borrow_stream(struct fs_sink_comp *fs_sink,
227 const bt_stream *ir_stream)
228 {
229 const bt_trace *ir_trace = bt_stream_borrow_trace_const(ir_stream);
230 struct fs_sink_trace *trace;
231 struct fs_sink_stream *stream = NULL;
232
233 trace = (fs_sink_trace *) g_hash_table_lookup(fs_sink->traces, ir_trace);
234 if (G_UNLIKELY(!trace)) {
235 if (fs_sink->assume_single_trace &&
236 g_hash_table_size(fs_sink->traces) > 0) {
237 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
238 "Single trace mode, but getting more than one trace: "
239 "stream-name=\"%s\"",
240 bt_stream_get_name(ir_stream));
241 goto end;
242 }
243
244 trace = fs_sink_trace_create(fs_sink, ir_trace);
245 if (!trace) {
246 goto end;
247 }
248 }
249
250 stream = (fs_sink_stream *) g_hash_table_lookup(trace->streams, ir_stream);
251 if (G_UNLIKELY(!stream)) {
252 stream = fs_sink_stream_create(trace, ir_stream);
253 if (!stream) {
254 goto end;
255 }
256 }
257
258 end:
259 return stream;
260 }
261
262 static inline
263 bt_component_class_sink_consume_method_status handle_event_msg(
264 struct fs_sink_comp *fs_sink, const bt_message *msg)
265 {
266 int ret;
267 bt_component_class_sink_consume_method_status status =
268 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
269 const bt_event *ir_event = bt_message_event_borrow_event_const(msg);
270 const bt_stream *ir_stream = bt_event_borrow_stream_const(ir_event);
271 struct fs_sink_stream *stream;
272 struct fs_sink_ctf_event_class *ec = NULL;
273 const bt_clock_snapshot *cs = NULL;
274
275 stream = borrow_stream(fs_sink, ir_stream);
276 if (G_UNLIKELY(!stream)) {
277 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
278 "Failed to borrow stream.");
279 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
280 goto end;
281 }
282
283 ret = try_translate_event_class_trace_ir_to_ctf_ir(fs_sink,
284 stream->sc, bt_event_borrow_class_const(ir_event), &ec);
285 if (ret) {
286 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
287 "Failed to translate event class to CTF IR.");
288 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
289 goto end;
290 }
291
292 BT_ASSERT_DBG(ec);
293
294 if (stream->sc->default_clock_class) {
295 cs = bt_message_event_borrow_default_clock_snapshot_const(
296 msg);
297 }
298
299 /*
300 * If this event's stream does not support packets, then we
301 * lazily create artificial packets.
302 *
303 * The size of an artificial packet is arbitrarily at least
304 * 4 MiB (it usually is greater because we close it when
305 * comes the time to write a new event and the packet's content
306 * size is >= 4 MiB), except the last one which can be smaller.
307 */
308 if (G_UNLIKELY(!stream->sc->has_packets)) {
309 if (stream->packet_state.is_open &&
310 bt_ctfser_get_offset_in_current_packet_bits(&stream->ctfser) / 8 >=
311 4 * 1024 * 1024) {
312 /*
313 * Stream's current packet is larger than 4 MiB:
314 * close it. A new packet will be opened just
315 * below.
316 */
317 ret = fs_sink_stream_close_packet(stream, NULL);
318 if (ret) {
319 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
320 "Failed to close packet.");
321 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
322 goto end;
323 }
324 }
325
326 if (!stream->packet_state.is_open) {
327 /* Stream's packet is not currently opened: open it */
328 ret = fs_sink_stream_open_packet(stream, NULL, NULL);
329 if (ret) {
330 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
331 "Failed to open packet.");
332 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
333 goto end;
334 }
335 }
336 }
337
338 BT_ASSERT_DBG(stream->packet_state.is_open);
339 ret = fs_sink_stream_write_event(stream, cs, ir_event, ec);
340 if (G_UNLIKELY(ret)) {
341 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
342 "Failed to write event.");
343 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
344 goto end;
345 }
346
347 end:
348 return status;
349 }
350
351 static inline
352 bt_component_class_sink_consume_method_status handle_packet_beginning_msg(
353 struct fs_sink_comp *fs_sink, const bt_message *msg)
354 {
355 int ret;
356 bt_component_class_sink_consume_method_status status =
357 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
358 const bt_packet *ir_packet =
359 bt_message_packet_beginning_borrow_packet_const(msg);
360 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
361 struct fs_sink_stream *stream;
362 const bt_clock_snapshot *cs = NULL;
363
364 stream = borrow_stream(fs_sink, ir_stream);
365 if (G_UNLIKELY(!stream)) {
366 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
367 "Failed to borrow stream.");
368 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
369 goto end;
370 }
371
372 if (stream->sc->packets_have_ts_begin) {
373 cs = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
374 msg);
375 BT_ASSERT(cs);
376 }
377
378 /*
379 * If we previously received a discarded events message with
380 * a time range, make sure that its beginning time matches what's
381 * expected for CTF 1.8, that is:
382 *
383 * * Its beginning time is the previous packet's end
384 * time (or the current packet's beginning time if
385 * this is the first packet).
386 *
387 * We check this here instead of in handle_packet_end_msg()
388 * because we want to catch any incompatible message as early as
389 * possible to report the error.
390 *
391 * Validation of the discarded events message's end time is
392 * performed in handle_packet_end_msg().
393 */
394 if (stream->discarded_events_state.in_range) {
395 uint64_t expected_cs;
396
397 /*
398 * `stream->discarded_events_state.in_range` is only set
399 * when the stream class's discarded events have a time
400 * range.
401 *
402 * It is required that the packet beginning and end
403 * messages for this stream class have times when
404 * discarded events have a time range.
405 */
406 BT_ASSERT(stream->sc->discarded_events_has_ts);
407 BT_ASSERT(stream->sc->packets_have_ts_begin);
408 BT_ASSERT(stream->sc->packets_have_ts_end);
409
410 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
411 /* We're opening the first packet */
412 expected_cs = bt_clock_snapshot_get_value(cs);
413 } else {
414 expected_cs = stream->prev_packet_state.end_cs;
415 }
416
417 if (stream->discarded_events_state.beginning_cs !=
418 expected_cs) {
419 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
420 "Incompatible discarded events message: "
421 "unexpected beginning time: "
422 "beginning-cs-val=%" PRIu64 ", "
423 "expected-beginning-cs-val=%" PRIu64 ", "
424 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
425 "trace-name=\"%s\", path=\"%s/%s\"",
426 stream->discarded_events_state.beginning_cs,
427 expected_cs,
428 bt_stream_get_id(ir_stream),
429 bt_stream_get_name(ir_stream),
430 bt_trace_get_name(
431 bt_stream_borrow_trace_const(ir_stream)),
432 stream->trace->path->str, stream->file_name->str);
433 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
434 goto end;
435 }
436 }
437
438 /*
439 * If we previously received a discarded packets message with a
440 * time range, make sure that its beginning and end times match
441 * what's expected for CTF 1.8, that is:
442 *
443 * * Its beginning time is the previous packet's end time.
444 *
445 * * Its end time is the current packet's beginning time.
446 */
447 if (stream->discarded_packets_state.in_range) {
448 uint64_t expected_end_cs;
449
450 /*
451 * `stream->discarded_packets_state.in_range` is only
452 * set when the stream class's discarded packets have a
453 * time range.
454 *
455 * It is required that the packet beginning and end
456 * messages for this stream class have times when
457 * discarded packets have a time range.
458 */
459 BT_ASSERT(stream->sc->discarded_packets_has_ts);
460 BT_ASSERT(stream->sc->packets_have_ts_begin);
461 BT_ASSERT(stream->sc->packets_have_ts_end);
462
463 /*
464 * It is not supported to have a discarded packets
465 * message _before_ the first packet: we cannot validate
466 * that its beginning time is compatible with CTF 1.8 in
467 * this case.
468 */
469 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
470 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
471 "Incompatible discarded packets message "
472 "occurring before the stream's first packet: "
473 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
474 "trace-name=\"%s\", path=\"%s/%s\"",
475 bt_stream_get_id(ir_stream),
476 bt_stream_get_name(ir_stream),
477 bt_trace_get_name(
478 bt_stream_borrow_trace_const(ir_stream)),
479 stream->trace->path->str, stream->file_name->str);
480 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
481 goto end;
482 }
483
484 if (stream->discarded_packets_state.beginning_cs !=
485 stream->prev_packet_state.end_cs) {
486 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
487 "Incompatible discarded packets message: "
488 "unexpected beginning time: "
489 "beginning-cs-val=%" PRIu64 ", "
490 "expected-beginning-cs-val=%" PRIu64 ", "
491 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
492 "trace-name=\"%s\", path=\"%s/%s\"",
493 stream->discarded_packets_state.beginning_cs,
494 stream->prev_packet_state.end_cs,
495 bt_stream_get_id(ir_stream),
496 bt_stream_get_name(ir_stream),
497 bt_trace_get_name(
498 bt_stream_borrow_trace_const(ir_stream)),
499 stream->trace->path->str, stream->file_name->str);
500 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
501 goto end;
502 }
503
504 expected_end_cs = bt_clock_snapshot_get_value(cs);
505
506 if (stream->discarded_packets_state.end_cs !=
507 expected_end_cs) {
508 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
509 "Incompatible discarded packets message: "
510 "unexpected end time: "
511 "end-cs-val=%" PRIu64 ", "
512 "expected-end-cs-val=%" PRIu64 ", "
513 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
514 "trace-name=\"%s\", path=\"%s/%s\"",
515 stream->discarded_packets_state.end_cs,
516 expected_end_cs,
517 bt_stream_get_id(ir_stream),
518 bt_stream_get_name(ir_stream),
519 bt_trace_get_name(
520 bt_stream_borrow_trace_const(ir_stream)),
521 stream->trace->path->str, stream->file_name->str);
522 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
523 goto end;
524 }
525 }
526
527 /*
528 * We're not in a discarded packets time range anymore since we
529 * require that the discarded packets time ranges go from one
530 * packet's end time to the next packet's beginning time, and
531 * we're handling a packet beginning message here.
532 */
533 stream->discarded_packets_state.in_range = false;
534
535 ret = fs_sink_stream_open_packet(stream, cs, ir_packet);
536 if (ret) {
537 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
538 "Failed to open packet.");
539 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
540 goto end;
541 }
542
543 end:
544 return status;
545 }
546
547 static inline
548 bt_component_class_sink_consume_method_status handle_packet_end_msg(
549 struct fs_sink_comp *fs_sink, const bt_message *msg)
550 {
551 int ret;
552 bt_component_class_sink_consume_method_status status =
553 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
554 const bt_packet *ir_packet =
555 bt_message_packet_end_borrow_packet_const(msg);
556 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
557 struct fs_sink_stream *stream;
558 const bt_clock_snapshot *cs = NULL;
559
560 stream = borrow_stream(fs_sink, ir_stream);
561 if (G_UNLIKELY(!stream)) {
562 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
563 "Failed to borrow stream.");
564 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
565 goto end;
566 }
567
568 if (stream->sc->packets_have_ts_end) {
569 cs = bt_message_packet_end_borrow_default_clock_snapshot_const(
570 msg);
571 BT_ASSERT(cs);
572 }
573
574 /*
575 * If we previously received a discarded events message with
576 * a time range, make sure that its end time matches what's
577 * expected for CTF 1.8, that is:
578 *
579 * * Its end time is the current packet's end time.
580 *
581 * Validation of the discarded events message's beginning time
582 * is performed in handle_packet_beginning_msg().
583 */
584 if (stream->discarded_events_state.in_range) {
585 uint64_t expected_cs;
586
587 /*
588 * `stream->discarded_events_state.in_range` is only set
589 * when the stream class's discarded events have a time
590 * range.
591 *
592 * It is required that the packet beginning and end
593 * messages for this stream class have times when
594 * discarded events have a time range.
595 */
596 BT_ASSERT(stream->sc->discarded_events_has_ts);
597 BT_ASSERT(stream->sc->packets_have_ts_begin);
598 BT_ASSERT(stream->sc->packets_have_ts_end);
599
600 expected_cs = bt_clock_snapshot_get_value(cs);
601
602 if (stream->discarded_events_state.end_cs != expected_cs) {
603 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
604 "Incompatible discarded events message: "
605 "unexpected end time: "
606 "end-cs-val=%" PRIu64 ", "
607 "expected-end-cs-val=%" PRIu64 ", "
608 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
609 "trace-name=\"%s\", path=\"%s/%s\"",
610 stream->discarded_events_state.end_cs,
611 expected_cs,
612 bt_stream_get_id(ir_stream),
613 bt_stream_get_name(ir_stream),
614 bt_trace_get_name(
615 bt_stream_borrow_trace_const(ir_stream)),
616 stream->trace->path->str, stream->file_name->str);
617 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
618 goto end;
619 }
620 }
621
622 ret = fs_sink_stream_close_packet(stream, cs);
623 if (ret) {
624 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
625 "Failed to close packet.");
626 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
627 goto end;
628 }
629
630 /*
631 * We're not in a discarded events time range anymore since we
632 * require that the discarded events time ranges go from one
633 * packet's end time to the next packet's end time, and we're
634 * handling a packet end message here.
635 */
636 stream->discarded_events_state.in_range = false;
637
638 end:
639 return status;
640 }
641
642 static inline
643 bt_component_class_sink_consume_method_status handle_stream_beginning_msg(
644 struct fs_sink_comp *fs_sink, const bt_message *msg)
645 {
646 bt_component_class_sink_consume_method_status status =
647 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
648 const bt_stream *ir_stream =
649 bt_message_stream_beginning_borrow_stream_const(msg);
650 const bt_stream_class *ir_sc =
651 bt_stream_borrow_class_const(ir_stream);
652 struct fs_sink_stream *stream;
653 bool packets_have_beginning_end_cs =
654 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc) &&
655 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc);
656
657 /*
658 * Not supported: discarded events or discarded packets support
659 * without packets support. Packets are the way to know where
660 * discarded events/packets occurred in CTF 1.8.
661 */
662 if (!bt_stream_class_supports_packets(ir_sc)) {
663 BT_ASSERT(!bt_stream_class_supports_discarded_packets(ir_sc));
664
665 if (!fs_sink->ignore_discarded_events &&
666 bt_stream_class_supports_discarded_events(ir_sc)) {
667 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
668 "Unsupported stream: "
669 "stream does not support packets, "
670 "but supports discarded events: "
671 "stream-addr=%p, "
672 "stream-id=%" PRIu64 ", "
673 "stream-name=\"%s\"",
674 ir_stream, bt_stream_get_id(ir_stream),
675 bt_stream_get_name(ir_stream));
676 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
677 goto end;
678 }
679 }
680
681 /*
682 * Not supported: discarded events with default clock snapshots,
683 * but packet beginning/end without default clock snapshot.
684 */
685 if (!fs_sink->ignore_discarded_events &&
686 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc) &&
687 !packets_have_beginning_end_cs) {
688 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
689 "Unsupported stream: discarded events have "
690 "default clock snapshots, but packets have no "
691 "beginning and/or end default clock snapshots: "
692 "stream-addr=%p, "
693 "stream-id=%" PRIu64 ", "
694 "stream-name=\"%s\"",
695 ir_stream, bt_stream_get_id(ir_stream),
696 bt_stream_get_name(ir_stream));
697 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
698 goto end;
699 }
700
701 /*
702 * Not supported: discarded packets with default clock
703 * snapshots, but packet beginning/end without default clock
704 * snapshot.
705 */
706 if (!fs_sink->ignore_discarded_packets &&
707 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc) &&
708 !packets_have_beginning_end_cs) {
709 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
710 "Unsupported stream: discarded packets have "
711 "default clock snapshots, but packets have no "
712 "beginning and/or end default clock snapshots: "
713 "stream-addr=%p, "
714 "stream-id=%" PRIu64 ", "
715 "stream-name=\"%s\"",
716 ir_stream, bt_stream_get_id(ir_stream),
717 bt_stream_get_name(ir_stream));
718 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
719 goto end;
720 }
721
722 stream = borrow_stream(fs_sink, ir_stream);
723 if (!stream) {
724 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
725 "Failed to borrow stream.");
726 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
727 goto end;
728 }
729
730 BT_COMP_LOGI("Created new, empty stream file: "
731 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
732 "trace-name=\"%s\", path=\"%s/%s\"",
733 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
734 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
735 stream->trace->path->str, stream->file_name->str);
736
737 end:
738 return status;
739 }
740
741 static inline
742 bt_component_class_sink_consume_method_status handle_stream_end_msg(
743 struct fs_sink_comp *fs_sink, const bt_message *msg)
744 {
745 bt_component_class_sink_consume_method_status status =
746 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
747 const bt_stream *ir_stream =
748 bt_message_stream_end_borrow_stream_const(msg);
749 struct fs_sink_stream *stream;
750
751 stream = borrow_stream(fs_sink, ir_stream);
752 if (!stream) {
753 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
754 "Failed to borrow stream.");
755 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
756 goto end;
757 }
758
759 if (G_UNLIKELY(!stream->sc->has_packets &&
760 stream->packet_state.is_open)) {
761 /* Close stream's current artificial packet */
762 int ret = fs_sink_stream_close_packet(stream, NULL);
763
764 if (ret) {
765 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
766 "Failed to close packet.");
767 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
768 goto end;
769 }
770 }
771
772 BT_COMP_LOGI("Closing stream file: "
773 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
774 "trace-name=\"%s\", path=\"%s/%s\"",
775 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
776 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
777 stream->trace->path->str, stream->file_name->str);
778
779 /*
780 * This destroys the stream object and frees all its resources,
781 * closing the stream file.
782 */
783 g_hash_table_remove(stream->trace->streams, ir_stream);
784
785 end:
786 return status;
787 }
788
789 static inline
790 bt_component_class_sink_consume_method_status handle_discarded_events_msg(
791 struct fs_sink_comp *fs_sink, const bt_message *msg)
792 {
793 bt_component_class_sink_consume_method_status status =
794 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
795 const bt_stream *ir_stream =
796 bt_message_discarded_events_borrow_stream_const(msg);
797 struct fs_sink_stream *stream;
798 const bt_clock_snapshot *cs = NULL;
799 bt_property_availability avail;
800 uint64_t count;
801
802 stream = borrow_stream(fs_sink, ir_stream);
803 if (!stream) {
804 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
805 "Failed to borrow stream.");
806 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
807 goto end;
808 }
809
810 if (fs_sink->ignore_discarded_events) {
811 BT_COMP_LOGI("Ignoring discarded events message: "
812 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
813 "trace-name=\"%s\", path=\"%s/%s\"",
814 bt_stream_get_id(ir_stream),
815 bt_stream_get_name(ir_stream),
816 bt_trace_get_name(
817 bt_stream_borrow_trace_const(ir_stream)),
818 stream->trace->path->str, stream->file_name->str);
819 goto end;
820 }
821
822 if (stream->discarded_events_state.in_range) {
823 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
824 "Unsupported contiguous discarded events message: "
825 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
826 "trace-name=\"%s\", path=\"%s/%s\"",
827 bt_stream_get_id(ir_stream),
828 bt_stream_get_name(ir_stream),
829 bt_trace_get_name(
830 bt_stream_borrow_trace_const(ir_stream)),
831 stream->trace->path->str, stream->file_name->str);
832 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
833 goto end;
834 }
835
836 /*
837 * If we're currently in an opened packet (got a packet
838 * beginning message, but no packet end message yet), we do not
839 * support having a discarded events message with a time range
840 * because we require that the discarded events message's time
841 * range go from a packet's end time to the next packet's end
842 * time.
843 */
844 if (stream->packet_state.is_open &&
845 stream->sc->discarded_events_has_ts) {
846 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
847 "Unsupported discarded events message with "
848 "default clock snapshots occurring within a packet: "
849 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
850 "trace-name=\"%s\", path=\"%s/%s\"",
851 bt_stream_get_id(ir_stream),
852 bt_stream_get_name(ir_stream),
853 bt_trace_get_name(
854 bt_stream_borrow_trace_const(ir_stream)),
855 stream->trace->path->str, stream->file_name->str);
856 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
857 goto end;
858 }
859
860 if (stream->sc->discarded_events_has_ts) {
861 /*
862 * Make the stream's state be in the time range of a
863 * discarded events message since we have the message's
864 * time range (`stream->sc->discarded_events_has_ts`).
865 */
866 stream->discarded_events_state.in_range = true;
867
868 /*
869 * The clock snapshot values will be validated when
870 * handling the next packet beginning and end messages
871 * (next calls to handle_packet_beginning_msg() and
872 * handle_packet_end_msg()).
873 */
874 cs = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
875 msg);
876 BT_ASSERT(cs);
877 stream->discarded_events_state.beginning_cs =
878 bt_clock_snapshot_get_value(cs);
879 cs = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(
880 msg);
881 BT_ASSERT(cs);
882 stream->discarded_events_state.end_cs = bt_clock_snapshot_get_value(cs);
883 }
884
885 avail = bt_message_discarded_events_get_count(msg, &count);
886 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
887 /*
888 * There's no specific count of discarded events: set it
889 * to 1 so that we know that we at least discarded
890 * something.
891 */
892 count = 1;
893 }
894
895 stream->packet_state.discarded_events_counter += count;
896
897 end:
898 return status;
899 }
900
901 static inline
902 bt_component_class_sink_consume_method_status handle_discarded_packets_msg(
903 struct fs_sink_comp *fs_sink, const bt_message *msg)
904 {
905 bt_component_class_sink_consume_method_status status =
906 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
907 const bt_stream *ir_stream =
908 bt_message_discarded_packets_borrow_stream_const(msg);
909 struct fs_sink_stream *stream;
910 const bt_clock_snapshot *cs = NULL;
911 bt_property_availability avail;
912 uint64_t count;
913
914 stream = borrow_stream(fs_sink, ir_stream);
915 if (!stream) {
916 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
917 "Failed to borrow stream.");
918 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
919 goto end;
920 }
921
922 if (fs_sink->ignore_discarded_packets) {
923 BT_COMP_LOGI("Ignoring discarded packets message: "
924 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
925 "trace-name=\"%s\", path=\"%s/%s\"",
926 bt_stream_get_id(ir_stream),
927 bt_stream_get_name(ir_stream),
928 bt_trace_get_name(
929 bt_stream_borrow_trace_const(ir_stream)),
930 stream->trace->path->str, stream->file_name->str);
931 goto end;
932 }
933
934 if (stream->discarded_packets_state.in_range) {
935 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
936 "Unsupported contiguous discarded packets message: "
937 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
938 "trace-name=\"%s\", path=\"%s/%s\"",
939 bt_stream_get_id(ir_stream),
940 bt_stream_get_name(ir_stream),
941 bt_trace_get_name(
942 bt_stream_borrow_trace_const(ir_stream)),
943 stream->trace->path->str, stream->file_name->str);
944 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
945 goto end;
946 }
947
948 /*
949 * Discarded packets messages are guaranteed to occur between
950 * packets.
951 */
952 BT_ASSERT(!stream->packet_state.is_open);
953
954 if (stream->sc->discarded_packets_has_ts) {
955 /*
956 * Make the stream's state be in the time range of a
957 * discarded packets message since we have the message's
958 * time range (`stream->sc->discarded_packets_has_ts`).
959 */
960 stream->discarded_packets_state.in_range = true;
961
962 /*
963 * The clock snapshot values will be validated when
964 * handling the next packet beginning message (next call
965 * to handle_packet_beginning_msg()).
966 */
967 cs = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
968 msg);
969 BT_ASSERT(cs);
970 stream->discarded_packets_state.beginning_cs =
971 bt_clock_snapshot_get_value(cs);
972 cs = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(
973 msg);
974 BT_ASSERT(cs);
975 stream->discarded_packets_state.end_cs =
976 bt_clock_snapshot_get_value(cs);
977 }
978
979 avail = bt_message_discarded_packets_get_count(msg, &count);
980 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
981 /*
982 * There's no specific count of discarded packets: set
983 * it to 1 so that we know that we at least discarded
984 * something.
985 */
986 count = 1;
987 }
988
989 stream->packet_state.seq_num += count;
990
991 end:
992 return status;
993 }
994
995 static inline
996 void put_messages(bt_message_array_const msgs, uint64_t count)
997 {
998 uint64_t i;
999
1000 for (i = 0; i < count; i++) {
1001 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
1002 }
1003 }
1004
1005 BT_HIDDEN
1006 bt_component_class_sink_consume_method_status ctf_fs_sink_consume(
1007 bt_self_component_sink *self_comp)
1008 {
1009 bt_component_class_sink_consume_method_status status =
1010 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
1011 struct fs_sink_comp *fs_sink;
1012 bt_message_iterator_next_status next_status;
1013 uint64_t msg_count = 0;
1014 bt_message_array_const msgs;
1015
1016 fs_sink = (fs_sink_comp *) bt_self_component_get_data(
1017 bt_self_component_sink_as_self_component(self_comp));
1018 BT_ASSERT_DBG(fs_sink);
1019 BT_ASSERT_DBG(fs_sink->upstream_iter);
1020
1021 /* Consume messages */
1022 next_status = bt_message_iterator_next(
1023 fs_sink->upstream_iter, &msgs, &msg_count);
1024 if (next_status < 0) {
1025 status = (bt_component_class_sink_consume_method_status) next_status;
1026 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
1027 "Failed to get next message from upstream iterator.");
1028 goto end;
1029 }
1030
1031 switch (next_status) {
1032 case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK:
1033 {
1034 uint64_t i;
1035
1036 for (i = 0; i < msg_count; i++) {
1037 const bt_message *msg = msgs[i];
1038
1039 BT_ASSERT_DBG(msg);
1040
1041 switch (bt_message_get_type(msg)) {
1042 case BT_MESSAGE_TYPE_EVENT:
1043 status = handle_event_msg(fs_sink, msg);
1044 break;
1045 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
1046 status = handle_packet_beginning_msg(
1047 fs_sink, msg);
1048 break;
1049 case BT_MESSAGE_TYPE_PACKET_END:
1050 status = handle_packet_end_msg(
1051 fs_sink, msg);
1052 break;
1053 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
1054 /* Ignore */
1055 BT_COMP_LOGD_STR("Ignoring message iterator inactivity message.");
1056 break;
1057 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
1058 status = handle_stream_beginning_msg(
1059 fs_sink, msg);
1060 break;
1061 case BT_MESSAGE_TYPE_STREAM_END:
1062 status = handle_stream_end_msg(
1063 fs_sink, msg);
1064 break;
1065 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1066 status = handle_discarded_events_msg(
1067 fs_sink, msg);
1068 break;
1069 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1070 status = handle_discarded_packets_msg(
1071 fs_sink, msg);
1072 break;
1073 default:
1074 bt_common_abort();
1075 }
1076
1077 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
1078
1079 if (status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK) {
1080 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
1081 "Failed to handle message: "
1082 "generated CTF traces could be incomplete: "
1083 "output-dir-path=\"%s\"",
1084 fs_sink->output_dir_path->str);
1085 goto error;
1086 }
1087 }
1088
1089 break;
1090 }
1091 case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN:
1092 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN;
1093 break;
1094 case BT_MESSAGE_ITERATOR_NEXT_STATUS_END:
1095 /* TODO: Finalize all traces (should already be done?) */
1096 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END;
1097 break;
1098 default:
1099 break;
1100 }
1101
1102 goto end;
1103
1104 error:
1105 BT_ASSERT(status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK);
1106 put_messages(msgs, msg_count);
1107
1108 end:
1109 return status;
1110 }
1111
1112 BT_HIDDEN
1113 bt_component_class_sink_graph_is_configured_method_status
1114 ctf_fs_sink_graph_is_configured(
1115 bt_self_component_sink *self_comp)
1116 {
1117 bt_component_class_sink_graph_is_configured_method_status status;
1118 bt_message_iterator_create_from_sink_component_status
1119 msg_iter_status;
1120 fs_sink_comp *fs_sink = (fs_sink_comp *) bt_self_component_get_data(
1121 bt_self_component_sink_as_self_component(self_comp));
1122
1123 msg_iter_status =
1124 bt_message_iterator_create_from_sink_component(
1125 self_comp,
1126 bt_self_component_sink_borrow_input_port_by_name(
1127 self_comp, in_port_name), &fs_sink->upstream_iter);
1128 if (msg_iter_status != BT_MESSAGE_ITERATOR_CREATE_FROM_SINK_COMPONENT_STATUS_OK) {
1129 status = (bt_component_class_sink_graph_is_configured_method_status) msg_iter_status;
1130 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
1131 "Failed to create upstream iterator.");
1132 goto end;
1133 }
1134
1135 status = BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK;
1136 end:
1137 return status;
1138 }
1139
1140 BT_HIDDEN
1141 void ctf_fs_sink_finalize(bt_self_component_sink *self_comp)
1142 {
1143 fs_sink_comp *fs_sink = (fs_sink_comp *) bt_self_component_get_data(
1144 bt_self_component_sink_as_self_component(self_comp));
1145
1146 destroy_fs_sink_comp(fs_sink);
1147 }
This page took 0.050939 seconds and 3 git commands to generate.