Remove stdbool.h includes from C++ files
[babeltrace.git] / src / plugins / ctf / fs-sink / fs-sink.cpp
CommitLineData
15fe47e0 1/*
0235b0db 2 * SPDX-License-Identifier: MIT
15fe47e0 3 *
0235b0db 4 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
15fe47e0
PP
5 */
6
c802cacb 7#include <glib.h>
c802cacb
SM
8#include <stdio.h>
9
10#include <babeltrace2/babeltrace.h>
11
aa1a7452 12#define BT_COMP_LOG_SELF_COMP (fs_sink->self_comp)
4164020e
SM
13#define BT_LOG_OUTPUT_LEVEL (fs_sink->log_level)
14#define BT_LOG_TAG "PLUGIN/SINK.CTF.FS"
d9c39b0a 15#include "logging/comp-logging.h"
15fe47e0 16
578e048b
MJ
17#include "common/assert.h"
18#include "ctfser/ctfser.h"
c802cacb 19
ce67f561 20#include "plugins/common/param-validation/param-validation.h"
15fe47e0 21
087cd0f5 22#include "fs-sink-ctf-meta.hpp"
c802cacb
SM
23#include "fs-sink-stream.hpp"
24#include "fs-sink-trace.hpp"
25#include "fs-sink.hpp"
087cd0f5 26#include "translate-ctf-ir-to-tsdl.hpp"
c802cacb 27#include "translate-trace-ir-to-ctf-ir.hpp"
15fe47e0 28
4164020e 29static const char * const in_port_name = "in";
15fe47e0 30
4164020e
SM
31static bt_component_class_initialize_method_status
32ensure_output_dir_exists(struct fs_sink_comp *fs_sink)
15fe47e0 33{
4164020e
SM
34 bt_component_class_initialize_method_status status =
35 BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
36 int ret;
37
38 ret = g_mkdir_with_parents(fs_sink->output_dir_path->str, 0755);
39 if (ret) {
40 BT_COMP_LOGE_APPEND_CAUSE_ERRNO(fs_sink->self_comp,
41 "Cannot create directories for output directory",
42 ": output-dir-path=\"%s\"", fs_sink->output_dir_path->str);
43 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
44 goto end;
45 }
15fe47e0
PP
46
47end:
4164020e 48 return status;
15fe47e0
PP
49}
50
087cd0f5 51static bt_param_validation_map_value_entry_descr fs_sink_params_descr[] = {
88730e42
SM
52 {"path", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
53 bt_param_validation_value_descr::makeString()},
54 {"assume-single-trace", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
55 bt_param_validation_value_descr::makeBool()},
56 {"ignore-discarded-events", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
57 bt_param_validation_value_descr::makeBool()},
58 {"ignore-discarded-packets", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
59 bt_param_validation_value_descr::makeBool()},
60 {"quiet", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
61 bt_param_validation_value_descr::makeBool()},
4164020e
SM
62 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
63
64static bt_component_class_initialize_method_status configure_component(struct fs_sink_comp *fs_sink,
65 const bt_value *params)
15fe47e0 66{
4164020e
SM
67 bt_component_class_initialize_method_status status;
68 const bt_value *value;
69 enum bt_param_validation_status validation_status;
70 gchar *validation_error;
71
72 validation_status =
73 bt_param_validation_validate(params, fs_sink_params_descr, &validation_error);
74 if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
75 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
76 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "%s", validation_error);
77 goto end;
78 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
79 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
80 goto end;
81 }
82
83 value = bt_value_map_borrow_entry_value_const(params, "path");
84 g_string_assign(fs_sink->output_dir_path, bt_value_string_get(value));
85
86 value = bt_value_map_borrow_entry_value_const(params, "assume-single-trace");
87 if (value) {
88 fs_sink->assume_single_trace = (bool) bt_value_bool_get(value);
89 }
90
91 value = bt_value_map_borrow_entry_value_const(params, "ignore-discarded-events");
92 if (value) {
93 fs_sink->ignore_discarded_events = (bool) bt_value_bool_get(value);
94 }
95
96 value = bt_value_map_borrow_entry_value_const(params, "ignore-discarded-packets");
97 if (value) {
98 fs_sink->ignore_discarded_packets = (bool) bt_value_bool_get(value);
99 }
100
101 value = bt_value_map_borrow_entry_value_const(params, "quiet");
102 if (value) {
103 fs_sink->quiet = (bool) bt_value_bool_get(value);
104 }
105
106 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
ce67f561 107
15fe47e0 108end:
4164020e
SM
109 g_free(validation_error);
110 return status;
15fe47e0
PP
111}
112
4164020e 113static void destroy_fs_sink_comp(struct fs_sink_comp *fs_sink)
15fe47e0 114{
4164020e
SM
115 if (!fs_sink) {
116 goto end;
117 }
15fe47e0 118
4164020e
SM
119 if (fs_sink->output_dir_path) {
120 g_string_free(fs_sink->output_dir_path, TRUE);
121 fs_sink->output_dir_path = NULL;
122 }
15fe47e0 123
4164020e
SM
124 if (fs_sink->traces) {
125 g_hash_table_destroy(fs_sink->traces);
126 fs_sink->traces = NULL;
127 }
15fe47e0 128
4164020e
SM
129 BT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(fs_sink->upstream_iter);
130 g_free(fs_sink);
15fe47e0
PP
131
132end:
4164020e 133 return;
15fe47e0
PP
134}
135
5bcd9f04
SM
136bt_component_class_initialize_method_status ctf_fs_sink_init(bt_self_component_sink *self_comp_sink,
137 bt_self_component_sink_configuration *,
138 const bt_value *params, void *)
15fe47e0 139{
4164020e
SM
140 bt_component_class_initialize_method_status status;
141 bt_self_component_add_port_status add_port_status;
142 struct fs_sink_comp *fs_sink = NULL;
143 bt_self_component *self_comp = bt_self_component_sink_as_self_component(self_comp_sink);
144 bt_logging_level log_level =
145 bt_component_get_logging_level(bt_self_component_as_component(self_comp));
146
147 fs_sink = g_new0(struct fs_sink_comp, 1);
148 if (!fs_sink) {
149 BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR, log_level, self_comp,
150 "Failed to allocate one CTF FS sink structure.");
151 BT_CURRENT_THREAD_ERROR_APPEND_CAUSE_FROM_COMPONENT(
152 self_comp, "Failed to allocate one CTF FS sink structure.");
153 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
154 goto end;
155 }
156
157 fs_sink->log_level = log_level;
158 fs_sink->self_comp = self_comp;
159 fs_sink->output_dir_path = g_string_new(NULL);
160 status = configure_component(fs_sink, params);
161 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
162 /* configure_component() logs errors */
163 goto end;
164 }
165
166 if (fs_sink->assume_single_trace &&
167 g_file_test(fs_sink->output_dir_path->str, G_FILE_TEST_EXISTS)) {
168 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
169 "Single trace mode, but output path exists: output-path=\"%s\"",
170 fs_sink->output_dir_path->str);
171 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
172 goto end;
173 }
174
175 status = ensure_output_dir_exists(fs_sink);
176 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
177 /* ensure_output_dir_exists() logs errors */
178 goto end;
179 }
180
181 fs_sink->traces = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL,
182 (GDestroyNotify) fs_sink_trace_destroy);
183 if (!fs_sink->traces) {
184 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate one GHashTable.");
185 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
186 goto end;
187 }
188
189 add_port_status =
190 bt_self_component_sink_add_input_port(self_comp_sink, in_port_name, NULL, NULL);
191 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
192 status = (bt_component_class_initialize_method_status) add_port_status;
193 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to add input port.");
194 goto end;
195 }
196
197 bt_self_component_set_data(self_comp, fs_sink);
15fe47e0
PP
198
199end:
4164020e
SM
200 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
201 destroy_fs_sink_comp(fs_sink);
202 }
15fe47e0 203
4164020e 204 return status;
15fe47e0
PP
205}
206
4164020e
SM
207static inline struct fs_sink_stream *borrow_stream(struct fs_sink_comp *fs_sink,
208 const bt_stream *ir_stream)
15fe47e0 209{
4164020e
SM
210 const bt_trace *ir_trace = bt_stream_borrow_trace_const(ir_stream);
211 struct fs_sink_trace *trace;
212 struct fs_sink_stream *stream = NULL;
213
214 trace = (fs_sink_trace *) g_hash_table_lookup(fs_sink->traces, ir_trace);
215 if (G_UNLIKELY(!trace)) {
216 if (fs_sink->assume_single_trace && g_hash_table_size(fs_sink->traces) > 0) {
217 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
218 "Single trace mode, but getting more than one trace: "
219 "stream-name=\"%s\"",
220 bt_stream_get_name(ir_stream));
221 goto end;
222 }
223
224 trace = fs_sink_trace_create(fs_sink, ir_trace);
225 if (!trace) {
226 goto end;
227 }
228 }
229
230 stream = (fs_sink_stream *) g_hash_table_lookup(trace->streams, ir_stream);
231 if (G_UNLIKELY(!stream)) {
232 stream = fs_sink_stream_create(trace, ir_stream);
233 if (!stream) {
234 goto end;
235 }
236 }
15fe47e0
PP
237
238end:
4164020e 239 return stream;
15fe47e0
PP
240}
241
4164020e
SM
242static inline bt_component_class_sink_consume_method_status
243handle_event_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 244{
4164020e
SM
245 int ret;
246 bt_component_class_sink_consume_method_status status =
247 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
248 const bt_event *ir_event = bt_message_event_borrow_event_const(msg);
249 const bt_stream *ir_stream = bt_event_borrow_stream_const(ir_event);
250 struct fs_sink_stream *stream;
251 struct fs_sink_ctf_event_class *ec = NULL;
252 const bt_clock_snapshot *cs = NULL;
253
254 stream = borrow_stream(fs_sink, ir_stream);
255 if (G_UNLIKELY(!stream)) {
256 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
257 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
258 goto end;
259 }
260
261 ret = try_translate_event_class_trace_ir_to_ctf_ir(fs_sink, stream->sc,
262 bt_event_borrow_class_const(ir_event), &ec);
263 if (ret) {
264 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to translate event class to CTF IR.");
265 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
266 goto end;
267 }
268
269 BT_ASSERT_DBG(ec);
270
271 if (stream->sc->default_clock_class) {
272 cs = bt_message_event_borrow_default_clock_snapshot_const(msg);
273 }
274
275 /*
276 * If this event's stream does not support packets, then we
277 * lazily create artificial packets.
278 *
279 * The size of an artificial packet is arbitrarily at least
280 * 4 MiB (it usually is greater because we close it when
281 * comes the time to write a new event and the packet's content
282 * size is >= 4 MiB), except the last one which can be smaller.
283 */
284 if (G_UNLIKELY(!stream->sc->has_packets)) {
285 if (stream->packet_state.is_open &&
286 bt_ctfser_get_offset_in_current_packet_bits(&stream->ctfser) / 8 >= 4 * 1024 * 1024) {
287 /*
288 * Stream's current packet is larger than 4 MiB:
289 * close it. A new packet will be opened just
290 * below.
291 */
292 ret = fs_sink_stream_close_packet(stream, NULL);
293 if (ret) {
294 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to close packet.");
295 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
296 goto end;
297 }
298 }
299
300 if (!stream->packet_state.is_open) {
301 /* Stream's packet is not currently opened: open it */
302 ret = fs_sink_stream_open_packet(stream, NULL, NULL);
303 if (ret) {
304 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to open packet.");
305 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
306 goto end;
307 }
308 }
309 }
310
311 BT_ASSERT_DBG(stream->packet_state.is_open);
312 ret = fs_sink_stream_write_event(stream, cs, ir_event, ec);
313 if (G_UNLIKELY(ret)) {
314 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to write event.");
315 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
316 goto end;
317 }
15fe47e0
PP
318
319end:
4164020e 320 return status;
15fe47e0
PP
321}
322
4164020e
SM
323static inline bt_component_class_sink_consume_method_status
324handle_packet_beginning_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 325{
4164020e
SM
326 int ret;
327 bt_component_class_sink_consume_method_status status =
328 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
329 const bt_packet *ir_packet = bt_message_packet_beginning_borrow_packet_const(msg);
330 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
331 struct fs_sink_stream *stream;
332 const bt_clock_snapshot *cs = NULL;
333
334 stream = borrow_stream(fs_sink, ir_stream);
335 if (G_UNLIKELY(!stream)) {
336 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
337 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
338 goto end;
339 }
340
341 if (stream->sc->packets_have_ts_begin) {
342 cs = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg);
343 BT_ASSERT(cs);
344 }
345
346 /*
347 * If we previously received a discarded events message with
348 * a time range, make sure that its beginning time matches what's
349 * expected for CTF 1.8, that is:
350 *
351 * * Its beginning time is the previous packet's end
352 * time (or the current packet's beginning time if
353 * this is the first packet).
354 *
355 * We check this here instead of in handle_packet_end_msg()
356 * because we want to catch any incompatible message as early as
357 * possible to report the error.
358 *
359 * Validation of the discarded events message's end time is
360 * performed in handle_packet_end_msg().
361 */
362 if (stream->discarded_events_state.in_range) {
363 uint64_t expected_cs;
364
365 /*
366 * `stream->discarded_events_state.in_range` is only set
367 * when the stream class's discarded events have a time
368 * range.
369 *
370 * It is required that the packet beginning and end
371 * messages for this stream class have times when
372 * discarded events have a time range.
373 */
374 BT_ASSERT(stream->sc->discarded_events_has_ts);
375 BT_ASSERT(stream->sc->packets_have_ts_begin);
376 BT_ASSERT(stream->sc->packets_have_ts_end);
377
378 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
379 /* We're opening the first packet */
380 expected_cs = bt_clock_snapshot_get_value(cs);
381 } else {
382 expected_cs = stream->prev_packet_state.end_cs;
383 }
384
385 if (stream->discarded_events_state.beginning_cs != expected_cs) {
386 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
387 "Incompatible discarded events message: "
388 "unexpected beginning time: "
389 "beginning-cs-val=%" PRIu64 ", "
390 "expected-beginning-cs-val=%" PRIu64 ", "
391 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
392 "trace-name=\"%s\", path=\"%s/%s\"",
393 stream->discarded_events_state.beginning_cs, expected_cs,
394 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
395 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
396 stream->trace->path->str, stream->file_name->str);
397 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
398 goto end;
399 }
400 }
401
402 /*
403 * If we previously received a discarded packets message with a
404 * time range, make sure that its beginning and end times match
405 * what's expected for CTF 1.8, that is:
406 *
407 * * Its beginning time is the previous packet's end time.
408 *
409 * * Its end time is the current packet's beginning time.
410 */
411 if (stream->discarded_packets_state.in_range) {
412 uint64_t expected_end_cs;
413
414 /*
415 * `stream->discarded_packets_state.in_range` is only
416 * set when the stream class's discarded packets have a
417 * time range.
418 *
419 * It is required that the packet beginning and end
420 * messages for this stream class have times when
421 * discarded packets have a time range.
422 */
423 BT_ASSERT(stream->sc->discarded_packets_has_ts);
424 BT_ASSERT(stream->sc->packets_have_ts_begin);
425 BT_ASSERT(stream->sc->packets_have_ts_end);
426
427 /*
428 * It is not supported to have a discarded packets
429 * message _before_ the first packet: we cannot validate
430 * that its beginning time is compatible with CTF 1.8 in
431 * this case.
432 */
433 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
434 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
435 "Incompatible discarded packets message "
436 "occurring before the stream's first packet: "
437 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
438 "trace-name=\"%s\", path=\"%s/%s\"",
439 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
440 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
441 stream->trace->path->str, stream->file_name->str);
442 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
443 goto end;
444 }
445
446 if (stream->discarded_packets_state.beginning_cs != stream->prev_packet_state.end_cs) {
447 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
448 "Incompatible discarded packets message: "
449 "unexpected beginning time: "
450 "beginning-cs-val=%" PRIu64 ", "
451 "expected-beginning-cs-val=%" PRIu64 ", "
452 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
453 "trace-name=\"%s\", path=\"%s/%s\"",
454 stream->discarded_packets_state.beginning_cs,
455 stream->prev_packet_state.end_cs, bt_stream_get_id(ir_stream),
456 bt_stream_get_name(ir_stream),
457 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
458 stream->trace->path->str, stream->file_name->str);
459 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
460 goto end;
461 }
462
463 expected_end_cs = bt_clock_snapshot_get_value(cs);
464
465 if (stream->discarded_packets_state.end_cs != expected_end_cs) {
466 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
467 "Incompatible discarded packets message: "
468 "unexpected end time: "
469 "end-cs-val=%" PRIu64 ", "
470 "expected-end-cs-val=%" PRIu64 ", "
471 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
472 "trace-name=\"%s\", path=\"%s/%s\"",
473 stream->discarded_packets_state.end_cs, expected_end_cs,
474 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
475 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
476 stream->trace->path->str, stream->file_name->str);
477 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
478 goto end;
479 }
480 }
481
482 /*
483 * We're not in a discarded packets time range anymore since we
484 * require that the discarded packets time ranges go from one
485 * packet's end time to the next packet's beginning time, and
486 * we're handling a packet beginning message here.
487 */
488 stream->discarded_packets_state.in_range = false;
489
490 ret = fs_sink_stream_open_packet(stream, cs, ir_packet);
491 if (ret) {
492 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to open packet.");
493 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
494 goto end;
495 }
15fe47e0
PP
496
497end:
4164020e 498 return status;
15fe47e0
PP
499}
500
4164020e
SM
501static inline bt_component_class_sink_consume_method_status
502handle_packet_end_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 503{
4164020e
SM
504 int ret;
505 bt_component_class_sink_consume_method_status status =
506 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
507 const bt_packet *ir_packet = bt_message_packet_end_borrow_packet_const(msg);
508 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
509 struct fs_sink_stream *stream;
510 const bt_clock_snapshot *cs = NULL;
511
512 stream = borrow_stream(fs_sink, ir_stream);
513 if (G_UNLIKELY(!stream)) {
514 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
515 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
516 goto end;
517 }
518
519 if (stream->sc->packets_have_ts_end) {
520 cs = bt_message_packet_end_borrow_default_clock_snapshot_const(msg);
521 BT_ASSERT(cs);
522 }
523
524 /*
525 * If we previously received a discarded events message with
526 * a time range, make sure that its end time matches what's
527 * expected for CTF 1.8, that is:
528 *
529 * * Its end time is the current packet's end time.
530 *
531 * Validation of the discarded events message's beginning time
532 * is performed in handle_packet_beginning_msg().
533 */
534 if (stream->discarded_events_state.in_range) {
535 uint64_t expected_cs;
536
537 /*
538 * `stream->discarded_events_state.in_range` is only set
539 * when the stream class's discarded events have a time
540 * range.
541 *
542 * It is required that the packet beginning and end
543 * messages for this stream class have times when
544 * discarded events have a time range.
545 */
546 BT_ASSERT(stream->sc->discarded_events_has_ts);
547 BT_ASSERT(stream->sc->packets_have_ts_begin);
548 BT_ASSERT(stream->sc->packets_have_ts_end);
549
550 expected_cs = bt_clock_snapshot_get_value(cs);
551
552 if (stream->discarded_events_state.end_cs != expected_cs) {
553 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
554 "Incompatible discarded events message: "
555 "unexpected end time: "
556 "end-cs-val=%" PRIu64 ", "
557 "expected-end-cs-val=%" PRIu64 ", "
558 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
559 "trace-name=\"%s\", path=\"%s/%s\"",
560 stream->discarded_events_state.end_cs, expected_cs,
561 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
562 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
563 stream->trace->path->str, stream->file_name->str);
564 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
565 goto end;
566 }
567 }
568
569 ret = fs_sink_stream_close_packet(stream, cs);
570 if (ret) {
571 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to close packet.");
572 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
573 goto end;
574 }
575
576 /*
577 * We're not in a discarded events time range anymore since we
578 * require that the discarded events time ranges go from one
579 * packet's end time to the next packet's end time, and we're
580 * handling a packet end message here.
581 */
582 stream->discarded_events_state.in_range = false;
15fe47e0
PP
583
584end:
4164020e 585 return status;
15fe47e0
PP
586}
587
4164020e
SM
588static inline bt_component_class_sink_consume_method_status
589handle_stream_beginning_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 590{
4164020e
SM
591 bt_component_class_sink_consume_method_status status =
592 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
593 const bt_stream *ir_stream = bt_message_stream_beginning_borrow_stream_const(msg);
594 const bt_stream_class *ir_sc = bt_stream_borrow_class_const(ir_stream);
595 struct fs_sink_stream *stream;
596 bool packets_have_beginning_end_cs =
597 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc) &&
598 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc);
599
600 /*
601 * Not supported: discarded events or discarded packets support
602 * without packets support. Packets are the way to know where
603 * discarded events/packets occurred in CTF 1.8.
604 */
605 if (!bt_stream_class_supports_packets(ir_sc)) {
606 BT_ASSERT(!bt_stream_class_supports_discarded_packets(ir_sc));
607
608 if (!fs_sink->ignore_discarded_events && bt_stream_class_supports_discarded_events(ir_sc)) {
609 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
610 "Unsupported stream: "
611 "stream does not support packets, "
612 "but supports discarded events: "
613 "stream-addr=%p, "
614 "stream-id=%" PRIu64 ", "
615 "stream-name=\"%s\"",
616 ir_stream, bt_stream_get_id(ir_stream),
617 bt_stream_get_name(ir_stream));
618 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
619 goto end;
620 }
621 }
622
623 /*
624 * Not supported: discarded events with default clock snapshots,
625 * but packet beginning/end without default clock snapshot.
626 */
627 if (!fs_sink->ignore_discarded_events &&
628 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc) &&
629 !packets_have_beginning_end_cs) {
630 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
631 "Unsupported stream: discarded events have "
632 "default clock snapshots, but packets have no "
633 "beginning and/or end default clock snapshots: "
634 "stream-addr=%p, "
635 "stream-id=%" PRIu64 ", "
636 "stream-name=\"%s\"",
637 ir_stream, bt_stream_get_id(ir_stream),
638 bt_stream_get_name(ir_stream));
639 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
640 goto end;
641 }
642
643 /*
644 * Not supported: discarded packets with default clock
645 * snapshots, but packet beginning/end without default clock
646 * snapshot.
647 */
648 if (!fs_sink->ignore_discarded_packets &&
649 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc) &&
650 !packets_have_beginning_end_cs) {
651 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
652 "Unsupported stream: discarded packets have "
653 "default clock snapshots, but packets have no "
654 "beginning and/or end default clock snapshots: "
655 "stream-addr=%p, "
656 "stream-id=%" PRIu64 ", "
657 "stream-name=\"%s\"",
658 ir_stream, bt_stream_get_id(ir_stream),
659 bt_stream_get_name(ir_stream));
660 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
661 goto end;
662 }
663
664 stream = borrow_stream(fs_sink, ir_stream);
665 if (!stream) {
666 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
667 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
668 goto end;
669 }
670
671 BT_COMP_LOGI("Created new, empty stream file: "
672 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
673 "trace-name=\"%s\", path=\"%s/%s\"",
674 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
675 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
676 stream->trace->path->str, stream->file_name->str);
15fe47e0
PP
677
678end:
4164020e 679 return status;
15fe47e0
PP
680}
681
4164020e
SM
682static inline bt_component_class_sink_consume_method_status
683handle_stream_end_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 684{
4164020e
SM
685 bt_component_class_sink_consume_method_status status =
686 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
687 const bt_stream *ir_stream = bt_message_stream_end_borrow_stream_const(msg);
688 struct fs_sink_stream *stream;
689
690 stream = borrow_stream(fs_sink, ir_stream);
691 if (!stream) {
692 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
693 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
694 goto end;
695 }
696
697 if (G_UNLIKELY(!stream->sc->has_packets && stream->packet_state.is_open)) {
698 /* Close stream's current artificial packet */
699 int ret = fs_sink_stream_close_packet(stream, NULL);
700
701 if (ret) {
702 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to close packet.");
703 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
704 goto end;
705 }
706 }
707
708 BT_COMP_LOGI("Closing stream file: "
709 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
710 "trace-name=\"%s\", path=\"%s/%s\"",
711 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
712 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
713 stream->trace->path->str, stream->file_name->str);
714
715 /*
716 * This destroys the stream object and frees all its resources,
717 * closing the stream file.
718 */
719 g_hash_table_remove(stream->trace->streams, ir_stream);
15fe47e0
PP
720
721end:
4164020e 722 return status;
15fe47e0
PP
723}
724
4164020e
SM
725static inline bt_component_class_sink_consume_method_status
726handle_discarded_events_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 727{
4164020e
SM
728 bt_component_class_sink_consume_method_status status =
729 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
730 const bt_stream *ir_stream = bt_message_discarded_events_borrow_stream_const(msg);
731 struct fs_sink_stream *stream;
732 const bt_clock_snapshot *cs = NULL;
733 bt_property_availability avail;
734 uint64_t count;
735
736 stream = borrow_stream(fs_sink, ir_stream);
737 if (!stream) {
738 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
739 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
740 goto end;
741 }
742
743 if (fs_sink->ignore_discarded_events) {
744 BT_COMP_LOGI("Ignoring discarded events message: "
745 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
746 "trace-name=\"%s\", path=\"%s/%s\"",
747 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
748 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
749 stream->trace->path->str, stream->file_name->str);
750 goto end;
751 }
752
753 if (stream->discarded_events_state.in_range) {
754 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
755 "Unsupported contiguous discarded events message: "
756 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
757 "trace-name=\"%s\", path=\"%s/%s\"",
758 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
759 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
760 stream->trace->path->str, stream->file_name->str);
761 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
762 goto end;
763 }
764
765 /*
766 * If we're currently in an opened packet (got a packet
767 * beginning message, but no packet end message yet), we do not
768 * support having a discarded events message with a time range
769 * because we require that the discarded events message's time
770 * range go from a packet's end time to the next packet's end
771 * time.
772 */
773 if (stream->packet_state.is_open && stream->sc->discarded_events_has_ts) {
774 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
775 "Unsupported discarded events message with "
776 "default clock snapshots occurring within a packet: "
777 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
778 "trace-name=\"%s\", path=\"%s/%s\"",
779 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
780 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
781 stream->trace->path->str, stream->file_name->str);
782 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
783 goto end;
784 }
785
786 if (stream->sc->discarded_events_has_ts) {
787 /*
788 * Make the stream's state be in the time range of a
789 * discarded events message since we have the message's
790 * time range (`stream->sc->discarded_events_has_ts`).
791 */
792 stream->discarded_events_state.in_range = true;
793
794 /*
795 * The clock snapshot values will be validated when
796 * handling the next packet beginning and end messages
797 * (next calls to handle_packet_beginning_msg() and
798 * handle_packet_end_msg()).
799 */
800 cs = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg);
801 BT_ASSERT(cs);
802 stream->discarded_events_state.beginning_cs = bt_clock_snapshot_get_value(cs);
803 cs = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg);
804 BT_ASSERT(cs);
805 stream->discarded_events_state.end_cs = bt_clock_snapshot_get_value(cs);
806 }
807
808 avail = bt_message_discarded_events_get_count(msg, &count);
809 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
810 /*
811 * There's no specific count of discarded events: set it
812 * to 1 so that we know that we at least discarded
813 * something.
814 */
815 count = 1;
816 }
817
818 stream->packet_state.discarded_events_counter += count;
15fe47e0
PP
819
820end:
4164020e 821 return status;
15fe47e0
PP
822}
823
4164020e
SM
824static inline bt_component_class_sink_consume_method_status
825handle_discarded_packets_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 826{
4164020e
SM
827 bt_component_class_sink_consume_method_status status =
828 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
829 const bt_stream *ir_stream = bt_message_discarded_packets_borrow_stream_const(msg);
830 struct fs_sink_stream *stream;
831 const bt_clock_snapshot *cs = NULL;
832 bt_property_availability avail;
833 uint64_t count;
834
835 stream = borrow_stream(fs_sink, ir_stream);
836 if (!stream) {
837 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
838 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
839 goto end;
840 }
841
842 if (fs_sink->ignore_discarded_packets) {
843 BT_COMP_LOGI("Ignoring discarded packets message: "
844 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
845 "trace-name=\"%s\", path=\"%s/%s\"",
846 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
847 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
848 stream->trace->path->str, stream->file_name->str);
849 goto end;
850 }
851
852 if (stream->discarded_packets_state.in_range) {
853 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
854 "Unsupported contiguous discarded packets message: "
855 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
856 "trace-name=\"%s\", path=\"%s/%s\"",
857 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
858 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
859 stream->trace->path->str, stream->file_name->str);
860 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
861 goto end;
862 }
863
864 /*
865 * Discarded packets messages are guaranteed to occur between
866 * packets.
867 */
868 BT_ASSERT(!stream->packet_state.is_open);
869
870 if (stream->sc->discarded_packets_has_ts) {
871 /*
872 * Make the stream's state be in the time range of a
873 * discarded packets message since we have the message's
874 * time range (`stream->sc->discarded_packets_has_ts`).
875 */
876 stream->discarded_packets_state.in_range = true;
877
878 /*
879 * The clock snapshot values will be validated when
880 * handling the next packet beginning message (next call
881 * to handle_packet_beginning_msg()).
882 */
883 cs = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg);
884 BT_ASSERT(cs);
885 stream->discarded_packets_state.beginning_cs = bt_clock_snapshot_get_value(cs);
886 cs = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg);
887 BT_ASSERT(cs);
888 stream->discarded_packets_state.end_cs = bt_clock_snapshot_get_value(cs);
889 }
890
891 avail = bt_message_discarded_packets_get_count(msg, &count);
892 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
893 /*
894 * There's no specific count of discarded packets: set
895 * it to 1 so that we know that we at least discarded
896 * something.
897 */
898 count = 1;
899 }
900
901 stream->packet_state.seq_num += count;
15fe47e0
PP
902
903end:
4164020e 904 return status;
15fe47e0
PP
905}
906
4164020e 907static inline void put_messages(bt_message_array_const msgs, uint64_t count)
15fe47e0 908{
4164020e 909 uint64_t i;
15fe47e0 910
4164020e
SM
911 for (i = 0; i < count; i++) {
912 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
913 }
15fe47e0
PP
914}
915
4164020e 916bt_component_class_sink_consume_method_status ctf_fs_sink_consume(bt_self_component_sink *self_comp)
15fe47e0 917{
4164020e
SM
918 bt_component_class_sink_consume_method_status status =
919 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
920 struct fs_sink_comp *fs_sink;
921 bt_message_iterator_next_status next_status;
922 uint64_t msg_count = 0;
923 bt_message_array_const msgs;
924
925 fs_sink = (fs_sink_comp *) bt_self_component_get_data(
926 bt_self_component_sink_as_self_component(self_comp));
927 BT_ASSERT_DBG(fs_sink);
928 BT_ASSERT_DBG(fs_sink->upstream_iter);
929
930 /* Consume messages */
931 next_status = bt_message_iterator_next(fs_sink->upstream_iter, &msgs, &msg_count);
932 if (next_status < 0) {
933 status = (bt_component_class_sink_consume_method_status) next_status;
934 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
935 "Failed to get next message from upstream iterator.");
936 goto end;
937 }
938
939 switch (next_status) {
940 case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK:
941 {
942 uint64_t i;
943
944 for (i = 0; i < msg_count; i++) {
945 const bt_message *msg = msgs[i];
946
947 BT_ASSERT_DBG(msg);
948
949 switch (bt_message_get_type(msg)) {
950 case BT_MESSAGE_TYPE_EVENT:
951 status = handle_event_msg(fs_sink, msg);
952 break;
953 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
954 status = handle_packet_beginning_msg(fs_sink, msg);
955 break;
956 case BT_MESSAGE_TYPE_PACKET_END:
957 status = handle_packet_end_msg(fs_sink, msg);
958 break;
959 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
960 /* Ignore */
961 BT_COMP_LOGD_STR("Ignoring message iterator inactivity message.");
962 break;
963 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
964 status = handle_stream_beginning_msg(fs_sink, msg);
965 break;
966 case BT_MESSAGE_TYPE_STREAM_END:
967 status = handle_stream_end_msg(fs_sink, msg);
968 break;
969 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
970 status = handle_discarded_events_msg(fs_sink, msg);
971 break;
972 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
973 status = handle_discarded_packets_msg(fs_sink, msg);
974 break;
975 default:
976 bt_common_abort();
977 }
978
979 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
980
981 if (status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK) {
982 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
983 "Failed to handle message: "
984 "generated CTF traces could be incomplete: "
985 "output-dir-path=\"%s\"",
986 fs_sink->output_dir_path->str);
987 goto error;
988 }
989 }
990
991 break;
992 }
993 case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN:
994 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN;
995 break;
996 case BT_MESSAGE_ITERATOR_NEXT_STATUS_END:
997 /* TODO: Finalize all traces (should already be done?) */
998 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END;
999 break;
1000 default:
1001 break;
1002 }
1003
1004 goto end;
15fe47e0
PP
1005
1006error:
4164020e
SM
1007 BT_ASSERT(status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK);
1008 put_messages(msgs, msg_count);
15fe47e0
PP
1009
1010end:
4164020e 1011 return status;
15fe47e0
PP
1012}
1013
e803df70 1014bt_component_class_sink_graph_is_configured_method_status
4164020e 1015ctf_fs_sink_graph_is_configured(bt_self_component_sink *self_comp)
15fe47e0 1016{
4164020e
SM
1017 bt_component_class_sink_graph_is_configured_method_status status;
1018 bt_message_iterator_create_from_sink_component_status msg_iter_status;
1019 fs_sink_comp *fs_sink = (fs_sink_comp *) bt_self_component_get_data(
1020 bt_self_component_sink_as_self_component(self_comp));
1021
1022 msg_iter_status = bt_message_iterator_create_from_sink_component(
1023 self_comp, bt_self_component_sink_borrow_input_port_by_name(self_comp, in_port_name),
1024 &fs_sink->upstream_iter);
1025 if (msg_iter_status != BT_MESSAGE_ITERATOR_CREATE_FROM_SINK_COMPONENT_STATUS_OK) {
1026 status = (bt_component_class_sink_graph_is_configured_method_status) msg_iter_status;
1027 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to create upstream iterator.");
1028 goto end;
1029 }
1030
1031 status = BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK;
15fe47e0 1032end:
4164020e 1033 return status;
15fe47e0
PP
1034}
1035
15fe47e0
PP
1036void ctf_fs_sink_finalize(bt_self_component_sink *self_comp)
1037{
4164020e
SM
1038 fs_sink_comp *fs_sink = (fs_sink_comp *) bt_self_component_get_data(
1039 bt_self_component_sink_as_self_component(self_comp));
15fe47e0 1040
4164020e 1041 destroy_fs_sink_comp(fs_sink);
15fe47e0 1042}
This page took 0.116561 seconds and 4 git commands to generate.