.gitignore: add some missing files
[babeltrace.git] / src / plugins / ctf / fs-sink / fs-sink.cpp
... / ...
CommitLineData
1/*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
5 */
6
7#include <glib.h>
8#include <stdio.h>
9
10#include <babeltrace2/babeltrace.h>
11
12#include "common/assert.h"
13#include "cpp-common/bt2/wrap.hpp"
14#include "cpp-common/vendor/fmt/format.h"
15#include "ctfser/ctfser.h"
16
17#include "plugins/common/param-validation/param-validation.h"
18
19#include "fs-sink-ctf-meta.hpp"
20#include "fs-sink-stream.hpp"
21#include "fs-sink-trace.hpp"
22#include "fs-sink.hpp"
23#include "translate-trace-ir-to-ctf-ir.hpp"
24
25static const char * const in_port_name = "in";
26
27static bt_component_class_initialize_method_status
28ensure_output_dir_exists(struct fs_sink_comp *fs_sink)
29{
30 bt_component_class_initialize_method_status status =
31 BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
32 int ret;
33
34 ret = g_mkdir_with_parents(fs_sink->output_dir_path->str, 0755);
35 if (ret) {
36 BT_CPPLOGE_ERRNO_APPEND_CAUSE_SPEC(
37 fs_sink->logger, "Cannot create directories for output directory",
38 ": output-dir-path=\"{}\"", fs_sink->output_dir_path->str);
39 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
40 goto end;
41 }
42
43end:
44 return status;
45}
46
47static bt_param_validation_map_value_entry_descr fs_sink_params_descr[] = {
48 {"path", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
49 bt_param_validation_value_descr::makeString()},
50 {"assume-single-trace", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
51 bt_param_validation_value_descr::makeBool()},
52 {"ignore-discarded-events", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
53 bt_param_validation_value_descr::makeBool()},
54 {"ignore-discarded-packets", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
55 bt_param_validation_value_descr::makeBool()},
56 {"quiet", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
57 bt_param_validation_value_descr::makeBool()},
58 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
59
60static bt_component_class_initialize_method_status configure_component(struct fs_sink_comp *fs_sink,
61 const bt_value *params)
62{
63 bt_component_class_initialize_method_status status;
64 const bt_value *value;
65 enum bt_param_validation_status validation_status;
66 gchar *validation_error;
67
68 validation_status =
69 bt_param_validation_validate(params, fs_sink_params_descr, &validation_error);
70 if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
71 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
72 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "{}", validation_error);
73 goto end;
74 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
75 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
76 goto end;
77 }
78
79 value = bt_value_map_borrow_entry_value_const(params, "path");
80 g_string_assign(fs_sink->output_dir_path, bt_value_string_get(value));
81
82 value = bt_value_map_borrow_entry_value_const(params, "assume-single-trace");
83 if (value) {
84 fs_sink->assume_single_trace = (bool) bt_value_bool_get(value);
85 }
86
87 value = bt_value_map_borrow_entry_value_const(params, "ignore-discarded-events");
88 if (value) {
89 fs_sink->ignore_discarded_events = (bool) bt_value_bool_get(value);
90 }
91
92 value = bt_value_map_borrow_entry_value_const(params, "ignore-discarded-packets");
93 if (value) {
94 fs_sink->ignore_discarded_packets = (bool) bt_value_bool_get(value);
95 }
96
97 value = bt_value_map_borrow_entry_value_const(params, "quiet");
98 if (value) {
99 fs_sink->quiet = (bool) bt_value_bool_get(value);
100 }
101
102 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
103
104end:
105 g_free(validation_error);
106 return status;
107}
108
109static void destroy_fs_sink_comp(struct fs_sink_comp *fs_sink)
110{
111 if (!fs_sink) {
112 goto end;
113 }
114
115 if (fs_sink->output_dir_path) {
116 g_string_free(fs_sink->output_dir_path, TRUE);
117 fs_sink->output_dir_path = NULL;
118 }
119
120 if (fs_sink->traces) {
121 g_hash_table_destroy(fs_sink->traces);
122 fs_sink->traces = NULL;
123 }
124
125 BT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(fs_sink->upstream_iter);
126 delete fs_sink;
127
128end:
129 return;
130}
131
132bt_component_class_initialize_method_status ctf_fs_sink_init(bt_self_component_sink *self_comp_sink,
133 bt_self_component_sink_configuration *,
134 const bt_value *params, void *)
135{
136 try {
137 bt_component_class_initialize_method_status status;
138 bt_self_component_add_port_status add_port_status;
139 struct fs_sink_comp *fs_sink = NULL;
140
141 fs_sink = new fs_sink_comp {bt2::wrap(self_comp_sink)};
142 fs_sink->output_dir_path = g_string_new(NULL);
143 status = configure_component(fs_sink, params);
144 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
145 /* configure_component() logs errors */
146 goto end;
147 }
148
149 if (fs_sink->assume_single_trace &&
150 g_file_test(fs_sink->output_dir_path->str, G_FILE_TEST_EXISTS)) {
151 BT_CPPLOGE_APPEND_CAUSE_SPEC(
152 fs_sink->logger, "Single trace mode, but output path exists: output-path=\"{}\"",
153 fs_sink->output_dir_path->str);
154 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
155 goto end;
156 }
157
158 status = ensure_output_dir_exists(fs_sink);
159 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
160 /* ensure_output_dir_exists() logs errors */
161 goto end;
162 }
163
164 fs_sink->traces = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL,
165 (GDestroyNotify) fs_sink_trace_destroy);
166 if (!fs_sink->traces) {
167 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to allocate one GHashTable.");
168 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
169 goto end;
170 }
171
172 add_port_status =
173 bt_self_component_sink_add_input_port(self_comp_sink, in_port_name, NULL, NULL);
174 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
175 status = (bt_component_class_initialize_method_status) add_port_status;
176 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to add input port.");
177 goto end;
178 }
179
180 bt_self_component_set_data(bt_self_component_sink_as_self_component(self_comp_sink),
181 fs_sink);
182
183end:
184 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
185 destroy_fs_sink_comp(fs_sink);
186 }
187
188 return status;
189 } catch (const std::bad_alloc&) {
190 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
191 } catch (const bt2::Error&) {
192 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
193 }
194}
195
196static inline struct fs_sink_stream *borrow_stream(struct fs_sink_comp *fs_sink,
197 const bt_stream *ir_stream)
198{
199 const bt_trace *ir_trace = bt_stream_borrow_trace_const(ir_stream);
200 struct fs_sink_trace *trace;
201 struct fs_sink_stream *stream = NULL;
202
203 trace = (fs_sink_trace *) g_hash_table_lookup(fs_sink->traces, ir_trace);
204 if (G_UNLIKELY(!trace)) {
205 if (fs_sink->assume_single_trace && g_hash_table_size(fs_sink->traces) > 0) {
206 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger,
207 "Single trace mode, but getting more than one trace: "
208 "stream-name=\"{}\"",
209 bt2c::maybeNull(bt_stream_get_name(ir_stream)));
210 goto end;
211 }
212
213 trace = fs_sink_trace_create(fs_sink, ir_trace);
214 if (!trace) {
215 goto end;
216 }
217 }
218
219 stream = (fs_sink_stream *) g_hash_table_lookup(trace->streams, ir_stream);
220 if (G_UNLIKELY(!stream)) {
221 stream = fs_sink_stream_create(trace, ir_stream);
222 if (!stream) {
223 goto end;
224 }
225 }
226
227end:
228 return stream;
229}
230
231static inline bt_component_class_sink_consume_method_status
232handle_event_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
233{
234 try {
235 int ret;
236 bt_component_class_sink_consume_method_status status =
237 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
238 const bt_event *ir_event = bt_message_event_borrow_event_const(msg);
239 const bt_stream *ir_stream = bt_event_borrow_stream_const(ir_event);
240 struct fs_sink_stream *stream;
241 struct fs_sink_ctf_event_class *ec = NULL;
242 const bt_clock_snapshot *cs = NULL;
243
244 stream = borrow_stream(fs_sink, ir_stream);
245 if (G_UNLIKELY(!stream)) {
246 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to borrow stream.");
247 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
248 goto end;
249 }
250
251 ret = try_translate_event_class_trace_ir_to_ctf_ir(
252 fs_sink, stream->sc, bt_event_borrow_class_const(ir_event), &ec);
253 if (ret) {
254 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger,
255 "Failed to translate event class to CTF IR.");
256 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
257 goto end;
258 }
259
260 BT_ASSERT_DBG(ec);
261
262 if (stream->sc->default_clock_class) {
263 cs = bt_message_event_borrow_default_clock_snapshot_const(msg);
264 }
265
266 /*
267 * If this event's stream does not support packets, then we
268 * lazily create artificial packets.
269 *
270 * The size of an artificial packet is arbitrarily at least
271 * 4 MiB (it usually is greater because we close it when
272 * comes the time to write a new event and the packet's content
273 * size is >= 4 MiB), except the last one which can be smaller.
274 */
275 if (G_UNLIKELY(!stream->sc->has_packets)) {
276 if (stream->packet_state.is_open &&
277 bt_ctfser_get_offset_in_current_packet_bits(&stream->ctfser) / 8 >=
278 4 * 1024 * 1024) {
279 /*
280 * Stream's current packet is larger than 4 MiB:
281 * close it. A new packet will be opened just
282 * below.
283 */
284 ret = fs_sink_stream_close_packet(stream, NULL);
285 if (ret) {
286 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to close packet.");
287 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
288 goto end;
289 }
290 }
291
292 if (!stream->packet_state.is_open) {
293 /* Stream's packet is not currently opened: open it */
294 ret = fs_sink_stream_open_packet(stream, NULL, NULL);
295 if (ret) {
296 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to open packet.");
297 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
298 goto end;
299 }
300 }
301 }
302
303 BT_ASSERT_DBG(stream->packet_state.is_open);
304 ret = fs_sink_stream_write_event(stream, cs, ir_event, ec);
305 if (G_UNLIKELY(ret)) {
306 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to write event.");
307 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
308 goto end;
309 }
310
311end:
312 return status;
313 } catch (const std::bad_alloc&) {
314 return BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_MEMORY_ERROR;
315 } catch (const bt2::Error&) {
316 return BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
317 }
318}
319
320static inline bt_component_class_sink_consume_method_status
321handle_packet_beginning_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
322{
323 int ret;
324 bt_component_class_sink_consume_method_status status =
325 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
326 const bt_packet *ir_packet = bt_message_packet_beginning_borrow_packet_const(msg);
327 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
328 struct fs_sink_stream *stream;
329 const bt_clock_snapshot *cs = NULL;
330
331 stream = borrow_stream(fs_sink, ir_stream);
332 if (G_UNLIKELY(!stream)) {
333 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to borrow stream.");
334 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
335 goto end;
336 }
337
338 if (stream->sc->packets_have_ts_begin) {
339 cs = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg);
340 BT_ASSERT(cs);
341 }
342
343 /*
344 * If we previously received a discarded events message with
345 * a time range, make sure that its beginning time matches what's
346 * expected for CTF 1.8, that is:
347 *
348 * * Its beginning time is the previous packet's end
349 * time (or the current packet's beginning time if
350 * this is the first packet).
351 *
352 * We check this here instead of in handle_packet_end_msg()
353 * because we want to catch any incompatible message as early as
354 * possible to report the error.
355 *
356 * Validation of the discarded events message's end time is
357 * performed in handle_packet_end_msg().
358 */
359 if (stream->discarded_events_state.in_range) {
360 uint64_t expected_cs;
361
362 /*
363 * `stream->discarded_events_state.in_range` is only set
364 * when the stream class's discarded events have a time
365 * range.
366 *
367 * It is required that the packet beginning and end
368 * messages for this stream class have times when
369 * discarded events have a time range.
370 */
371 BT_ASSERT(stream->sc->discarded_events_has_ts);
372 BT_ASSERT(stream->sc->packets_have_ts_begin);
373 BT_ASSERT(stream->sc->packets_have_ts_end);
374
375 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
376 /* We're opening the first packet */
377 expected_cs = bt_clock_snapshot_get_value(cs);
378 } else {
379 expected_cs = stream->prev_packet_state.end_cs;
380 }
381
382 if (stream->discarded_events_state.beginning_cs != expected_cs) {
383 BT_CPPLOGE_APPEND_CAUSE_SPEC(
384 fs_sink->logger,
385 "Incompatible discarded events message: "
386 "unexpected beginning time: "
387 "beginning-cs-val={}, "
388 "expected-beginning-cs-val={}, "
389 "stream-id={}, stream-name=\"{}\", "
390 "trace-name=\"{}\", path=\"{}/{}\"",
391 stream->discarded_events_state.beginning_cs, expected_cs,
392 bt_stream_get_id(ir_stream), bt2c::maybeNull(bt_stream_get_name(ir_stream)),
393 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream))),
394 stream->trace->path->str, stream->file_name->str);
395 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
396 goto end;
397 }
398 }
399
400 /*
401 * If we previously received a discarded packets message with a
402 * time range, make sure that its beginning and end times match
403 * what's expected for CTF 1.8, that is:
404 *
405 * * Its beginning time is the previous packet's end time.
406 *
407 * * Its end time is the current packet's beginning time.
408 */
409 if (stream->discarded_packets_state.in_range) {
410 uint64_t expected_end_cs;
411
412 /*
413 * `stream->discarded_packets_state.in_range` is only
414 * set when the stream class's discarded packets have a
415 * time range.
416 *
417 * It is required that the packet beginning and end
418 * messages for this stream class have times when
419 * discarded packets have a time range.
420 */
421 BT_ASSERT(stream->sc->discarded_packets_has_ts);
422 BT_ASSERT(stream->sc->packets_have_ts_begin);
423 BT_ASSERT(stream->sc->packets_have_ts_end);
424
425 /*
426 * It is not supported to have a discarded packets
427 * message _before_ the first packet: we cannot validate
428 * that its beginning time is compatible with CTF 1.8 in
429 * this case.
430 */
431 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
432 BT_CPPLOGE_APPEND_CAUSE_SPEC(
433 fs_sink->logger,
434 "Incompatible discarded packets message "
435 "occurring before the stream's first packet: "
436 "stream-id={}, stream-name=\"{}\", "
437 "trace-name=\"{}\", path=\"{}/{}\"",
438 bt_stream_get_id(ir_stream), bt2c::maybeNull(bt_stream_get_name(ir_stream)),
439 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream))),
440 stream->trace->path->str, stream->file_name->str);
441 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
442 goto end;
443 }
444
445 if (stream->discarded_packets_state.beginning_cs != stream->prev_packet_state.end_cs) {
446 BT_CPPLOGE_APPEND_CAUSE_SPEC(
447 fs_sink->logger,
448 "Incompatible discarded packets message: "
449 "unexpected beginning time: "
450 "beginning-cs-val={}, "
451 "expected-beginning-cs-val={}, "
452 "stream-id={}, stream-name=\"{}\", "
453 "trace-name=\"{}\", path=\"{}/{}\"",
454 stream->discarded_packets_state.beginning_cs, stream->prev_packet_state.end_cs,
455 bt_stream_get_id(ir_stream), bt2c::maybeNull(bt_stream_get_name(ir_stream)),
456 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream))),
457 stream->trace->path->str, stream->file_name->str);
458 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
459 goto end;
460 }
461
462 expected_end_cs = bt_clock_snapshot_get_value(cs);
463
464 if (stream->discarded_packets_state.end_cs != expected_end_cs) {
465 BT_CPPLOGE_APPEND_CAUSE_SPEC(
466 fs_sink->logger,
467 "Incompatible discarded packets message: "
468 "unexpected end time: "
469 "end-cs-val={}, "
470 "expected-end-cs-val={}, "
471 "stream-id={}, stream-name=\"{}\", "
472 "trace-name=\"{}\", path=\"{}/{}\"",
473 stream->discarded_packets_state.end_cs, expected_end_cs,
474 bt_stream_get_id(ir_stream), bt2c::maybeNull(bt_stream_get_name(ir_stream)),
475 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream))),
476 stream->trace->path->str, stream->file_name->str);
477 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
478 goto end;
479 }
480 }
481
482 /*
483 * We're not in a discarded packets time range anymore since we
484 * require that the discarded packets time ranges go from one
485 * packet's end time to the next packet's beginning time, and
486 * we're handling a packet beginning message here.
487 */
488 stream->discarded_packets_state.in_range = false;
489
490 ret = fs_sink_stream_open_packet(stream, cs, ir_packet);
491 if (ret) {
492 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to open packet.");
493 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
494 goto end;
495 }
496
497end:
498 return status;
499}
500
501static inline bt_component_class_sink_consume_method_status
502handle_packet_end_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
503{
504 int ret;
505 bt_component_class_sink_consume_method_status status =
506 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
507 const bt_packet *ir_packet = bt_message_packet_end_borrow_packet_const(msg);
508 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
509 struct fs_sink_stream *stream;
510 const bt_clock_snapshot *cs = NULL;
511
512 stream = borrow_stream(fs_sink, ir_stream);
513 if (G_UNLIKELY(!stream)) {
514 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to borrow stream.");
515 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
516 goto end;
517 }
518
519 if (stream->sc->packets_have_ts_end) {
520 cs = bt_message_packet_end_borrow_default_clock_snapshot_const(msg);
521 BT_ASSERT(cs);
522 }
523
524 /*
525 * If we previously received a discarded events message with
526 * a time range, make sure that its end time matches what's
527 * expected for CTF 1.8, that is:
528 *
529 * * Its end time is the current packet's end time.
530 *
531 * Validation of the discarded events message's beginning time
532 * is performed in handle_packet_beginning_msg().
533 */
534 if (stream->discarded_events_state.in_range) {
535 uint64_t expected_cs;
536
537 /*
538 * `stream->discarded_events_state.in_range` is only set
539 * when the stream class's discarded events have a time
540 * range.
541 *
542 * It is required that the packet beginning and end
543 * messages for this stream class have times when
544 * discarded events have a time range.
545 */
546 BT_ASSERT(stream->sc->discarded_events_has_ts);
547 BT_ASSERT(stream->sc->packets_have_ts_begin);
548 BT_ASSERT(stream->sc->packets_have_ts_end);
549
550 expected_cs = bt_clock_snapshot_get_value(cs);
551
552 if (stream->discarded_events_state.end_cs != expected_cs) {
553 BT_CPPLOGE_APPEND_CAUSE_SPEC(
554 fs_sink->logger,
555 "Incompatible discarded events message: "
556 "unexpected end time: "
557 "end-cs-val={}, "
558 "expected-end-cs-val={}, "
559 "stream-id={}, stream-name=\"{}\", "
560 "trace-name=\"{}\", path=\"{}/{}\"",
561 stream->discarded_events_state.end_cs, expected_cs, bt_stream_get_id(ir_stream),
562 bt2c::maybeNull(bt_stream_get_name(ir_stream)),
563 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream))),
564 stream->trace->path->str, stream->file_name->str);
565 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
566 goto end;
567 }
568 }
569
570 ret = fs_sink_stream_close_packet(stream, cs);
571 if (ret) {
572 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to close packet.");
573 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
574 goto end;
575 }
576
577 /*
578 * We're not in a discarded events time range anymore since we
579 * require that the discarded events time ranges go from one
580 * packet's end time to the next packet's end time, and we're
581 * handling a packet end message here.
582 */
583 stream->discarded_events_state.in_range = false;
584
585end:
586 return status;
587}
588
589static inline bt_component_class_sink_consume_method_status
590handle_stream_beginning_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
591{
592 bt_component_class_sink_consume_method_status status =
593 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
594 const bt_stream *ir_stream = bt_message_stream_beginning_borrow_stream_const(msg);
595 const bt_stream_class *ir_sc = bt_stream_borrow_class_const(ir_stream);
596 struct fs_sink_stream *stream;
597 bool packets_have_beginning_end_cs =
598 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc) &&
599 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc);
600
601 /*
602 * Not supported: discarded events or discarded packets support
603 * without packets support. Packets are the way to know where
604 * discarded events/packets occurred in CTF 1.8.
605 */
606 if (!bt_stream_class_supports_packets(ir_sc)) {
607 BT_ASSERT(!bt_stream_class_supports_discarded_packets(ir_sc));
608
609 if (!fs_sink->ignore_discarded_events && bt_stream_class_supports_discarded_events(ir_sc)) {
610 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger,
611 "Unsupported stream: "
612 "stream does not support packets, "
613 "but supports discarded events: "
614 "stream-addr={}, "
615 "stream-id={}, "
616 "stream-name=\"{}\"",
617 fmt::ptr(ir_stream), bt_stream_get_id(ir_stream),
618 bt2c::maybeNull(bt_stream_get_name(ir_stream)));
619 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
620 goto end;
621 }
622 }
623
624 /*
625 * Not supported: discarded events with default clock snapshots,
626 * but packet beginning/end without default clock snapshot.
627 */
628 if (!fs_sink->ignore_discarded_events &&
629 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc) &&
630 !packets_have_beginning_end_cs) {
631 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger,
632 "Unsupported stream: discarded events have "
633 "default clock snapshots, but packets have no "
634 "beginning and/or end default clock snapshots: "
635 "stream-addr={}, "
636 "stream-id={}, "
637 "stream-name=\"{}\"",
638 fmt::ptr(ir_stream), bt_stream_get_id(ir_stream),
639 bt2c::maybeNull(bt_stream_get_name(ir_stream)));
640 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
641 goto end;
642 }
643
644 /*
645 * Not supported: discarded packets with default clock
646 * snapshots, but packet beginning/end without default clock
647 * snapshot.
648 */
649 if (!fs_sink->ignore_discarded_packets &&
650 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc) &&
651 !packets_have_beginning_end_cs) {
652 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger,
653 "Unsupported stream: discarded packets have "
654 "default clock snapshots, but packets have no "
655 "beginning and/or end default clock snapshots: "
656 "stream-addr={}, "
657 "stream-id={}, "
658 "stream-name=\"{}\"",
659 fmt::ptr(ir_stream), bt_stream_get_id(ir_stream),
660 bt2c::maybeNull(bt_stream_get_name(ir_stream)));
661 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
662 goto end;
663 }
664
665 stream = borrow_stream(fs_sink, ir_stream);
666 if (!stream) {
667 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to borrow stream.");
668 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
669 goto end;
670 }
671
672 BT_CPPLOGI_SPEC(fs_sink->logger,
673 "Created new, empty stream file: "
674 "stream-id={}, stream-name=\"{}\", "
675 "trace-name=\"{}\", path=\"{}/{}\"",
676 bt_stream_get_id(ir_stream), bt2c::maybeNull(bt_stream_get_name(ir_stream)),
677 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream))),
678 stream->trace->path->str, stream->file_name->str);
679
680end:
681 return status;
682}
683
684static inline bt_component_class_sink_consume_method_status
685handle_stream_end_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
686{
687 bt_component_class_sink_consume_method_status status =
688 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
689 const bt_stream *ir_stream = bt_message_stream_end_borrow_stream_const(msg);
690 struct fs_sink_stream *stream;
691
692 stream = borrow_stream(fs_sink, ir_stream);
693 if (!stream) {
694 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to borrow stream.");
695 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
696 goto end;
697 }
698
699 if (G_UNLIKELY(!stream->sc->has_packets && stream->packet_state.is_open)) {
700 /* Close stream's current artificial packet */
701 int ret = fs_sink_stream_close_packet(stream, NULL);
702
703 if (ret) {
704 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to close packet.");
705 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
706 goto end;
707 }
708 }
709
710 BT_CPPLOGI_SPEC(fs_sink->logger,
711 "Closing stream file: "
712 "stream-id={}, stream-name=\"{}\", "
713 "trace-name=\"{}\", path=\"{}/{}\"",
714 bt_stream_get_id(ir_stream), bt2c::maybeNull(bt_stream_get_name(ir_stream)),
715 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream))),
716 stream->trace->path->str, stream->file_name->str);
717
718 /*
719 * This destroys the stream object and frees all its resources,
720 * closing the stream file.
721 */
722 g_hash_table_remove(stream->trace->streams, ir_stream);
723
724end:
725 return status;
726}
727
728static inline bt_component_class_sink_consume_method_status
729handle_discarded_events_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
730{
731 bt_component_class_sink_consume_method_status status =
732 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
733 const bt_stream *ir_stream = bt_message_discarded_events_borrow_stream_const(msg);
734 struct fs_sink_stream *stream;
735 const bt_clock_snapshot *cs = NULL;
736 bt_property_availability avail;
737 uint64_t count;
738
739 stream = borrow_stream(fs_sink, ir_stream);
740 if (!stream) {
741 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to borrow stream.");
742 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
743 goto end;
744 }
745
746 if (fs_sink->ignore_discarded_events) {
747 BT_CPPLOGI_SPEC(fs_sink->logger,
748 "Ignoring discarded events message: "
749 "stream-id={}, stream-name=\"{}\", "
750 "trace-name=\"{}\", path=\"{}/{}\"",
751 bt_stream_get_id(ir_stream), bt2c::maybeNull(bt_stream_get_name(ir_stream)),
752 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream))),
753 stream->trace->path->str, stream->file_name->str);
754 goto end;
755 }
756
757 if (stream->discarded_events_state.in_range) {
758 BT_CPPLOGE_APPEND_CAUSE_SPEC(
759 fs_sink->logger,
760 "Unsupported contiguous discarded events message: "
761 "stream-id={}, stream-name=\"{}\", "
762 "trace-name=\"{}\", path=\"{}/{}\"",
763 bt_stream_get_id(ir_stream), bt2c::maybeNull(bt_stream_get_name(ir_stream)),
764 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream))),
765 stream->trace->path->str, stream->file_name->str);
766 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
767 goto end;
768 }
769
770 /*
771 * If we're currently in an opened packet (got a packet
772 * beginning message, but no packet end message yet), we do not
773 * support having a discarded events message with a time range
774 * because we require that the discarded events message's time
775 * range go from a packet's end time to the next packet's end
776 * time.
777 */
778 if (stream->packet_state.is_open && stream->sc->discarded_events_has_ts) {
779 BT_CPPLOGE_APPEND_CAUSE_SPEC(
780 fs_sink->logger,
781 "Unsupported discarded events message with "
782 "default clock snapshots occurring within a packet: "
783 "stream-id={}, stream-name=\"{}\", "
784 "trace-name=\"{}\", path=\"{}/{}\"",
785 bt_stream_get_id(ir_stream), bt2c::maybeNull(bt_stream_get_name(ir_stream)),
786 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream))),
787 stream->trace->path->str, stream->file_name->str);
788 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
789 goto end;
790 }
791
792 if (stream->sc->discarded_events_has_ts) {
793 /*
794 * Make the stream's state be in the time range of a
795 * discarded events message since we have the message's
796 * time range (`stream->sc->discarded_events_has_ts`).
797 */
798 stream->discarded_events_state.in_range = true;
799
800 /*
801 * The clock snapshot values will be validated when
802 * handling the next packet beginning and end messages
803 * (next calls to handle_packet_beginning_msg() and
804 * handle_packet_end_msg()).
805 */
806 cs = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg);
807 BT_ASSERT(cs);
808 stream->discarded_events_state.beginning_cs = bt_clock_snapshot_get_value(cs);
809 cs = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg);
810 BT_ASSERT(cs);
811 stream->discarded_events_state.end_cs = bt_clock_snapshot_get_value(cs);
812 }
813
814 avail = bt_message_discarded_events_get_count(msg, &count);
815 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
816 /*
817 * There's no specific count of discarded events: set it
818 * to 1 so that we know that we at least discarded
819 * something.
820 */
821 count = 1;
822 }
823
824 stream->packet_state.discarded_events_counter += count;
825
826end:
827 return status;
828}
829
830static inline bt_component_class_sink_consume_method_status
831handle_discarded_packets_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
832{
833 bt_component_class_sink_consume_method_status status =
834 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
835 const bt_stream *ir_stream = bt_message_discarded_packets_borrow_stream_const(msg);
836 struct fs_sink_stream *stream;
837 const bt_clock_snapshot *cs = NULL;
838 bt_property_availability avail;
839 uint64_t count;
840
841 stream = borrow_stream(fs_sink, ir_stream);
842 if (!stream) {
843 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to borrow stream.");
844 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
845 goto end;
846 }
847
848 if (fs_sink->ignore_discarded_packets) {
849 BT_CPPLOGI_SPEC(fs_sink->logger,
850 "Ignoring discarded packets message: "
851 "stream-id={}, stream-name=\"{}\", "
852 "trace-name=\"{}\", path=\"{}/{}\"",
853 bt_stream_get_id(ir_stream), bt2c::maybeNull(bt_stream_get_name(ir_stream)),
854 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream))),
855 stream->trace->path->str, stream->file_name->str);
856 goto end;
857 }
858
859 if (stream->discarded_packets_state.in_range) {
860 BT_CPPLOGE_APPEND_CAUSE_SPEC(
861 fs_sink->logger,
862 "Unsupported contiguous discarded packets message: "
863 "stream-id={}, stream-name=\"{}\", "
864 "trace-name=\"{}\", path=\"{}/{}\"",
865 bt_stream_get_id(ir_stream), bt2c::maybeNull(bt_stream_get_name(ir_stream)),
866 bt2c::maybeNull(bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream))),
867 stream->trace->path->str, stream->file_name->str);
868 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
869 goto end;
870 }
871
872 /*
873 * Discarded packets messages are guaranteed to occur between
874 * packets.
875 */
876 BT_ASSERT(!stream->packet_state.is_open);
877
878 if (stream->sc->discarded_packets_has_ts) {
879 /*
880 * Make the stream's state be in the time range of a
881 * discarded packets message since we have the message's
882 * time range (`stream->sc->discarded_packets_has_ts`).
883 */
884 stream->discarded_packets_state.in_range = true;
885
886 /*
887 * The clock snapshot values will be validated when
888 * handling the next packet beginning message (next call
889 * to handle_packet_beginning_msg()).
890 */
891 cs = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg);
892 BT_ASSERT(cs);
893 stream->discarded_packets_state.beginning_cs = bt_clock_snapshot_get_value(cs);
894 cs = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg);
895 BT_ASSERT(cs);
896 stream->discarded_packets_state.end_cs = bt_clock_snapshot_get_value(cs);
897 }
898
899 avail = bt_message_discarded_packets_get_count(msg, &count);
900 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
901 /*
902 * There's no specific count of discarded packets: set
903 * it to 1 so that we know that we at least discarded
904 * something.
905 */
906 count = 1;
907 }
908
909 stream->packet_state.seq_num += count;
910
911end:
912 return status;
913}
914
915static inline void put_messages(bt_message_array_const msgs, uint64_t count)
916{
917 uint64_t i;
918
919 for (i = 0; i < count; i++) {
920 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
921 }
922}
923
924bt_component_class_sink_consume_method_status ctf_fs_sink_consume(bt_self_component_sink *self_comp)
925{
926 bt_component_class_sink_consume_method_status status =
927 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
928 struct fs_sink_comp *fs_sink;
929 bt_message_iterator_next_status next_status;
930 uint64_t msg_count = 0;
931 bt_message_array_const msgs;
932
933 fs_sink = (fs_sink_comp *) bt_self_component_get_data(
934 bt_self_component_sink_as_self_component(self_comp));
935 BT_ASSERT_DBG(fs_sink);
936 BT_ASSERT_DBG(fs_sink->upstream_iter);
937
938 /* Consume messages */
939 next_status = bt_message_iterator_next(fs_sink->upstream_iter, &msgs, &msg_count);
940 if (next_status < 0) {
941 status = (bt_component_class_sink_consume_method_status) next_status;
942 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger,
943 "Failed to get next message from upstream iterator.");
944 goto end;
945 }
946
947 switch (next_status) {
948 case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK:
949 {
950 uint64_t i;
951
952 for (i = 0; i < msg_count; i++) {
953 const bt_message *msg = msgs[i];
954
955 BT_ASSERT_DBG(msg);
956
957 switch (bt_message_get_type(msg)) {
958 case BT_MESSAGE_TYPE_EVENT:
959 status = handle_event_msg(fs_sink, msg);
960 break;
961 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
962 status = handle_packet_beginning_msg(fs_sink, msg);
963 break;
964 case BT_MESSAGE_TYPE_PACKET_END:
965 status = handle_packet_end_msg(fs_sink, msg);
966 break;
967 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
968 /* Ignore */
969 BT_CPPLOGD_SPEC(fs_sink->logger, "Ignoring message iterator inactivity message.");
970 break;
971 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
972 status = handle_stream_beginning_msg(fs_sink, msg);
973 break;
974 case BT_MESSAGE_TYPE_STREAM_END:
975 status = handle_stream_end_msg(fs_sink, msg);
976 break;
977 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
978 status = handle_discarded_events_msg(fs_sink, msg);
979 break;
980 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
981 status = handle_discarded_packets_msg(fs_sink, msg);
982 break;
983 default:
984 bt_common_abort();
985 }
986
987 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
988
989 if (status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK) {
990 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger,
991 "Failed to handle message: "
992 "generated CTF traces could be incomplete: "
993 "output-dir-path=\"{}\"",
994 fs_sink->output_dir_path->str);
995 goto error;
996 }
997 }
998
999 break;
1000 }
1001 case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN:
1002 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN;
1003 break;
1004 case BT_MESSAGE_ITERATOR_NEXT_STATUS_END:
1005 /* TODO: Finalize all traces (should already be done?) */
1006 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END;
1007 break;
1008 default:
1009 break;
1010 }
1011
1012 goto end;
1013
1014error:
1015 BT_ASSERT(status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK);
1016 put_messages(msgs, msg_count);
1017
1018end:
1019 return status;
1020}
1021
1022bt_component_class_sink_graph_is_configured_method_status
1023ctf_fs_sink_graph_is_configured(bt_self_component_sink *self_comp)
1024{
1025 try {
1026 bt_component_class_sink_graph_is_configured_method_status status;
1027 bt_message_iterator_create_from_sink_component_status msg_iter_status;
1028 fs_sink_comp *fs_sink = (fs_sink_comp *) bt_self_component_get_data(
1029 bt_self_component_sink_as_self_component(self_comp));
1030
1031 msg_iter_status = bt_message_iterator_create_from_sink_component(
1032 self_comp, bt_self_component_sink_borrow_input_port_by_name(self_comp, in_port_name),
1033 &fs_sink->upstream_iter);
1034 if (msg_iter_status != BT_MESSAGE_ITERATOR_CREATE_FROM_SINK_COMPONENT_STATUS_OK) {
1035 status = (bt_component_class_sink_graph_is_configured_method_status) msg_iter_status;
1036 BT_CPPLOGE_APPEND_CAUSE_SPEC(fs_sink->logger, "Failed to create upstream iterator.");
1037 goto end;
1038 }
1039
1040 status = BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK;
1041end:
1042 return status;
1043 } catch (const std::bad_alloc&) {
1044 return BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_MEMORY_ERROR;
1045 } catch (const bt2c::Error&) {
1046 return BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_ERROR;
1047 }
1048}
1049
1050void ctf_fs_sink_finalize(bt_self_component_sink *self_comp)
1051{
1052 fs_sink_comp *fs_sink = (fs_sink_comp *) bt_self_component_get_data(
1053 bt_self_component_sink_as_self_component(self_comp));
1054
1055 destroy_fs_sink_comp(fs_sink);
1056}
This page took 0.027682 seconds and 5 git commands to generate.