Sort includes in C++ files
[babeltrace.git] / src / plugins / ctf / fs-sink / fs-sink.cpp
CommitLineData
15fe47e0 1/*
0235b0db 2 * SPDX-License-Identifier: MIT
15fe47e0 3 *
0235b0db 4 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
15fe47e0
PP
5 */
6
c802cacb
SM
7#include <glib.h>
8#include <stdbool.h>
9#include <stdio.h>
10
11#include <babeltrace2/babeltrace.h>
12
aa1a7452 13#define BT_COMP_LOG_SELF_COMP (fs_sink->self_comp)
4164020e
SM
14#define BT_LOG_OUTPUT_LEVEL (fs_sink->log_level)
15#define BT_LOG_TAG "PLUGIN/SINK.CTF.FS"
d9c39b0a 16#include "logging/comp-logging.h"
15fe47e0 17
578e048b
MJ
18#include "common/assert.h"
19#include "ctfser/ctfser.h"
c802cacb 20
ce67f561 21#include "plugins/common/param-validation/param-validation.h"
15fe47e0 22
087cd0f5 23#include "fs-sink-ctf-meta.hpp"
c802cacb
SM
24#include "fs-sink-stream.hpp"
25#include "fs-sink-trace.hpp"
26#include "fs-sink.hpp"
087cd0f5 27#include "translate-ctf-ir-to-tsdl.hpp"
c802cacb 28#include "translate-trace-ir-to-ctf-ir.hpp"
15fe47e0 29
4164020e 30static const char * const in_port_name = "in";
15fe47e0 31
4164020e
SM
32static bt_component_class_initialize_method_status
33ensure_output_dir_exists(struct fs_sink_comp *fs_sink)
15fe47e0 34{
4164020e
SM
35 bt_component_class_initialize_method_status status =
36 BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
37 int ret;
38
39 ret = g_mkdir_with_parents(fs_sink->output_dir_path->str, 0755);
40 if (ret) {
41 BT_COMP_LOGE_APPEND_CAUSE_ERRNO(fs_sink->self_comp,
42 "Cannot create directories for output directory",
43 ": output-dir-path=\"%s\"", fs_sink->output_dir_path->str);
44 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
45 goto end;
46 }
15fe47e0
PP
47
48end:
4164020e 49 return status;
15fe47e0
PP
50}
51
087cd0f5 52static bt_param_validation_map_value_entry_descr fs_sink_params_descr[] = {
88730e42
SM
53 {"path", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
54 bt_param_validation_value_descr::makeString()},
55 {"assume-single-trace", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
56 bt_param_validation_value_descr::makeBool()},
57 {"ignore-discarded-events", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
58 bt_param_validation_value_descr::makeBool()},
59 {"ignore-discarded-packets", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
60 bt_param_validation_value_descr::makeBool()},
61 {"quiet", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
62 bt_param_validation_value_descr::makeBool()},
4164020e
SM
63 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
64
65static bt_component_class_initialize_method_status configure_component(struct fs_sink_comp *fs_sink,
66 const bt_value *params)
15fe47e0 67{
4164020e
SM
68 bt_component_class_initialize_method_status status;
69 const bt_value *value;
70 enum bt_param_validation_status validation_status;
71 gchar *validation_error;
72
73 validation_status =
74 bt_param_validation_validate(params, fs_sink_params_descr, &validation_error);
75 if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
76 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
77 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "%s", validation_error);
78 goto end;
79 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
80 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
81 goto end;
82 }
83
84 value = bt_value_map_borrow_entry_value_const(params, "path");
85 g_string_assign(fs_sink->output_dir_path, bt_value_string_get(value));
86
87 value = bt_value_map_borrow_entry_value_const(params, "assume-single-trace");
88 if (value) {
89 fs_sink->assume_single_trace = (bool) bt_value_bool_get(value);
90 }
91
92 value = bt_value_map_borrow_entry_value_const(params, "ignore-discarded-events");
93 if (value) {
94 fs_sink->ignore_discarded_events = (bool) bt_value_bool_get(value);
95 }
96
97 value = bt_value_map_borrow_entry_value_const(params, "ignore-discarded-packets");
98 if (value) {
99 fs_sink->ignore_discarded_packets = (bool) bt_value_bool_get(value);
100 }
101
102 value = bt_value_map_borrow_entry_value_const(params, "quiet");
103 if (value) {
104 fs_sink->quiet = (bool) bt_value_bool_get(value);
105 }
106
107 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
ce67f561 108
15fe47e0 109end:
4164020e
SM
110 g_free(validation_error);
111 return status;
15fe47e0
PP
112}
113
4164020e 114static void destroy_fs_sink_comp(struct fs_sink_comp *fs_sink)
15fe47e0 115{
4164020e
SM
116 if (!fs_sink) {
117 goto end;
118 }
15fe47e0 119
4164020e
SM
120 if (fs_sink->output_dir_path) {
121 g_string_free(fs_sink->output_dir_path, TRUE);
122 fs_sink->output_dir_path = NULL;
123 }
15fe47e0 124
4164020e
SM
125 if (fs_sink->traces) {
126 g_hash_table_destroy(fs_sink->traces);
127 fs_sink->traces = NULL;
128 }
15fe47e0 129
4164020e
SM
130 BT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(fs_sink->upstream_iter);
131 g_free(fs_sink);
15fe47e0
PP
132
133end:
4164020e 134 return;
15fe47e0
PP
135}
136
5bcd9f04
SM
137bt_component_class_initialize_method_status ctf_fs_sink_init(bt_self_component_sink *self_comp_sink,
138 bt_self_component_sink_configuration *,
139 const bt_value *params, void *)
15fe47e0 140{
4164020e
SM
141 bt_component_class_initialize_method_status status;
142 bt_self_component_add_port_status add_port_status;
143 struct fs_sink_comp *fs_sink = NULL;
144 bt_self_component *self_comp = bt_self_component_sink_as_self_component(self_comp_sink);
145 bt_logging_level log_level =
146 bt_component_get_logging_level(bt_self_component_as_component(self_comp));
147
148 fs_sink = g_new0(struct fs_sink_comp, 1);
149 if (!fs_sink) {
150 BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR, log_level, self_comp,
151 "Failed to allocate one CTF FS sink structure.");
152 BT_CURRENT_THREAD_ERROR_APPEND_CAUSE_FROM_COMPONENT(
153 self_comp, "Failed to allocate one CTF FS sink structure.");
154 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
155 goto end;
156 }
157
158 fs_sink->log_level = log_level;
159 fs_sink->self_comp = self_comp;
160 fs_sink->output_dir_path = g_string_new(NULL);
161 status = configure_component(fs_sink, params);
162 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
163 /* configure_component() logs errors */
164 goto end;
165 }
166
167 if (fs_sink->assume_single_trace &&
168 g_file_test(fs_sink->output_dir_path->str, G_FILE_TEST_EXISTS)) {
169 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
170 "Single trace mode, but output path exists: output-path=\"%s\"",
171 fs_sink->output_dir_path->str);
172 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
173 goto end;
174 }
175
176 status = ensure_output_dir_exists(fs_sink);
177 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
178 /* ensure_output_dir_exists() logs errors */
179 goto end;
180 }
181
182 fs_sink->traces = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL,
183 (GDestroyNotify) fs_sink_trace_destroy);
184 if (!fs_sink->traces) {
185 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate one GHashTable.");
186 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
187 goto end;
188 }
189
190 add_port_status =
191 bt_self_component_sink_add_input_port(self_comp_sink, in_port_name, NULL, NULL);
192 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
193 status = (bt_component_class_initialize_method_status) add_port_status;
194 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to add input port.");
195 goto end;
196 }
197
198 bt_self_component_set_data(self_comp, fs_sink);
15fe47e0
PP
199
200end:
4164020e
SM
201 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
202 destroy_fs_sink_comp(fs_sink);
203 }
15fe47e0 204
4164020e 205 return status;
15fe47e0
PP
206}
207
4164020e
SM
208static inline struct fs_sink_stream *borrow_stream(struct fs_sink_comp *fs_sink,
209 const bt_stream *ir_stream)
15fe47e0 210{
4164020e
SM
211 const bt_trace *ir_trace = bt_stream_borrow_trace_const(ir_stream);
212 struct fs_sink_trace *trace;
213 struct fs_sink_stream *stream = NULL;
214
215 trace = (fs_sink_trace *) g_hash_table_lookup(fs_sink->traces, ir_trace);
216 if (G_UNLIKELY(!trace)) {
217 if (fs_sink->assume_single_trace && g_hash_table_size(fs_sink->traces) > 0) {
218 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
219 "Single trace mode, but getting more than one trace: "
220 "stream-name=\"%s\"",
221 bt_stream_get_name(ir_stream));
222 goto end;
223 }
224
225 trace = fs_sink_trace_create(fs_sink, ir_trace);
226 if (!trace) {
227 goto end;
228 }
229 }
230
231 stream = (fs_sink_stream *) g_hash_table_lookup(trace->streams, ir_stream);
232 if (G_UNLIKELY(!stream)) {
233 stream = fs_sink_stream_create(trace, ir_stream);
234 if (!stream) {
235 goto end;
236 }
237 }
15fe47e0
PP
238
239end:
4164020e 240 return stream;
15fe47e0
PP
241}
242
4164020e
SM
243static inline bt_component_class_sink_consume_method_status
244handle_event_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 245{
4164020e
SM
246 int ret;
247 bt_component_class_sink_consume_method_status status =
248 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
249 const bt_event *ir_event = bt_message_event_borrow_event_const(msg);
250 const bt_stream *ir_stream = bt_event_borrow_stream_const(ir_event);
251 struct fs_sink_stream *stream;
252 struct fs_sink_ctf_event_class *ec = NULL;
253 const bt_clock_snapshot *cs = NULL;
254
255 stream = borrow_stream(fs_sink, ir_stream);
256 if (G_UNLIKELY(!stream)) {
257 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
258 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
259 goto end;
260 }
261
262 ret = try_translate_event_class_trace_ir_to_ctf_ir(fs_sink, stream->sc,
263 bt_event_borrow_class_const(ir_event), &ec);
264 if (ret) {
265 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to translate event class to CTF IR.");
266 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
267 goto end;
268 }
269
270 BT_ASSERT_DBG(ec);
271
272 if (stream->sc->default_clock_class) {
273 cs = bt_message_event_borrow_default_clock_snapshot_const(msg);
274 }
275
276 /*
277 * If this event's stream does not support packets, then we
278 * lazily create artificial packets.
279 *
280 * The size of an artificial packet is arbitrarily at least
281 * 4 MiB (it usually is greater because we close it when
282 * comes the time to write a new event and the packet's content
283 * size is >= 4 MiB), except the last one which can be smaller.
284 */
285 if (G_UNLIKELY(!stream->sc->has_packets)) {
286 if (stream->packet_state.is_open &&
287 bt_ctfser_get_offset_in_current_packet_bits(&stream->ctfser) / 8 >= 4 * 1024 * 1024) {
288 /*
289 * Stream's current packet is larger than 4 MiB:
290 * close it. A new packet will be opened just
291 * below.
292 */
293 ret = fs_sink_stream_close_packet(stream, NULL);
294 if (ret) {
295 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to close packet.");
296 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
297 goto end;
298 }
299 }
300
301 if (!stream->packet_state.is_open) {
302 /* Stream's packet is not currently opened: open it */
303 ret = fs_sink_stream_open_packet(stream, NULL, NULL);
304 if (ret) {
305 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to open packet.");
306 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
307 goto end;
308 }
309 }
310 }
311
312 BT_ASSERT_DBG(stream->packet_state.is_open);
313 ret = fs_sink_stream_write_event(stream, cs, ir_event, ec);
314 if (G_UNLIKELY(ret)) {
315 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to write event.");
316 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
317 goto end;
318 }
15fe47e0
PP
319
320end:
4164020e 321 return status;
15fe47e0
PP
322}
323
4164020e
SM
324static inline bt_component_class_sink_consume_method_status
325handle_packet_beginning_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 326{
4164020e
SM
327 int ret;
328 bt_component_class_sink_consume_method_status status =
329 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
330 const bt_packet *ir_packet = bt_message_packet_beginning_borrow_packet_const(msg);
331 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
332 struct fs_sink_stream *stream;
333 const bt_clock_snapshot *cs = NULL;
334
335 stream = borrow_stream(fs_sink, ir_stream);
336 if (G_UNLIKELY(!stream)) {
337 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
338 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
339 goto end;
340 }
341
342 if (stream->sc->packets_have_ts_begin) {
343 cs = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg);
344 BT_ASSERT(cs);
345 }
346
347 /*
348 * If we previously received a discarded events message with
349 * a time range, make sure that its beginning time matches what's
350 * expected for CTF 1.8, that is:
351 *
352 * * Its beginning time is the previous packet's end
353 * time (or the current packet's beginning time if
354 * this is the first packet).
355 *
356 * We check this here instead of in handle_packet_end_msg()
357 * because we want to catch any incompatible message as early as
358 * possible to report the error.
359 *
360 * Validation of the discarded events message's end time is
361 * performed in handle_packet_end_msg().
362 */
363 if (stream->discarded_events_state.in_range) {
364 uint64_t expected_cs;
365
366 /*
367 * `stream->discarded_events_state.in_range` is only set
368 * when the stream class's discarded events have a time
369 * range.
370 *
371 * It is required that the packet beginning and end
372 * messages for this stream class have times when
373 * discarded events have a time range.
374 */
375 BT_ASSERT(stream->sc->discarded_events_has_ts);
376 BT_ASSERT(stream->sc->packets_have_ts_begin);
377 BT_ASSERT(stream->sc->packets_have_ts_end);
378
379 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
380 /* We're opening the first packet */
381 expected_cs = bt_clock_snapshot_get_value(cs);
382 } else {
383 expected_cs = stream->prev_packet_state.end_cs;
384 }
385
386 if (stream->discarded_events_state.beginning_cs != expected_cs) {
387 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
388 "Incompatible discarded events message: "
389 "unexpected beginning time: "
390 "beginning-cs-val=%" PRIu64 ", "
391 "expected-beginning-cs-val=%" PRIu64 ", "
392 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
393 "trace-name=\"%s\", path=\"%s/%s\"",
394 stream->discarded_events_state.beginning_cs, expected_cs,
395 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
396 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
397 stream->trace->path->str, stream->file_name->str);
398 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
399 goto end;
400 }
401 }
402
403 /*
404 * If we previously received a discarded packets message with a
405 * time range, make sure that its beginning and end times match
406 * what's expected for CTF 1.8, that is:
407 *
408 * * Its beginning time is the previous packet's end time.
409 *
410 * * Its end time is the current packet's beginning time.
411 */
412 if (stream->discarded_packets_state.in_range) {
413 uint64_t expected_end_cs;
414
415 /*
416 * `stream->discarded_packets_state.in_range` is only
417 * set when the stream class's discarded packets have a
418 * time range.
419 *
420 * It is required that the packet beginning and end
421 * messages for this stream class have times when
422 * discarded packets have a time range.
423 */
424 BT_ASSERT(stream->sc->discarded_packets_has_ts);
425 BT_ASSERT(stream->sc->packets_have_ts_begin);
426 BT_ASSERT(stream->sc->packets_have_ts_end);
427
428 /*
429 * It is not supported to have a discarded packets
430 * message _before_ the first packet: we cannot validate
431 * that its beginning time is compatible with CTF 1.8 in
432 * this case.
433 */
434 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
435 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
436 "Incompatible discarded packets message "
437 "occurring before the stream's first packet: "
438 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
439 "trace-name=\"%s\", path=\"%s/%s\"",
440 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
441 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
442 stream->trace->path->str, stream->file_name->str);
443 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
444 goto end;
445 }
446
447 if (stream->discarded_packets_state.beginning_cs != stream->prev_packet_state.end_cs) {
448 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
449 "Incompatible discarded packets message: "
450 "unexpected beginning time: "
451 "beginning-cs-val=%" PRIu64 ", "
452 "expected-beginning-cs-val=%" PRIu64 ", "
453 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
454 "trace-name=\"%s\", path=\"%s/%s\"",
455 stream->discarded_packets_state.beginning_cs,
456 stream->prev_packet_state.end_cs, bt_stream_get_id(ir_stream),
457 bt_stream_get_name(ir_stream),
458 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
459 stream->trace->path->str, stream->file_name->str);
460 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
461 goto end;
462 }
463
464 expected_end_cs = bt_clock_snapshot_get_value(cs);
465
466 if (stream->discarded_packets_state.end_cs != expected_end_cs) {
467 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
468 "Incompatible discarded packets message: "
469 "unexpected end time: "
470 "end-cs-val=%" PRIu64 ", "
471 "expected-end-cs-val=%" PRIu64 ", "
472 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
473 "trace-name=\"%s\", path=\"%s/%s\"",
474 stream->discarded_packets_state.end_cs, expected_end_cs,
475 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
476 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
477 stream->trace->path->str, stream->file_name->str);
478 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
479 goto end;
480 }
481 }
482
483 /*
484 * We're not in a discarded packets time range anymore since we
485 * require that the discarded packets time ranges go from one
486 * packet's end time to the next packet's beginning time, and
487 * we're handling a packet beginning message here.
488 */
489 stream->discarded_packets_state.in_range = false;
490
491 ret = fs_sink_stream_open_packet(stream, cs, ir_packet);
492 if (ret) {
493 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to open packet.");
494 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
495 goto end;
496 }
15fe47e0
PP
497
498end:
4164020e 499 return status;
15fe47e0
PP
500}
501
4164020e
SM
502static inline bt_component_class_sink_consume_method_status
503handle_packet_end_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 504{
4164020e
SM
505 int ret;
506 bt_component_class_sink_consume_method_status status =
507 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
508 const bt_packet *ir_packet = bt_message_packet_end_borrow_packet_const(msg);
509 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
510 struct fs_sink_stream *stream;
511 const bt_clock_snapshot *cs = NULL;
512
513 stream = borrow_stream(fs_sink, ir_stream);
514 if (G_UNLIKELY(!stream)) {
515 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
516 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
517 goto end;
518 }
519
520 if (stream->sc->packets_have_ts_end) {
521 cs = bt_message_packet_end_borrow_default_clock_snapshot_const(msg);
522 BT_ASSERT(cs);
523 }
524
525 /*
526 * If we previously received a discarded events message with
527 * a time range, make sure that its end time matches what's
528 * expected for CTF 1.8, that is:
529 *
530 * * Its end time is the current packet's end time.
531 *
532 * Validation of the discarded events message's beginning time
533 * is performed in handle_packet_beginning_msg().
534 */
535 if (stream->discarded_events_state.in_range) {
536 uint64_t expected_cs;
537
538 /*
539 * `stream->discarded_events_state.in_range` is only set
540 * when the stream class's discarded events have a time
541 * range.
542 *
543 * It is required that the packet beginning and end
544 * messages for this stream class have times when
545 * discarded events have a time range.
546 */
547 BT_ASSERT(stream->sc->discarded_events_has_ts);
548 BT_ASSERT(stream->sc->packets_have_ts_begin);
549 BT_ASSERT(stream->sc->packets_have_ts_end);
550
551 expected_cs = bt_clock_snapshot_get_value(cs);
552
553 if (stream->discarded_events_state.end_cs != expected_cs) {
554 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
555 "Incompatible discarded events message: "
556 "unexpected end time: "
557 "end-cs-val=%" PRIu64 ", "
558 "expected-end-cs-val=%" PRIu64 ", "
559 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
560 "trace-name=\"%s\", path=\"%s/%s\"",
561 stream->discarded_events_state.end_cs, expected_cs,
562 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
563 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
564 stream->trace->path->str, stream->file_name->str);
565 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
566 goto end;
567 }
568 }
569
570 ret = fs_sink_stream_close_packet(stream, cs);
571 if (ret) {
572 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to close packet.");
573 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
574 goto end;
575 }
576
577 /*
578 * We're not in a discarded events time range anymore since we
579 * require that the discarded events time ranges go from one
580 * packet's end time to the next packet's end time, and we're
581 * handling a packet end message here.
582 */
583 stream->discarded_events_state.in_range = false;
15fe47e0
PP
584
585end:
4164020e 586 return status;
15fe47e0
PP
587}
588
4164020e
SM
589static inline bt_component_class_sink_consume_method_status
590handle_stream_beginning_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 591{
4164020e
SM
592 bt_component_class_sink_consume_method_status status =
593 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
594 const bt_stream *ir_stream = bt_message_stream_beginning_borrow_stream_const(msg);
595 const bt_stream_class *ir_sc = bt_stream_borrow_class_const(ir_stream);
596 struct fs_sink_stream *stream;
597 bool packets_have_beginning_end_cs =
598 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc) &&
599 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc);
600
601 /*
602 * Not supported: discarded events or discarded packets support
603 * without packets support. Packets are the way to know where
604 * discarded events/packets occurred in CTF 1.8.
605 */
606 if (!bt_stream_class_supports_packets(ir_sc)) {
607 BT_ASSERT(!bt_stream_class_supports_discarded_packets(ir_sc));
608
609 if (!fs_sink->ignore_discarded_events && bt_stream_class_supports_discarded_events(ir_sc)) {
610 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
611 "Unsupported stream: "
612 "stream does not support packets, "
613 "but supports discarded events: "
614 "stream-addr=%p, "
615 "stream-id=%" PRIu64 ", "
616 "stream-name=\"%s\"",
617 ir_stream, bt_stream_get_id(ir_stream),
618 bt_stream_get_name(ir_stream));
619 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
620 goto end;
621 }
622 }
623
624 /*
625 * Not supported: discarded events with default clock snapshots,
626 * but packet beginning/end without default clock snapshot.
627 */
628 if (!fs_sink->ignore_discarded_events &&
629 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc) &&
630 !packets_have_beginning_end_cs) {
631 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
632 "Unsupported stream: discarded events have "
633 "default clock snapshots, but packets have no "
634 "beginning and/or end default clock snapshots: "
635 "stream-addr=%p, "
636 "stream-id=%" PRIu64 ", "
637 "stream-name=\"%s\"",
638 ir_stream, bt_stream_get_id(ir_stream),
639 bt_stream_get_name(ir_stream));
640 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
641 goto end;
642 }
643
644 /*
645 * Not supported: discarded packets with default clock
646 * snapshots, but packet beginning/end without default clock
647 * snapshot.
648 */
649 if (!fs_sink->ignore_discarded_packets &&
650 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc) &&
651 !packets_have_beginning_end_cs) {
652 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
653 "Unsupported stream: discarded packets have "
654 "default clock snapshots, but packets have no "
655 "beginning and/or end default clock snapshots: "
656 "stream-addr=%p, "
657 "stream-id=%" PRIu64 ", "
658 "stream-name=\"%s\"",
659 ir_stream, bt_stream_get_id(ir_stream),
660 bt_stream_get_name(ir_stream));
661 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
662 goto end;
663 }
664
665 stream = borrow_stream(fs_sink, ir_stream);
666 if (!stream) {
667 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
668 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
669 goto end;
670 }
671
672 BT_COMP_LOGI("Created new, empty stream file: "
673 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
674 "trace-name=\"%s\", path=\"%s/%s\"",
675 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
676 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
677 stream->trace->path->str, stream->file_name->str);
15fe47e0
PP
678
679end:
4164020e 680 return status;
15fe47e0
PP
681}
682
4164020e
SM
683static inline bt_component_class_sink_consume_method_status
684handle_stream_end_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 685{
4164020e
SM
686 bt_component_class_sink_consume_method_status status =
687 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
688 const bt_stream *ir_stream = bt_message_stream_end_borrow_stream_const(msg);
689 struct fs_sink_stream *stream;
690
691 stream = borrow_stream(fs_sink, ir_stream);
692 if (!stream) {
693 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
694 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
695 goto end;
696 }
697
698 if (G_UNLIKELY(!stream->sc->has_packets && stream->packet_state.is_open)) {
699 /* Close stream's current artificial packet */
700 int ret = fs_sink_stream_close_packet(stream, NULL);
701
702 if (ret) {
703 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to close packet.");
704 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
705 goto end;
706 }
707 }
708
709 BT_COMP_LOGI("Closing stream file: "
710 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
711 "trace-name=\"%s\", path=\"%s/%s\"",
712 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
713 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
714 stream->trace->path->str, stream->file_name->str);
715
716 /*
717 * This destroys the stream object and frees all its resources,
718 * closing the stream file.
719 */
720 g_hash_table_remove(stream->trace->streams, ir_stream);
15fe47e0
PP
721
722end:
4164020e 723 return status;
15fe47e0
PP
724}
725
4164020e
SM
726static inline bt_component_class_sink_consume_method_status
727handle_discarded_events_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 728{
4164020e
SM
729 bt_component_class_sink_consume_method_status status =
730 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
731 const bt_stream *ir_stream = bt_message_discarded_events_borrow_stream_const(msg);
732 struct fs_sink_stream *stream;
733 const bt_clock_snapshot *cs = NULL;
734 bt_property_availability avail;
735 uint64_t count;
736
737 stream = borrow_stream(fs_sink, ir_stream);
738 if (!stream) {
739 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
740 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
741 goto end;
742 }
743
744 if (fs_sink->ignore_discarded_events) {
745 BT_COMP_LOGI("Ignoring discarded events message: "
746 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
747 "trace-name=\"%s\", path=\"%s/%s\"",
748 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
749 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
750 stream->trace->path->str, stream->file_name->str);
751 goto end;
752 }
753
754 if (stream->discarded_events_state.in_range) {
755 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
756 "Unsupported contiguous discarded events message: "
757 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
758 "trace-name=\"%s\", path=\"%s/%s\"",
759 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
760 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
761 stream->trace->path->str, stream->file_name->str);
762 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
763 goto end;
764 }
765
766 /*
767 * If we're currently in an opened packet (got a packet
768 * beginning message, but no packet end message yet), we do not
769 * support having a discarded events message with a time range
770 * because we require that the discarded events message's time
771 * range go from a packet's end time to the next packet's end
772 * time.
773 */
774 if (stream->packet_state.is_open && stream->sc->discarded_events_has_ts) {
775 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
776 "Unsupported discarded events message with "
777 "default clock snapshots occurring within a packet: "
778 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
779 "trace-name=\"%s\", path=\"%s/%s\"",
780 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
781 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
782 stream->trace->path->str, stream->file_name->str);
783 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
784 goto end;
785 }
786
787 if (stream->sc->discarded_events_has_ts) {
788 /*
789 * Make the stream's state be in the time range of a
790 * discarded events message since we have the message's
791 * time range (`stream->sc->discarded_events_has_ts`).
792 */
793 stream->discarded_events_state.in_range = true;
794
795 /*
796 * The clock snapshot values will be validated when
797 * handling the next packet beginning and end messages
798 * (next calls to handle_packet_beginning_msg() and
799 * handle_packet_end_msg()).
800 */
801 cs = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg);
802 BT_ASSERT(cs);
803 stream->discarded_events_state.beginning_cs = bt_clock_snapshot_get_value(cs);
804 cs = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg);
805 BT_ASSERT(cs);
806 stream->discarded_events_state.end_cs = bt_clock_snapshot_get_value(cs);
807 }
808
809 avail = bt_message_discarded_events_get_count(msg, &count);
810 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
811 /*
812 * There's no specific count of discarded events: set it
813 * to 1 so that we know that we at least discarded
814 * something.
815 */
816 count = 1;
817 }
818
819 stream->packet_state.discarded_events_counter += count;
15fe47e0
PP
820
821end:
4164020e 822 return status;
15fe47e0
PP
823}
824
4164020e
SM
825static inline bt_component_class_sink_consume_method_status
826handle_discarded_packets_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 827{
4164020e
SM
828 bt_component_class_sink_consume_method_status status =
829 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
830 const bt_stream *ir_stream = bt_message_discarded_packets_borrow_stream_const(msg);
831 struct fs_sink_stream *stream;
832 const bt_clock_snapshot *cs = NULL;
833 bt_property_availability avail;
834 uint64_t count;
835
836 stream = borrow_stream(fs_sink, ir_stream);
837 if (!stream) {
838 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
839 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
840 goto end;
841 }
842
843 if (fs_sink->ignore_discarded_packets) {
844 BT_COMP_LOGI("Ignoring discarded packets message: "
845 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
846 "trace-name=\"%s\", path=\"%s/%s\"",
847 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
848 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
849 stream->trace->path->str, stream->file_name->str);
850 goto end;
851 }
852
853 if (stream->discarded_packets_state.in_range) {
854 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
855 "Unsupported contiguous discarded packets message: "
856 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
857 "trace-name=\"%s\", path=\"%s/%s\"",
858 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
859 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
860 stream->trace->path->str, stream->file_name->str);
861 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
862 goto end;
863 }
864
865 /*
866 * Discarded packets messages are guaranteed to occur between
867 * packets.
868 */
869 BT_ASSERT(!stream->packet_state.is_open);
870
871 if (stream->sc->discarded_packets_has_ts) {
872 /*
873 * Make the stream's state be in the time range of a
874 * discarded packets message since we have the message's
875 * time range (`stream->sc->discarded_packets_has_ts`).
876 */
877 stream->discarded_packets_state.in_range = true;
878
879 /*
880 * The clock snapshot values will be validated when
881 * handling the next packet beginning message (next call
882 * to handle_packet_beginning_msg()).
883 */
884 cs = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg);
885 BT_ASSERT(cs);
886 stream->discarded_packets_state.beginning_cs = bt_clock_snapshot_get_value(cs);
887 cs = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg);
888 BT_ASSERT(cs);
889 stream->discarded_packets_state.end_cs = bt_clock_snapshot_get_value(cs);
890 }
891
892 avail = bt_message_discarded_packets_get_count(msg, &count);
893 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
894 /*
895 * There's no specific count of discarded packets: set
896 * it to 1 so that we know that we at least discarded
897 * something.
898 */
899 count = 1;
900 }
901
902 stream->packet_state.seq_num += count;
15fe47e0
PP
903
904end:
4164020e 905 return status;
15fe47e0
PP
906}
907
4164020e 908static inline void put_messages(bt_message_array_const msgs, uint64_t count)
15fe47e0 909{
4164020e 910 uint64_t i;
15fe47e0 911
4164020e
SM
912 for (i = 0; i < count; i++) {
913 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
914 }
15fe47e0
PP
915}
916
4164020e 917bt_component_class_sink_consume_method_status ctf_fs_sink_consume(bt_self_component_sink *self_comp)
15fe47e0 918{
4164020e
SM
919 bt_component_class_sink_consume_method_status status =
920 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
921 struct fs_sink_comp *fs_sink;
922 bt_message_iterator_next_status next_status;
923 uint64_t msg_count = 0;
924 bt_message_array_const msgs;
925
926 fs_sink = (fs_sink_comp *) bt_self_component_get_data(
927 bt_self_component_sink_as_self_component(self_comp));
928 BT_ASSERT_DBG(fs_sink);
929 BT_ASSERT_DBG(fs_sink->upstream_iter);
930
931 /* Consume messages */
932 next_status = bt_message_iterator_next(fs_sink->upstream_iter, &msgs, &msg_count);
933 if (next_status < 0) {
934 status = (bt_component_class_sink_consume_method_status) next_status;
935 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
936 "Failed to get next message from upstream iterator.");
937 goto end;
938 }
939
940 switch (next_status) {
941 case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK:
942 {
943 uint64_t i;
944
945 for (i = 0; i < msg_count; i++) {
946 const bt_message *msg = msgs[i];
947
948 BT_ASSERT_DBG(msg);
949
950 switch (bt_message_get_type(msg)) {
951 case BT_MESSAGE_TYPE_EVENT:
952 status = handle_event_msg(fs_sink, msg);
953 break;
954 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
955 status = handle_packet_beginning_msg(fs_sink, msg);
956 break;
957 case BT_MESSAGE_TYPE_PACKET_END:
958 status = handle_packet_end_msg(fs_sink, msg);
959 break;
960 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
961 /* Ignore */
962 BT_COMP_LOGD_STR("Ignoring message iterator inactivity message.");
963 break;
964 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
965 status = handle_stream_beginning_msg(fs_sink, msg);
966 break;
967 case BT_MESSAGE_TYPE_STREAM_END:
968 status = handle_stream_end_msg(fs_sink, msg);
969 break;
970 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
971 status = handle_discarded_events_msg(fs_sink, msg);
972 break;
973 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
974 status = handle_discarded_packets_msg(fs_sink, msg);
975 break;
976 default:
977 bt_common_abort();
978 }
979
980 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
981
982 if (status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK) {
983 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
984 "Failed to handle message: "
985 "generated CTF traces could be incomplete: "
986 "output-dir-path=\"%s\"",
987 fs_sink->output_dir_path->str);
988 goto error;
989 }
990 }
991
992 break;
993 }
994 case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN:
995 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN;
996 break;
997 case BT_MESSAGE_ITERATOR_NEXT_STATUS_END:
998 /* TODO: Finalize all traces (should already be done?) */
999 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END;
1000 break;
1001 default:
1002 break;
1003 }
1004
1005 goto end;
15fe47e0
PP
1006
1007error:
4164020e
SM
1008 BT_ASSERT(status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK);
1009 put_messages(msgs, msg_count);
15fe47e0
PP
1010
1011end:
4164020e 1012 return status;
15fe47e0
PP
1013}
1014
e803df70 1015bt_component_class_sink_graph_is_configured_method_status
4164020e 1016ctf_fs_sink_graph_is_configured(bt_self_component_sink *self_comp)
15fe47e0 1017{
4164020e
SM
1018 bt_component_class_sink_graph_is_configured_method_status status;
1019 bt_message_iterator_create_from_sink_component_status msg_iter_status;
1020 fs_sink_comp *fs_sink = (fs_sink_comp *) bt_self_component_get_data(
1021 bt_self_component_sink_as_self_component(self_comp));
1022
1023 msg_iter_status = bt_message_iterator_create_from_sink_component(
1024 self_comp, bt_self_component_sink_borrow_input_port_by_name(self_comp, in_port_name),
1025 &fs_sink->upstream_iter);
1026 if (msg_iter_status != BT_MESSAGE_ITERATOR_CREATE_FROM_SINK_COMPONENT_STATUS_OK) {
1027 status = (bt_component_class_sink_graph_is_configured_method_status) msg_iter_status;
1028 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to create upstream iterator.");
1029 goto end;
1030 }
1031
1032 status = BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK;
15fe47e0 1033end:
4164020e 1034 return status;
15fe47e0
PP
1035}
1036
15fe47e0
PP
1037void ctf_fs_sink_finalize(bt_self_component_sink *self_comp)
1038{
4164020e
SM
1039 fs_sink_comp *fs_sink = (fs_sink_comp *) bt_self_component_get_data(
1040 bt_self_component_sink_as_self_component(self_comp));
15fe47e0 1041
4164020e 1042 destroy_fs_sink_comp(fs_sink);
15fe47e0 1043}
This page took 0.116099 seconds and 4 git commands to generate.