e99f130d9b4ecf78b75ab53a82cdc88089d39646
[babeltrace.git] / src / plugins / ctf / fs-sink / fs-sink.cpp
1 /*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
5 */
6
7 #define BT_COMP_LOG_SELF_COMP (fs_sink->self_comp)
8 #define BT_LOG_OUTPUT_LEVEL (fs_sink->log_level)
9 #define BT_LOG_TAG "PLUGIN/SINK.CTF.FS"
10 #include "logging/comp-logging.h"
11
12 #include <babeltrace2/babeltrace.h>
13 #include <stdio.h>
14 #include <stdbool.h>
15 #include <glib.h>
16 #include "common/assert.h"
17 #include "ctfser/ctfser.h"
18 #include "plugins/common/param-validation/param-validation.h"
19
20 #include "fs-sink.hpp"
21 #include "fs-sink-trace.hpp"
22 #include "fs-sink-stream.hpp"
23 #include "fs-sink-ctf-meta.hpp"
24 #include "translate-trace-ir-to-ctf-ir.hpp"
25 #include "translate-ctf-ir-to-tsdl.hpp"
26
27 static const char * const in_port_name = "in";
28
29 static bt_component_class_initialize_method_status
30 ensure_output_dir_exists(struct fs_sink_comp *fs_sink)
31 {
32 bt_component_class_initialize_method_status status =
33 BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
34 int ret;
35
36 ret = g_mkdir_with_parents(fs_sink->output_dir_path->str, 0755);
37 if (ret) {
38 BT_COMP_LOGE_APPEND_CAUSE_ERRNO(fs_sink->self_comp,
39 "Cannot create directories for output directory",
40 ": output-dir-path=\"%s\"", fs_sink->output_dir_path->str);
41 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
42 goto end;
43 }
44
45 end:
46 return status;
47 }
48
49 static bt_param_validation_map_value_entry_descr fs_sink_params_descr[] = {
50 {"path", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
51 bt_param_validation_value_descr::makeString()},
52 {"assume-single-trace", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
53 bt_param_validation_value_descr::makeBool()},
54 {"ignore-discarded-events", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
55 bt_param_validation_value_descr::makeBool()},
56 {"ignore-discarded-packets", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
57 bt_param_validation_value_descr::makeBool()},
58 {"quiet", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
59 bt_param_validation_value_descr::makeBool()},
60 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
61
62 static bt_component_class_initialize_method_status configure_component(struct fs_sink_comp *fs_sink,
63 const bt_value *params)
64 {
65 bt_component_class_initialize_method_status status;
66 const bt_value *value;
67 enum bt_param_validation_status validation_status;
68 gchar *validation_error;
69
70 validation_status =
71 bt_param_validation_validate(params, fs_sink_params_descr, &validation_error);
72 if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
73 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
74 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "%s", validation_error);
75 goto end;
76 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
77 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
78 goto end;
79 }
80
81 value = bt_value_map_borrow_entry_value_const(params, "path");
82 g_string_assign(fs_sink->output_dir_path, bt_value_string_get(value));
83
84 value = bt_value_map_borrow_entry_value_const(params, "assume-single-trace");
85 if (value) {
86 fs_sink->assume_single_trace = (bool) bt_value_bool_get(value);
87 }
88
89 value = bt_value_map_borrow_entry_value_const(params, "ignore-discarded-events");
90 if (value) {
91 fs_sink->ignore_discarded_events = (bool) bt_value_bool_get(value);
92 }
93
94 value = bt_value_map_borrow_entry_value_const(params, "ignore-discarded-packets");
95 if (value) {
96 fs_sink->ignore_discarded_packets = (bool) bt_value_bool_get(value);
97 }
98
99 value = bt_value_map_borrow_entry_value_const(params, "quiet");
100 if (value) {
101 fs_sink->quiet = (bool) bt_value_bool_get(value);
102 }
103
104 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
105
106 end:
107 g_free(validation_error);
108 return status;
109 }
110
111 static void destroy_fs_sink_comp(struct fs_sink_comp *fs_sink)
112 {
113 if (!fs_sink) {
114 goto end;
115 }
116
117 if (fs_sink->output_dir_path) {
118 g_string_free(fs_sink->output_dir_path, TRUE);
119 fs_sink->output_dir_path = NULL;
120 }
121
122 if (fs_sink->traces) {
123 g_hash_table_destroy(fs_sink->traces);
124 fs_sink->traces = NULL;
125 }
126
127 BT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(fs_sink->upstream_iter);
128 g_free(fs_sink);
129
130 end:
131 return;
132 }
133
134 bt_component_class_initialize_method_status ctf_fs_sink_init(bt_self_component_sink *self_comp_sink,
135 bt_self_component_sink_configuration *,
136 const bt_value *params, void *)
137 {
138 bt_component_class_initialize_method_status status;
139 bt_self_component_add_port_status add_port_status;
140 struct fs_sink_comp *fs_sink = NULL;
141 bt_self_component *self_comp = bt_self_component_sink_as_self_component(self_comp_sink);
142 bt_logging_level log_level =
143 bt_component_get_logging_level(bt_self_component_as_component(self_comp));
144
145 fs_sink = g_new0(struct fs_sink_comp, 1);
146 if (!fs_sink) {
147 BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR, log_level, self_comp,
148 "Failed to allocate one CTF FS sink structure.");
149 BT_CURRENT_THREAD_ERROR_APPEND_CAUSE_FROM_COMPONENT(
150 self_comp, "Failed to allocate one CTF FS sink structure.");
151 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
152 goto end;
153 }
154
155 fs_sink->log_level = log_level;
156 fs_sink->self_comp = self_comp;
157 fs_sink->output_dir_path = g_string_new(NULL);
158 status = configure_component(fs_sink, params);
159 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
160 /* configure_component() logs errors */
161 goto end;
162 }
163
164 if (fs_sink->assume_single_trace &&
165 g_file_test(fs_sink->output_dir_path->str, G_FILE_TEST_EXISTS)) {
166 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
167 "Single trace mode, but output path exists: output-path=\"%s\"",
168 fs_sink->output_dir_path->str);
169 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
170 goto end;
171 }
172
173 status = ensure_output_dir_exists(fs_sink);
174 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
175 /* ensure_output_dir_exists() logs errors */
176 goto end;
177 }
178
179 fs_sink->traces = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL,
180 (GDestroyNotify) fs_sink_trace_destroy);
181 if (!fs_sink->traces) {
182 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate one GHashTable.");
183 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
184 goto end;
185 }
186
187 add_port_status =
188 bt_self_component_sink_add_input_port(self_comp_sink, in_port_name, NULL, NULL);
189 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
190 status = (bt_component_class_initialize_method_status) add_port_status;
191 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to add input port.");
192 goto end;
193 }
194
195 bt_self_component_set_data(self_comp, fs_sink);
196
197 end:
198 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
199 destroy_fs_sink_comp(fs_sink);
200 }
201
202 return status;
203 }
204
205 static inline struct fs_sink_stream *borrow_stream(struct fs_sink_comp *fs_sink,
206 const bt_stream *ir_stream)
207 {
208 const bt_trace *ir_trace = bt_stream_borrow_trace_const(ir_stream);
209 struct fs_sink_trace *trace;
210 struct fs_sink_stream *stream = NULL;
211
212 trace = (fs_sink_trace *) g_hash_table_lookup(fs_sink->traces, ir_trace);
213 if (G_UNLIKELY(!trace)) {
214 if (fs_sink->assume_single_trace && g_hash_table_size(fs_sink->traces) > 0) {
215 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
216 "Single trace mode, but getting more than one trace: "
217 "stream-name=\"%s\"",
218 bt_stream_get_name(ir_stream));
219 goto end;
220 }
221
222 trace = fs_sink_trace_create(fs_sink, ir_trace);
223 if (!trace) {
224 goto end;
225 }
226 }
227
228 stream = (fs_sink_stream *) g_hash_table_lookup(trace->streams, ir_stream);
229 if (G_UNLIKELY(!stream)) {
230 stream = fs_sink_stream_create(trace, ir_stream);
231 if (!stream) {
232 goto end;
233 }
234 }
235
236 end:
237 return stream;
238 }
239
240 static inline bt_component_class_sink_consume_method_status
241 handle_event_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
242 {
243 int ret;
244 bt_component_class_sink_consume_method_status status =
245 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
246 const bt_event *ir_event = bt_message_event_borrow_event_const(msg);
247 const bt_stream *ir_stream = bt_event_borrow_stream_const(ir_event);
248 struct fs_sink_stream *stream;
249 struct fs_sink_ctf_event_class *ec = NULL;
250 const bt_clock_snapshot *cs = NULL;
251
252 stream = borrow_stream(fs_sink, ir_stream);
253 if (G_UNLIKELY(!stream)) {
254 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
255 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
256 goto end;
257 }
258
259 ret = try_translate_event_class_trace_ir_to_ctf_ir(fs_sink, stream->sc,
260 bt_event_borrow_class_const(ir_event), &ec);
261 if (ret) {
262 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to translate event class to CTF IR.");
263 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
264 goto end;
265 }
266
267 BT_ASSERT_DBG(ec);
268
269 if (stream->sc->default_clock_class) {
270 cs = bt_message_event_borrow_default_clock_snapshot_const(msg);
271 }
272
273 /*
274 * If this event's stream does not support packets, then we
275 * lazily create artificial packets.
276 *
277 * The size of an artificial packet is arbitrarily at least
278 * 4 MiB (it usually is greater because we close it when
279 * comes the time to write a new event and the packet's content
280 * size is >= 4 MiB), except the last one which can be smaller.
281 */
282 if (G_UNLIKELY(!stream->sc->has_packets)) {
283 if (stream->packet_state.is_open &&
284 bt_ctfser_get_offset_in_current_packet_bits(&stream->ctfser) / 8 >= 4 * 1024 * 1024) {
285 /*
286 * Stream's current packet is larger than 4 MiB:
287 * close it. A new packet will be opened just
288 * below.
289 */
290 ret = fs_sink_stream_close_packet(stream, NULL);
291 if (ret) {
292 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to close packet.");
293 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
294 goto end;
295 }
296 }
297
298 if (!stream->packet_state.is_open) {
299 /* Stream's packet is not currently opened: open it */
300 ret = fs_sink_stream_open_packet(stream, NULL, NULL);
301 if (ret) {
302 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to open packet.");
303 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
304 goto end;
305 }
306 }
307 }
308
309 BT_ASSERT_DBG(stream->packet_state.is_open);
310 ret = fs_sink_stream_write_event(stream, cs, ir_event, ec);
311 if (G_UNLIKELY(ret)) {
312 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to write event.");
313 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
314 goto end;
315 }
316
317 end:
318 return status;
319 }
320
321 static inline bt_component_class_sink_consume_method_status
322 handle_packet_beginning_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
323 {
324 int ret;
325 bt_component_class_sink_consume_method_status status =
326 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
327 const bt_packet *ir_packet = bt_message_packet_beginning_borrow_packet_const(msg);
328 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
329 struct fs_sink_stream *stream;
330 const bt_clock_snapshot *cs = NULL;
331
332 stream = borrow_stream(fs_sink, ir_stream);
333 if (G_UNLIKELY(!stream)) {
334 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
335 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
336 goto end;
337 }
338
339 if (stream->sc->packets_have_ts_begin) {
340 cs = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg);
341 BT_ASSERT(cs);
342 }
343
344 /*
345 * If we previously received a discarded events message with
346 * a time range, make sure that its beginning time matches what's
347 * expected for CTF 1.8, that is:
348 *
349 * * Its beginning time is the previous packet's end
350 * time (or the current packet's beginning time if
351 * this is the first packet).
352 *
353 * We check this here instead of in handle_packet_end_msg()
354 * because we want to catch any incompatible message as early as
355 * possible to report the error.
356 *
357 * Validation of the discarded events message's end time is
358 * performed in handle_packet_end_msg().
359 */
360 if (stream->discarded_events_state.in_range) {
361 uint64_t expected_cs;
362
363 /*
364 * `stream->discarded_events_state.in_range` is only set
365 * when the stream class's discarded events have a time
366 * range.
367 *
368 * It is required that the packet beginning and end
369 * messages for this stream class have times when
370 * discarded events have a time range.
371 */
372 BT_ASSERT(stream->sc->discarded_events_has_ts);
373 BT_ASSERT(stream->sc->packets_have_ts_begin);
374 BT_ASSERT(stream->sc->packets_have_ts_end);
375
376 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
377 /* We're opening the first packet */
378 expected_cs = bt_clock_snapshot_get_value(cs);
379 } else {
380 expected_cs = stream->prev_packet_state.end_cs;
381 }
382
383 if (stream->discarded_events_state.beginning_cs != expected_cs) {
384 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
385 "Incompatible discarded events message: "
386 "unexpected beginning time: "
387 "beginning-cs-val=%" PRIu64 ", "
388 "expected-beginning-cs-val=%" PRIu64 ", "
389 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
390 "trace-name=\"%s\", path=\"%s/%s\"",
391 stream->discarded_events_state.beginning_cs, expected_cs,
392 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
393 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
394 stream->trace->path->str, stream->file_name->str);
395 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
396 goto end;
397 }
398 }
399
400 /*
401 * If we previously received a discarded packets message with a
402 * time range, make sure that its beginning and end times match
403 * what's expected for CTF 1.8, that is:
404 *
405 * * Its beginning time is the previous packet's end time.
406 *
407 * * Its end time is the current packet's beginning time.
408 */
409 if (stream->discarded_packets_state.in_range) {
410 uint64_t expected_end_cs;
411
412 /*
413 * `stream->discarded_packets_state.in_range` is only
414 * set when the stream class's discarded packets have a
415 * time range.
416 *
417 * It is required that the packet beginning and end
418 * messages for this stream class have times when
419 * discarded packets have a time range.
420 */
421 BT_ASSERT(stream->sc->discarded_packets_has_ts);
422 BT_ASSERT(stream->sc->packets_have_ts_begin);
423 BT_ASSERT(stream->sc->packets_have_ts_end);
424
425 /*
426 * It is not supported to have a discarded packets
427 * message _before_ the first packet: we cannot validate
428 * that its beginning time is compatible with CTF 1.8 in
429 * this case.
430 */
431 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
432 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
433 "Incompatible discarded packets message "
434 "occurring before the stream's first packet: "
435 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
436 "trace-name=\"%s\", path=\"%s/%s\"",
437 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
438 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
439 stream->trace->path->str, stream->file_name->str);
440 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
441 goto end;
442 }
443
444 if (stream->discarded_packets_state.beginning_cs != stream->prev_packet_state.end_cs) {
445 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
446 "Incompatible discarded packets message: "
447 "unexpected beginning time: "
448 "beginning-cs-val=%" PRIu64 ", "
449 "expected-beginning-cs-val=%" PRIu64 ", "
450 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
451 "trace-name=\"%s\", path=\"%s/%s\"",
452 stream->discarded_packets_state.beginning_cs,
453 stream->prev_packet_state.end_cs, bt_stream_get_id(ir_stream),
454 bt_stream_get_name(ir_stream),
455 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
456 stream->trace->path->str, stream->file_name->str);
457 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
458 goto end;
459 }
460
461 expected_end_cs = bt_clock_snapshot_get_value(cs);
462
463 if (stream->discarded_packets_state.end_cs != expected_end_cs) {
464 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
465 "Incompatible discarded packets message: "
466 "unexpected end time: "
467 "end-cs-val=%" PRIu64 ", "
468 "expected-end-cs-val=%" PRIu64 ", "
469 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
470 "trace-name=\"%s\", path=\"%s/%s\"",
471 stream->discarded_packets_state.end_cs, expected_end_cs,
472 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
473 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
474 stream->trace->path->str, stream->file_name->str);
475 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
476 goto end;
477 }
478 }
479
480 /*
481 * We're not in a discarded packets time range anymore since we
482 * require that the discarded packets time ranges go from one
483 * packet's end time to the next packet's beginning time, and
484 * we're handling a packet beginning message here.
485 */
486 stream->discarded_packets_state.in_range = false;
487
488 ret = fs_sink_stream_open_packet(stream, cs, ir_packet);
489 if (ret) {
490 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to open packet.");
491 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
492 goto end;
493 }
494
495 end:
496 return status;
497 }
498
499 static inline bt_component_class_sink_consume_method_status
500 handle_packet_end_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
501 {
502 int ret;
503 bt_component_class_sink_consume_method_status status =
504 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
505 const bt_packet *ir_packet = bt_message_packet_end_borrow_packet_const(msg);
506 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
507 struct fs_sink_stream *stream;
508 const bt_clock_snapshot *cs = NULL;
509
510 stream = borrow_stream(fs_sink, ir_stream);
511 if (G_UNLIKELY(!stream)) {
512 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
513 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
514 goto end;
515 }
516
517 if (stream->sc->packets_have_ts_end) {
518 cs = bt_message_packet_end_borrow_default_clock_snapshot_const(msg);
519 BT_ASSERT(cs);
520 }
521
522 /*
523 * If we previously received a discarded events message with
524 * a time range, make sure that its end time matches what's
525 * expected for CTF 1.8, that is:
526 *
527 * * Its end time is the current packet's end time.
528 *
529 * Validation of the discarded events message's beginning time
530 * is performed in handle_packet_beginning_msg().
531 */
532 if (stream->discarded_events_state.in_range) {
533 uint64_t expected_cs;
534
535 /*
536 * `stream->discarded_events_state.in_range` is only set
537 * when the stream class's discarded events have a time
538 * range.
539 *
540 * It is required that the packet beginning and end
541 * messages for this stream class have times when
542 * discarded events have a time range.
543 */
544 BT_ASSERT(stream->sc->discarded_events_has_ts);
545 BT_ASSERT(stream->sc->packets_have_ts_begin);
546 BT_ASSERT(stream->sc->packets_have_ts_end);
547
548 expected_cs = bt_clock_snapshot_get_value(cs);
549
550 if (stream->discarded_events_state.end_cs != expected_cs) {
551 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
552 "Incompatible discarded events message: "
553 "unexpected end time: "
554 "end-cs-val=%" PRIu64 ", "
555 "expected-end-cs-val=%" PRIu64 ", "
556 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
557 "trace-name=\"%s\", path=\"%s/%s\"",
558 stream->discarded_events_state.end_cs, expected_cs,
559 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
560 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
561 stream->trace->path->str, stream->file_name->str);
562 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
563 goto end;
564 }
565 }
566
567 ret = fs_sink_stream_close_packet(stream, cs);
568 if (ret) {
569 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to close packet.");
570 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
571 goto end;
572 }
573
574 /*
575 * We're not in a discarded events time range anymore since we
576 * require that the discarded events time ranges go from one
577 * packet's end time to the next packet's end time, and we're
578 * handling a packet end message here.
579 */
580 stream->discarded_events_state.in_range = false;
581
582 end:
583 return status;
584 }
585
586 static inline bt_component_class_sink_consume_method_status
587 handle_stream_beginning_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
588 {
589 bt_component_class_sink_consume_method_status status =
590 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
591 const bt_stream *ir_stream = bt_message_stream_beginning_borrow_stream_const(msg);
592 const bt_stream_class *ir_sc = bt_stream_borrow_class_const(ir_stream);
593 struct fs_sink_stream *stream;
594 bool packets_have_beginning_end_cs =
595 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc) &&
596 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc);
597
598 /*
599 * Not supported: discarded events or discarded packets support
600 * without packets support. Packets are the way to know where
601 * discarded events/packets occurred in CTF 1.8.
602 */
603 if (!bt_stream_class_supports_packets(ir_sc)) {
604 BT_ASSERT(!bt_stream_class_supports_discarded_packets(ir_sc));
605
606 if (!fs_sink->ignore_discarded_events && bt_stream_class_supports_discarded_events(ir_sc)) {
607 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
608 "Unsupported stream: "
609 "stream does not support packets, "
610 "but supports discarded events: "
611 "stream-addr=%p, "
612 "stream-id=%" PRIu64 ", "
613 "stream-name=\"%s\"",
614 ir_stream, bt_stream_get_id(ir_stream),
615 bt_stream_get_name(ir_stream));
616 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
617 goto end;
618 }
619 }
620
621 /*
622 * Not supported: discarded events with default clock snapshots,
623 * but packet beginning/end without default clock snapshot.
624 */
625 if (!fs_sink->ignore_discarded_events &&
626 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc) &&
627 !packets_have_beginning_end_cs) {
628 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
629 "Unsupported stream: discarded events have "
630 "default clock snapshots, but packets have no "
631 "beginning and/or end default clock snapshots: "
632 "stream-addr=%p, "
633 "stream-id=%" PRIu64 ", "
634 "stream-name=\"%s\"",
635 ir_stream, bt_stream_get_id(ir_stream),
636 bt_stream_get_name(ir_stream));
637 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
638 goto end;
639 }
640
641 /*
642 * Not supported: discarded packets with default clock
643 * snapshots, but packet beginning/end without default clock
644 * snapshot.
645 */
646 if (!fs_sink->ignore_discarded_packets &&
647 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc) &&
648 !packets_have_beginning_end_cs) {
649 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
650 "Unsupported stream: discarded packets have "
651 "default clock snapshots, but packets have no "
652 "beginning and/or end default clock snapshots: "
653 "stream-addr=%p, "
654 "stream-id=%" PRIu64 ", "
655 "stream-name=\"%s\"",
656 ir_stream, bt_stream_get_id(ir_stream),
657 bt_stream_get_name(ir_stream));
658 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
659 goto end;
660 }
661
662 stream = borrow_stream(fs_sink, ir_stream);
663 if (!stream) {
664 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
665 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
666 goto end;
667 }
668
669 BT_COMP_LOGI("Created new, empty stream file: "
670 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
671 "trace-name=\"%s\", path=\"%s/%s\"",
672 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
673 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
674 stream->trace->path->str, stream->file_name->str);
675
676 end:
677 return status;
678 }
679
680 static inline bt_component_class_sink_consume_method_status
681 handle_stream_end_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
682 {
683 bt_component_class_sink_consume_method_status status =
684 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
685 const bt_stream *ir_stream = bt_message_stream_end_borrow_stream_const(msg);
686 struct fs_sink_stream *stream;
687
688 stream = borrow_stream(fs_sink, ir_stream);
689 if (!stream) {
690 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
691 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
692 goto end;
693 }
694
695 if (G_UNLIKELY(!stream->sc->has_packets && stream->packet_state.is_open)) {
696 /* Close stream's current artificial packet */
697 int ret = fs_sink_stream_close_packet(stream, NULL);
698
699 if (ret) {
700 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to close packet.");
701 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
702 goto end;
703 }
704 }
705
706 BT_COMP_LOGI("Closing stream file: "
707 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
708 "trace-name=\"%s\", path=\"%s/%s\"",
709 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
710 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
711 stream->trace->path->str, stream->file_name->str);
712
713 /*
714 * This destroys the stream object and frees all its resources,
715 * closing the stream file.
716 */
717 g_hash_table_remove(stream->trace->streams, ir_stream);
718
719 end:
720 return status;
721 }
722
723 static inline bt_component_class_sink_consume_method_status
724 handle_discarded_events_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
725 {
726 bt_component_class_sink_consume_method_status status =
727 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
728 const bt_stream *ir_stream = bt_message_discarded_events_borrow_stream_const(msg);
729 struct fs_sink_stream *stream;
730 const bt_clock_snapshot *cs = NULL;
731 bt_property_availability avail;
732 uint64_t count;
733
734 stream = borrow_stream(fs_sink, ir_stream);
735 if (!stream) {
736 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
737 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
738 goto end;
739 }
740
741 if (fs_sink->ignore_discarded_events) {
742 BT_COMP_LOGI("Ignoring discarded events message: "
743 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
744 "trace-name=\"%s\", path=\"%s/%s\"",
745 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
746 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
747 stream->trace->path->str, stream->file_name->str);
748 goto end;
749 }
750
751 if (stream->discarded_events_state.in_range) {
752 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
753 "Unsupported contiguous discarded events message: "
754 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
755 "trace-name=\"%s\", path=\"%s/%s\"",
756 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
757 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
758 stream->trace->path->str, stream->file_name->str);
759 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
760 goto end;
761 }
762
763 /*
764 * If we're currently in an opened packet (got a packet
765 * beginning message, but no packet end message yet), we do not
766 * support having a discarded events message with a time range
767 * because we require that the discarded events message's time
768 * range go from a packet's end time to the next packet's end
769 * time.
770 */
771 if (stream->packet_state.is_open && stream->sc->discarded_events_has_ts) {
772 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
773 "Unsupported discarded events message with "
774 "default clock snapshots occurring within a packet: "
775 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
776 "trace-name=\"%s\", path=\"%s/%s\"",
777 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
778 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
779 stream->trace->path->str, stream->file_name->str);
780 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
781 goto end;
782 }
783
784 if (stream->sc->discarded_events_has_ts) {
785 /*
786 * Make the stream's state be in the time range of a
787 * discarded events message since we have the message's
788 * time range (`stream->sc->discarded_events_has_ts`).
789 */
790 stream->discarded_events_state.in_range = true;
791
792 /*
793 * The clock snapshot values will be validated when
794 * handling the next packet beginning and end messages
795 * (next calls to handle_packet_beginning_msg() and
796 * handle_packet_end_msg()).
797 */
798 cs = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg);
799 BT_ASSERT(cs);
800 stream->discarded_events_state.beginning_cs = bt_clock_snapshot_get_value(cs);
801 cs = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg);
802 BT_ASSERT(cs);
803 stream->discarded_events_state.end_cs = bt_clock_snapshot_get_value(cs);
804 }
805
806 avail = bt_message_discarded_events_get_count(msg, &count);
807 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
808 /*
809 * There's no specific count of discarded events: set it
810 * to 1 so that we know that we at least discarded
811 * something.
812 */
813 count = 1;
814 }
815
816 stream->packet_state.discarded_events_counter += count;
817
818 end:
819 return status;
820 }
821
822 static inline bt_component_class_sink_consume_method_status
823 handle_discarded_packets_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
824 {
825 bt_component_class_sink_consume_method_status status =
826 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
827 const bt_stream *ir_stream = bt_message_discarded_packets_borrow_stream_const(msg);
828 struct fs_sink_stream *stream;
829 const bt_clock_snapshot *cs = NULL;
830 bt_property_availability avail;
831 uint64_t count;
832
833 stream = borrow_stream(fs_sink, ir_stream);
834 if (!stream) {
835 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
836 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
837 goto end;
838 }
839
840 if (fs_sink->ignore_discarded_packets) {
841 BT_COMP_LOGI("Ignoring discarded packets message: "
842 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
843 "trace-name=\"%s\", path=\"%s/%s\"",
844 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
845 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
846 stream->trace->path->str, stream->file_name->str);
847 goto end;
848 }
849
850 if (stream->discarded_packets_state.in_range) {
851 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
852 "Unsupported contiguous discarded packets message: "
853 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
854 "trace-name=\"%s\", path=\"%s/%s\"",
855 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
856 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
857 stream->trace->path->str, stream->file_name->str);
858 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
859 goto end;
860 }
861
862 /*
863 * Discarded packets messages are guaranteed to occur between
864 * packets.
865 */
866 BT_ASSERT(!stream->packet_state.is_open);
867
868 if (stream->sc->discarded_packets_has_ts) {
869 /*
870 * Make the stream's state be in the time range of a
871 * discarded packets message since we have the message's
872 * time range (`stream->sc->discarded_packets_has_ts`).
873 */
874 stream->discarded_packets_state.in_range = true;
875
876 /*
877 * The clock snapshot values will be validated when
878 * handling the next packet beginning message (next call
879 * to handle_packet_beginning_msg()).
880 */
881 cs = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg);
882 BT_ASSERT(cs);
883 stream->discarded_packets_state.beginning_cs = bt_clock_snapshot_get_value(cs);
884 cs = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg);
885 BT_ASSERT(cs);
886 stream->discarded_packets_state.end_cs = bt_clock_snapshot_get_value(cs);
887 }
888
889 avail = bt_message_discarded_packets_get_count(msg, &count);
890 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
891 /*
892 * There's no specific count of discarded packets: set
893 * it to 1 so that we know that we at least discarded
894 * something.
895 */
896 count = 1;
897 }
898
899 stream->packet_state.seq_num += count;
900
901 end:
902 return status;
903 }
904
905 static inline void put_messages(bt_message_array_const msgs, uint64_t count)
906 {
907 uint64_t i;
908
909 for (i = 0; i < count; i++) {
910 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
911 }
912 }
913
914 bt_component_class_sink_consume_method_status ctf_fs_sink_consume(bt_self_component_sink *self_comp)
915 {
916 bt_component_class_sink_consume_method_status status =
917 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
918 struct fs_sink_comp *fs_sink;
919 bt_message_iterator_next_status next_status;
920 uint64_t msg_count = 0;
921 bt_message_array_const msgs;
922
923 fs_sink = (fs_sink_comp *) bt_self_component_get_data(
924 bt_self_component_sink_as_self_component(self_comp));
925 BT_ASSERT_DBG(fs_sink);
926 BT_ASSERT_DBG(fs_sink->upstream_iter);
927
928 /* Consume messages */
929 next_status = bt_message_iterator_next(fs_sink->upstream_iter, &msgs, &msg_count);
930 if (next_status < 0) {
931 status = (bt_component_class_sink_consume_method_status) next_status;
932 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
933 "Failed to get next message from upstream iterator.");
934 goto end;
935 }
936
937 switch (next_status) {
938 case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK:
939 {
940 uint64_t i;
941
942 for (i = 0; i < msg_count; i++) {
943 const bt_message *msg = msgs[i];
944
945 BT_ASSERT_DBG(msg);
946
947 switch (bt_message_get_type(msg)) {
948 case BT_MESSAGE_TYPE_EVENT:
949 status = handle_event_msg(fs_sink, msg);
950 break;
951 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
952 status = handle_packet_beginning_msg(fs_sink, msg);
953 break;
954 case BT_MESSAGE_TYPE_PACKET_END:
955 status = handle_packet_end_msg(fs_sink, msg);
956 break;
957 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
958 /* Ignore */
959 BT_COMP_LOGD_STR("Ignoring message iterator inactivity message.");
960 break;
961 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
962 status = handle_stream_beginning_msg(fs_sink, msg);
963 break;
964 case BT_MESSAGE_TYPE_STREAM_END:
965 status = handle_stream_end_msg(fs_sink, msg);
966 break;
967 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
968 status = handle_discarded_events_msg(fs_sink, msg);
969 break;
970 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
971 status = handle_discarded_packets_msg(fs_sink, msg);
972 break;
973 default:
974 bt_common_abort();
975 }
976
977 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
978
979 if (status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK) {
980 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
981 "Failed to handle message: "
982 "generated CTF traces could be incomplete: "
983 "output-dir-path=\"%s\"",
984 fs_sink->output_dir_path->str);
985 goto error;
986 }
987 }
988
989 break;
990 }
991 case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN:
992 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN;
993 break;
994 case BT_MESSAGE_ITERATOR_NEXT_STATUS_END:
995 /* TODO: Finalize all traces (should already be done?) */
996 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END;
997 break;
998 default:
999 break;
1000 }
1001
1002 goto end;
1003
1004 error:
1005 BT_ASSERT(status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK);
1006 put_messages(msgs, msg_count);
1007
1008 end:
1009 return status;
1010 }
1011
1012 bt_component_class_sink_graph_is_configured_method_status
1013 ctf_fs_sink_graph_is_configured(bt_self_component_sink *self_comp)
1014 {
1015 bt_component_class_sink_graph_is_configured_method_status status;
1016 bt_message_iterator_create_from_sink_component_status msg_iter_status;
1017 fs_sink_comp *fs_sink = (fs_sink_comp *) bt_self_component_get_data(
1018 bt_self_component_sink_as_self_component(self_comp));
1019
1020 msg_iter_status = bt_message_iterator_create_from_sink_component(
1021 self_comp, bt_self_component_sink_borrow_input_port_by_name(self_comp, in_port_name),
1022 &fs_sink->upstream_iter);
1023 if (msg_iter_status != BT_MESSAGE_ITERATOR_CREATE_FROM_SINK_COMPONENT_STATUS_OK) {
1024 status = (bt_component_class_sink_graph_is_configured_method_status) msg_iter_status;
1025 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to create upstream iterator.");
1026 goto end;
1027 }
1028
1029 status = BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK;
1030 end:
1031 return status;
1032 }
1033
1034 void ctf_fs_sink_finalize(bt_self_component_sink *self_comp)
1035 {
1036 fs_sink_comp *fs_sink = (fs_sink_comp *) bt_self_component_get_data(
1037 bt_self_component_sink_as_self_component(self_comp));
1038
1039 destroy_fs_sink_comp(fs_sink);
1040 }
This page took 0.054671 seconds and 3 git commands to generate.