Re-format new C++ files
[babeltrace.git] / src / plugins / ctf / fs-sink / fs-sink.cpp
CommitLineData
15fe47e0 1/*
0235b0db 2 * SPDX-License-Identifier: MIT
15fe47e0 3 *
0235b0db 4 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
15fe47e0
PP
5 */
6
aa1a7452 7#define BT_COMP_LOG_SELF_COMP (fs_sink->self_comp)
4164020e
SM
8#define BT_LOG_OUTPUT_LEVEL (fs_sink->log_level)
9#define BT_LOG_TAG "PLUGIN/SINK.CTF.FS"
d9c39b0a 10#include "logging/comp-logging.h"
15fe47e0 11
3fadfbc0 12#include <babeltrace2/babeltrace.h>
15fe47e0
PP
13#include <stdio.h>
14#include <stdbool.h>
15#include <glib.h>
578e048b
MJ
16#include "common/assert.h"
17#include "ctfser/ctfser.h"
ce67f561 18#include "plugins/common/param-validation/param-validation.h"
15fe47e0 19
087cd0f5
SM
20#include "fs-sink.hpp"
21#include "fs-sink-trace.hpp"
22#include "fs-sink-stream.hpp"
23#include "fs-sink-ctf-meta.hpp"
24#include "translate-trace-ir-to-ctf-ir.hpp"
25#include "translate-ctf-ir-to-tsdl.hpp"
15fe47e0 26
4164020e 27static const char * const in_port_name = "in";
15fe47e0 28
4164020e
SM
29static bt_component_class_initialize_method_status
30ensure_output_dir_exists(struct fs_sink_comp *fs_sink)
15fe47e0 31{
4164020e
SM
32 bt_component_class_initialize_method_status status =
33 BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
34 int ret;
35
36 ret = g_mkdir_with_parents(fs_sink->output_dir_path->str, 0755);
37 if (ret) {
38 BT_COMP_LOGE_APPEND_CAUSE_ERRNO(fs_sink->self_comp,
39 "Cannot create directories for output directory",
40 ": output-dir-path=\"%s\"", fs_sink->output_dir_path->str);
41 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
42 goto end;
43 }
15fe47e0
PP
44
45end:
4164020e 46 return status;
15fe47e0
PP
47}
48
087cd0f5 49static bt_param_validation_map_value_entry_descr fs_sink_params_descr[] = {
4164020e
SM
50 {"path",
51 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
52 {bt_param_validation_value_descr::string_t}},
53 {"assume-single-trace",
54 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
55 {bt_param_validation_value_descr::bool_t}},
56 {"ignore-discarded-events",
57 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
58 {bt_param_validation_value_descr::bool_t}},
59 {"ignore-discarded-packets",
60 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
61 {bt_param_validation_value_descr::bool_t}},
62 {"quiet",
63 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
64 {bt_param_validation_value_descr::bool_t}},
65 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
66
67static bt_component_class_initialize_method_status configure_component(struct fs_sink_comp *fs_sink,
68 const bt_value *params)
15fe47e0 69{
4164020e
SM
70 bt_component_class_initialize_method_status status;
71 const bt_value *value;
72 enum bt_param_validation_status validation_status;
73 gchar *validation_error;
74
75 validation_status =
76 bt_param_validation_validate(params, fs_sink_params_descr, &validation_error);
77 if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
78 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
79 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "%s", validation_error);
80 goto end;
81 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
82 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
83 goto end;
84 }
85
86 value = bt_value_map_borrow_entry_value_const(params, "path");
87 g_string_assign(fs_sink->output_dir_path, bt_value_string_get(value));
88
89 value = bt_value_map_borrow_entry_value_const(params, "assume-single-trace");
90 if (value) {
91 fs_sink->assume_single_trace = (bool) bt_value_bool_get(value);
92 }
93
94 value = bt_value_map_borrow_entry_value_const(params, "ignore-discarded-events");
95 if (value) {
96 fs_sink->ignore_discarded_events = (bool) bt_value_bool_get(value);
97 }
98
99 value = bt_value_map_borrow_entry_value_const(params, "ignore-discarded-packets");
100 if (value) {
101 fs_sink->ignore_discarded_packets = (bool) bt_value_bool_get(value);
102 }
103
104 value = bt_value_map_borrow_entry_value_const(params, "quiet");
105 if (value) {
106 fs_sink->quiet = (bool) bt_value_bool_get(value);
107 }
108
109 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
ce67f561 110
15fe47e0 111end:
4164020e
SM
112 g_free(validation_error);
113 return status;
15fe47e0
PP
114}
115
4164020e 116static void destroy_fs_sink_comp(struct fs_sink_comp *fs_sink)
15fe47e0 117{
4164020e
SM
118 if (!fs_sink) {
119 goto end;
120 }
15fe47e0 121
4164020e
SM
122 if (fs_sink->output_dir_path) {
123 g_string_free(fs_sink->output_dir_path, TRUE);
124 fs_sink->output_dir_path = NULL;
125 }
15fe47e0 126
4164020e
SM
127 if (fs_sink->traces) {
128 g_hash_table_destroy(fs_sink->traces);
129 fs_sink->traces = NULL;
130 }
15fe47e0 131
4164020e
SM
132 BT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(fs_sink->upstream_iter);
133 g_free(fs_sink);
15fe47e0
PP
134
135end:
4164020e 136 return;
15fe47e0
PP
137}
138
139BT_HIDDEN
4164020e
SM
140bt_component_class_initialize_method_status
141ctf_fs_sink_init(bt_self_component_sink *self_comp_sink,
142 bt_self_component_sink_configuration *config, const bt_value *params,
143 void *init_method_data)
15fe47e0 144{
4164020e
SM
145 bt_component_class_initialize_method_status status;
146 bt_self_component_add_port_status add_port_status;
147 struct fs_sink_comp *fs_sink = NULL;
148 bt_self_component *self_comp = bt_self_component_sink_as_self_component(self_comp_sink);
149 bt_logging_level log_level =
150 bt_component_get_logging_level(bt_self_component_as_component(self_comp));
151
152 fs_sink = g_new0(struct fs_sink_comp, 1);
153 if (!fs_sink) {
154 BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR, log_level, self_comp,
155 "Failed to allocate one CTF FS sink structure.");
156 BT_CURRENT_THREAD_ERROR_APPEND_CAUSE_FROM_COMPONENT(
157 self_comp, "Failed to allocate one CTF FS sink structure.");
158 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
159 goto end;
160 }
161
162 fs_sink->log_level = log_level;
163 fs_sink->self_comp = self_comp;
164 fs_sink->output_dir_path = g_string_new(NULL);
165 status = configure_component(fs_sink, params);
166 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
167 /* configure_component() logs errors */
168 goto end;
169 }
170
171 if (fs_sink->assume_single_trace &&
172 g_file_test(fs_sink->output_dir_path->str, G_FILE_TEST_EXISTS)) {
173 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
174 "Single trace mode, but output path exists: output-path=\"%s\"",
175 fs_sink->output_dir_path->str);
176 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
177 goto end;
178 }
179
180 status = ensure_output_dir_exists(fs_sink);
181 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
182 /* ensure_output_dir_exists() logs errors */
183 goto end;
184 }
185
186 fs_sink->traces = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL,
187 (GDestroyNotify) fs_sink_trace_destroy);
188 if (!fs_sink->traces) {
189 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate one GHashTable.");
190 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
191 goto end;
192 }
193
194 add_port_status =
195 bt_self_component_sink_add_input_port(self_comp_sink, in_port_name, NULL, NULL);
196 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
197 status = (bt_component_class_initialize_method_status) add_port_status;
198 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to add input port.");
199 goto end;
200 }
201
202 bt_self_component_set_data(self_comp, fs_sink);
15fe47e0
PP
203
204end:
4164020e
SM
205 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
206 destroy_fs_sink_comp(fs_sink);
207 }
15fe47e0 208
4164020e 209 return status;
15fe47e0
PP
210}
211
4164020e
SM
212static inline struct fs_sink_stream *borrow_stream(struct fs_sink_comp *fs_sink,
213 const bt_stream *ir_stream)
15fe47e0 214{
4164020e
SM
215 const bt_trace *ir_trace = bt_stream_borrow_trace_const(ir_stream);
216 struct fs_sink_trace *trace;
217 struct fs_sink_stream *stream = NULL;
218
219 trace = (fs_sink_trace *) g_hash_table_lookup(fs_sink->traces, ir_trace);
220 if (G_UNLIKELY(!trace)) {
221 if (fs_sink->assume_single_trace && g_hash_table_size(fs_sink->traces) > 0) {
222 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
223 "Single trace mode, but getting more than one trace: "
224 "stream-name=\"%s\"",
225 bt_stream_get_name(ir_stream));
226 goto end;
227 }
228
229 trace = fs_sink_trace_create(fs_sink, ir_trace);
230 if (!trace) {
231 goto end;
232 }
233 }
234
235 stream = (fs_sink_stream *) g_hash_table_lookup(trace->streams, ir_stream);
236 if (G_UNLIKELY(!stream)) {
237 stream = fs_sink_stream_create(trace, ir_stream);
238 if (!stream) {
239 goto end;
240 }
241 }
15fe47e0
PP
242
243end:
4164020e 244 return stream;
15fe47e0
PP
245}
246
4164020e
SM
247static inline bt_component_class_sink_consume_method_status
248handle_event_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 249{
4164020e
SM
250 int ret;
251 bt_component_class_sink_consume_method_status status =
252 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
253 const bt_event *ir_event = bt_message_event_borrow_event_const(msg);
254 const bt_stream *ir_stream = bt_event_borrow_stream_const(ir_event);
255 struct fs_sink_stream *stream;
256 struct fs_sink_ctf_event_class *ec = NULL;
257 const bt_clock_snapshot *cs = NULL;
258
259 stream = borrow_stream(fs_sink, ir_stream);
260 if (G_UNLIKELY(!stream)) {
261 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
262 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
263 goto end;
264 }
265
266 ret = try_translate_event_class_trace_ir_to_ctf_ir(fs_sink, stream->sc,
267 bt_event_borrow_class_const(ir_event), &ec);
268 if (ret) {
269 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to translate event class to CTF IR.");
270 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
271 goto end;
272 }
273
274 BT_ASSERT_DBG(ec);
275
276 if (stream->sc->default_clock_class) {
277 cs = bt_message_event_borrow_default_clock_snapshot_const(msg);
278 }
279
280 /*
281 * If this event's stream does not support packets, then we
282 * lazily create artificial packets.
283 *
284 * The size of an artificial packet is arbitrarily at least
285 * 4 MiB (it usually is greater because we close it when
286 * comes the time to write a new event and the packet's content
287 * size is >= 4 MiB), except the last one which can be smaller.
288 */
289 if (G_UNLIKELY(!stream->sc->has_packets)) {
290 if (stream->packet_state.is_open &&
291 bt_ctfser_get_offset_in_current_packet_bits(&stream->ctfser) / 8 >= 4 * 1024 * 1024) {
292 /*
293 * Stream's current packet is larger than 4 MiB:
294 * close it. A new packet will be opened just
295 * below.
296 */
297 ret = fs_sink_stream_close_packet(stream, NULL);
298 if (ret) {
299 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to close packet.");
300 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
301 goto end;
302 }
303 }
304
305 if (!stream->packet_state.is_open) {
306 /* Stream's packet is not currently opened: open it */
307 ret = fs_sink_stream_open_packet(stream, NULL, NULL);
308 if (ret) {
309 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to open packet.");
310 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
311 goto end;
312 }
313 }
314 }
315
316 BT_ASSERT_DBG(stream->packet_state.is_open);
317 ret = fs_sink_stream_write_event(stream, cs, ir_event, ec);
318 if (G_UNLIKELY(ret)) {
319 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to write event.");
320 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
321 goto end;
322 }
15fe47e0
PP
323
324end:
4164020e 325 return status;
15fe47e0
PP
326}
327
4164020e
SM
328static inline bt_component_class_sink_consume_method_status
329handle_packet_beginning_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 330{
4164020e
SM
331 int ret;
332 bt_component_class_sink_consume_method_status status =
333 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
334 const bt_packet *ir_packet = bt_message_packet_beginning_borrow_packet_const(msg);
335 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
336 struct fs_sink_stream *stream;
337 const bt_clock_snapshot *cs = NULL;
338
339 stream = borrow_stream(fs_sink, ir_stream);
340 if (G_UNLIKELY(!stream)) {
341 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
342 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
343 goto end;
344 }
345
346 if (stream->sc->packets_have_ts_begin) {
347 cs = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg);
348 BT_ASSERT(cs);
349 }
350
351 /*
352 * If we previously received a discarded events message with
353 * a time range, make sure that its beginning time matches what's
354 * expected for CTF 1.8, that is:
355 *
356 * * Its beginning time is the previous packet's end
357 * time (or the current packet's beginning time if
358 * this is the first packet).
359 *
360 * We check this here instead of in handle_packet_end_msg()
361 * because we want to catch any incompatible message as early as
362 * possible to report the error.
363 *
364 * Validation of the discarded events message's end time is
365 * performed in handle_packet_end_msg().
366 */
367 if (stream->discarded_events_state.in_range) {
368 uint64_t expected_cs;
369
370 /*
371 * `stream->discarded_events_state.in_range` is only set
372 * when the stream class's discarded events have a time
373 * range.
374 *
375 * It is required that the packet beginning and end
376 * messages for this stream class have times when
377 * discarded events have a time range.
378 */
379 BT_ASSERT(stream->sc->discarded_events_has_ts);
380 BT_ASSERT(stream->sc->packets_have_ts_begin);
381 BT_ASSERT(stream->sc->packets_have_ts_end);
382
383 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
384 /* We're opening the first packet */
385 expected_cs = bt_clock_snapshot_get_value(cs);
386 } else {
387 expected_cs = stream->prev_packet_state.end_cs;
388 }
389
390 if (stream->discarded_events_state.beginning_cs != expected_cs) {
391 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
392 "Incompatible discarded events message: "
393 "unexpected beginning time: "
394 "beginning-cs-val=%" PRIu64 ", "
395 "expected-beginning-cs-val=%" PRIu64 ", "
396 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
397 "trace-name=\"%s\", path=\"%s/%s\"",
398 stream->discarded_events_state.beginning_cs, expected_cs,
399 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
400 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
401 stream->trace->path->str, stream->file_name->str);
402 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
403 goto end;
404 }
405 }
406
407 /*
408 * If we previously received a discarded packets message with a
409 * time range, make sure that its beginning and end times match
410 * what's expected for CTF 1.8, that is:
411 *
412 * * Its beginning time is the previous packet's end time.
413 *
414 * * Its end time is the current packet's beginning time.
415 */
416 if (stream->discarded_packets_state.in_range) {
417 uint64_t expected_end_cs;
418
419 /*
420 * `stream->discarded_packets_state.in_range` is only
421 * set when the stream class's discarded packets have a
422 * time range.
423 *
424 * It is required that the packet beginning and end
425 * messages for this stream class have times when
426 * discarded packets have a time range.
427 */
428 BT_ASSERT(stream->sc->discarded_packets_has_ts);
429 BT_ASSERT(stream->sc->packets_have_ts_begin);
430 BT_ASSERT(stream->sc->packets_have_ts_end);
431
432 /*
433 * It is not supported to have a discarded packets
434 * message _before_ the first packet: we cannot validate
435 * that its beginning time is compatible with CTF 1.8 in
436 * this case.
437 */
438 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
439 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
440 "Incompatible discarded packets message "
441 "occurring before the stream's first packet: "
442 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
443 "trace-name=\"%s\", path=\"%s/%s\"",
444 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
445 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
446 stream->trace->path->str, stream->file_name->str);
447 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
448 goto end;
449 }
450
451 if (stream->discarded_packets_state.beginning_cs != stream->prev_packet_state.end_cs) {
452 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
453 "Incompatible discarded packets message: "
454 "unexpected beginning time: "
455 "beginning-cs-val=%" PRIu64 ", "
456 "expected-beginning-cs-val=%" PRIu64 ", "
457 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
458 "trace-name=\"%s\", path=\"%s/%s\"",
459 stream->discarded_packets_state.beginning_cs,
460 stream->prev_packet_state.end_cs, bt_stream_get_id(ir_stream),
461 bt_stream_get_name(ir_stream),
462 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
463 stream->trace->path->str, stream->file_name->str);
464 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
465 goto end;
466 }
467
468 expected_end_cs = bt_clock_snapshot_get_value(cs);
469
470 if (stream->discarded_packets_state.end_cs != expected_end_cs) {
471 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
472 "Incompatible discarded packets message: "
473 "unexpected end time: "
474 "end-cs-val=%" PRIu64 ", "
475 "expected-end-cs-val=%" PRIu64 ", "
476 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
477 "trace-name=\"%s\", path=\"%s/%s\"",
478 stream->discarded_packets_state.end_cs, expected_end_cs,
479 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
480 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
481 stream->trace->path->str, stream->file_name->str);
482 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
483 goto end;
484 }
485 }
486
487 /*
488 * We're not in a discarded packets time range anymore since we
489 * require that the discarded packets time ranges go from one
490 * packet's end time to the next packet's beginning time, and
491 * we're handling a packet beginning message here.
492 */
493 stream->discarded_packets_state.in_range = false;
494
495 ret = fs_sink_stream_open_packet(stream, cs, ir_packet);
496 if (ret) {
497 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to open packet.");
498 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
499 goto end;
500 }
15fe47e0
PP
501
502end:
4164020e 503 return status;
15fe47e0
PP
504}
505
4164020e
SM
506static inline bt_component_class_sink_consume_method_status
507handle_packet_end_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 508{
4164020e
SM
509 int ret;
510 bt_component_class_sink_consume_method_status status =
511 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
512 const bt_packet *ir_packet = bt_message_packet_end_borrow_packet_const(msg);
513 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
514 struct fs_sink_stream *stream;
515 const bt_clock_snapshot *cs = NULL;
516
517 stream = borrow_stream(fs_sink, ir_stream);
518 if (G_UNLIKELY(!stream)) {
519 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
520 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
521 goto end;
522 }
523
524 if (stream->sc->packets_have_ts_end) {
525 cs = bt_message_packet_end_borrow_default_clock_snapshot_const(msg);
526 BT_ASSERT(cs);
527 }
528
529 /*
530 * If we previously received a discarded events message with
531 * a time range, make sure that its end time matches what's
532 * expected for CTF 1.8, that is:
533 *
534 * * Its end time is the current packet's end time.
535 *
536 * Validation of the discarded events message's beginning time
537 * is performed in handle_packet_beginning_msg().
538 */
539 if (stream->discarded_events_state.in_range) {
540 uint64_t expected_cs;
541
542 /*
543 * `stream->discarded_events_state.in_range` is only set
544 * when the stream class's discarded events have a time
545 * range.
546 *
547 * It is required that the packet beginning and end
548 * messages for this stream class have times when
549 * discarded events have a time range.
550 */
551 BT_ASSERT(stream->sc->discarded_events_has_ts);
552 BT_ASSERT(stream->sc->packets_have_ts_begin);
553 BT_ASSERT(stream->sc->packets_have_ts_end);
554
555 expected_cs = bt_clock_snapshot_get_value(cs);
556
557 if (stream->discarded_events_state.end_cs != expected_cs) {
558 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
559 "Incompatible discarded events message: "
560 "unexpected end time: "
561 "end-cs-val=%" PRIu64 ", "
562 "expected-end-cs-val=%" PRIu64 ", "
563 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
564 "trace-name=\"%s\", path=\"%s/%s\"",
565 stream->discarded_events_state.end_cs, expected_cs,
566 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
567 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
568 stream->trace->path->str, stream->file_name->str);
569 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
570 goto end;
571 }
572 }
573
574 ret = fs_sink_stream_close_packet(stream, cs);
575 if (ret) {
576 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to close packet.");
577 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
578 goto end;
579 }
580
581 /*
582 * We're not in a discarded events time range anymore since we
583 * require that the discarded events time ranges go from one
584 * packet's end time to the next packet's end time, and we're
585 * handling a packet end message here.
586 */
587 stream->discarded_events_state.in_range = false;
15fe47e0
PP
588
589end:
4164020e 590 return status;
15fe47e0
PP
591}
592
4164020e
SM
593static inline bt_component_class_sink_consume_method_status
594handle_stream_beginning_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 595{
4164020e
SM
596 bt_component_class_sink_consume_method_status status =
597 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
598 const bt_stream *ir_stream = bt_message_stream_beginning_borrow_stream_const(msg);
599 const bt_stream_class *ir_sc = bt_stream_borrow_class_const(ir_stream);
600 struct fs_sink_stream *stream;
601 bool packets_have_beginning_end_cs =
602 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc) &&
603 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc);
604
605 /*
606 * Not supported: discarded events or discarded packets support
607 * without packets support. Packets are the way to know where
608 * discarded events/packets occurred in CTF 1.8.
609 */
610 if (!bt_stream_class_supports_packets(ir_sc)) {
611 BT_ASSERT(!bt_stream_class_supports_discarded_packets(ir_sc));
612
613 if (!fs_sink->ignore_discarded_events && bt_stream_class_supports_discarded_events(ir_sc)) {
614 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
615 "Unsupported stream: "
616 "stream does not support packets, "
617 "but supports discarded events: "
618 "stream-addr=%p, "
619 "stream-id=%" PRIu64 ", "
620 "stream-name=\"%s\"",
621 ir_stream, bt_stream_get_id(ir_stream),
622 bt_stream_get_name(ir_stream));
623 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
624 goto end;
625 }
626 }
627
628 /*
629 * Not supported: discarded events with default clock snapshots,
630 * but packet beginning/end without default clock snapshot.
631 */
632 if (!fs_sink->ignore_discarded_events &&
633 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc) &&
634 !packets_have_beginning_end_cs) {
635 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
636 "Unsupported stream: discarded events have "
637 "default clock snapshots, but packets have no "
638 "beginning and/or end default clock snapshots: "
639 "stream-addr=%p, "
640 "stream-id=%" PRIu64 ", "
641 "stream-name=\"%s\"",
642 ir_stream, bt_stream_get_id(ir_stream),
643 bt_stream_get_name(ir_stream));
644 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
645 goto end;
646 }
647
648 /*
649 * Not supported: discarded packets with default clock
650 * snapshots, but packet beginning/end without default clock
651 * snapshot.
652 */
653 if (!fs_sink->ignore_discarded_packets &&
654 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc) &&
655 !packets_have_beginning_end_cs) {
656 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
657 "Unsupported stream: discarded packets have "
658 "default clock snapshots, but packets have no "
659 "beginning and/or end default clock snapshots: "
660 "stream-addr=%p, "
661 "stream-id=%" PRIu64 ", "
662 "stream-name=\"%s\"",
663 ir_stream, bt_stream_get_id(ir_stream),
664 bt_stream_get_name(ir_stream));
665 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
666 goto end;
667 }
668
669 stream = borrow_stream(fs_sink, ir_stream);
670 if (!stream) {
671 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
672 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
673 goto end;
674 }
675
676 BT_COMP_LOGI("Created new, empty stream file: "
677 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
678 "trace-name=\"%s\", path=\"%s/%s\"",
679 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
680 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
681 stream->trace->path->str, stream->file_name->str);
15fe47e0
PP
682
683end:
4164020e 684 return status;
15fe47e0
PP
685}
686
4164020e
SM
687static inline bt_component_class_sink_consume_method_status
688handle_stream_end_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 689{
4164020e
SM
690 bt_component_class_sink_consume_method_status status =
691 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
692 const bt_stream *ir_stream = bt_message_stream_end_borrow_stream_const(msg);
693 struct fs_sink_stream *stream;
694
695 stream = borrow_stream(fs_sink, ir_stream);
696 if (!stream) {
697 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
698 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
699 goto end;
700 }
701
702 if (G_UNLIKELY(!stream->sc->has_packets && stream->packet_state.is_open)) {
703 /* Close stream's current artificial packet */
704 int ret = fs_sink_stream_close_packet(stream, NULL);
705
706 if (ret) {
707 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to close packet.");
708 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
709 goto end;
710 }
711 }
712
713 BT_COMP_LOGI("Closing stream file: "
714 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
715 "trace-name=\"%s\", path=\"%s/%s\"",
716 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
717 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
718 stream->trace->path->str, stream->file_name->str);
719
720 /*
721 * This destroys the stream object and frees all its resources,
722 * closing the stream file.
723 */
724 g_hash_table_remove(stream->trace->streams, ir_stream);
15fe47e0
PP
725
726end:
4164020e 727 return status;
15fe47e0
PP
728}
729
4164020e
SM
730static inline bt_component_class_sink_consume_method_status
731handle_discarded_events_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 732{
4164020e
SM
733 bt_component_class_sink_consume_method_status status =
734 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
735 const bt_stream *ir_stream = bt_message_discarded_events_borrow_stream_const(msg);
736 struct fs_sink_stream *stream;
737 const bt_clock_snapshot *cs = NULL;
738 bt_property_availability avail;
739 uint64_t count;
740
741 stream = borrow_stream(fs_sink, ir_stream);
742 if (!stream) {
743 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
744 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
745 goto end;
746 }
747
748 if (fs_sink->ignore_discarded_events) {
749 BT_COMP_LOGI("Ignoring discarded events message: "
750 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
751 "trace-name=\"%s\", path=\"%s/%s\"",
752 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
753 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
754 stream->trace->path->str, stream->file_name->str);
755 goto end;
756 }
757
758 if (stream->discarded_events_state.in_range) {
759 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
760 "Unsupported contiguous discarded events message: "
761 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
762 "trace-name=\"%s\", path=\"%s/%s\"",
763 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
764 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
765 stream->trace->path->str, stream->file_name->str);
766 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
767 goto end;
768 }
769
770 /*
771 * If we're currently in an opened packet (got a packet
772 * beginning message, but no packet end message yet), we do not
773 * support having a discarded events message with a time range
774 * because we require that the discarded events message's time
775 * range go from a packet's end time to the next packet's end
776 * time.
777 */
778 if (stream->packet_state.is_open && stream->sc->discarded_events_has_ts) {
779 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
780 "Unsupported discarded events message with "
781 "default clock snapshots occurring within a packet: "
782 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
783 "trace-name=\"%s\", path=\"%s/%s\"",
784 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
785 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
786 stream->trace->path->str, stream->file_name->str);
787 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
788 goto end;
789 }
790
791 if (stream->sc->discarded_events_has_ts) {
792 /*
793 * Make the stream's state be in the time range of a
794 * discarded events message since we have the message's
795 * time range (`stream->sc->discarded_events_has_ts`).
796 */
797 stream->discarded_events_state.in_range = true;
798
799 /*
800 * The clock snapshot values will be validated when
801 * handling the next packet beginning and end messages
802 * (next calls to handle_packet_beginning_msg() and
803 * handle_packet_end_msg()).
804 */
805 cs = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg);
806 BT_ASSERT(cs);
807 stream->discarded_events_state.beginning_cs = bt_clock_snapshot_get_value(cs);
808 cs = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg);
809 BT_ASSERT(cs);
810 stream->discarded_events_state.end_cs = bt_clock_snapshot_get_value(cs);
811 }
812
813 avail = bt_message_discarded_events_get_count(msg, &count);
814 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
815 /*
816 * There's no specific count of discarded events: set it
817 * to 1 so that we know that we at least discarded
818 * something.
819 */
820 count = 1;
821 }
822
823 stream->packet_state.discarded_events_counter += count;
15fe47e0
PP
824
825end:
4164020e 826 return status;
15fe47e0
PP
827}
828
4164020e
SM
829static inline bt_component_class_sink_consume_method_status
830handle_discarded_packets_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 831{
4164020e
SM
832 bt_component_class_sink_consume_method_status status =
833 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
834 const bt_stream *ir_stream = bt_message_discarded_packets_borrow_stream_const(msg);
835 struct fs_sink_stream *stream;
836 const bt_clock_snapshot *cs = NULL;
837 bt_property_availability avail;
838 uint64_t count;
839
840 stream = borrow_stream(fs_sink, ir_stream);
841 if (!stream) {
842 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
843 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
844 goto end;
845 }
846
847 if (fs_sink->ignore_discarded_packets) {
848 BT_COMP_LOGI("Ignoring discarded packets message: "
849 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
850 "trace-name=\"%s\", path=\"%s/%s\"",
851 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
852 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
853 stream->trace->path->str, stream->file_name->str);
854 goto end;
855 }
856
857 if (stream->discarded_packets_state.in_range) {
858 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
859 "Unsupported contiguous discarded packets message: "
860 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
861 "trace-name=\"%s\", path=\"%s/%s\"",
862 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
863 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
864 stream->trace->path->str, stream->file_name->str);
865 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
866 goto end;
867 }
868
869 /*
870 * Discarded packets messages are guaranteed to occur between
871 * packets.
872 */
873 BT_ASSERT(!stream->packet_state.is_open);
874
875 if (stream->sc->discarded_packets_has_ts) {
876 /*
877 * Make the stream's state be in the time range of a
878 * discarded packets message since we have the message's
879 * time range (`stream->sc->discarded_packets_has_ts`).
880 */
881 stream->discarded_packets_state.in_range = true;
882
883 /*
884 * The clock snapshot values will be validated when
885 * handling the next packet beginning message (next call
886 * to handle_packet_beginning_msg()).
887 */
888 cs = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg);
889 BT_ASSERT(cs);
890 stream->discarded_packets_state.beginning_cs = bt_clock_snapshot_get_value(cs);
891 cs = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg);
892 BT_ASSERT(cs);
893 stream->discarded_packets_state.end_cs = bt_clock_snapshot_get_value(cs);
894 }
895
896 avail = bt_message_discarded_packets_get_count(msg, &count);
897 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
898 /*
899 * There's no specific count of discarded packets: set
900 * it to 1 so that we know that we at least discarded
901 * something.
902 */
903 count = 1;
904 }
905
906 stream->packet_state.seq_num += count;
15fe47e0
PP
907
908end:
4164020e 909 return status;
15fe47e0
PP
910}
911
4164020e 912static inline void put_messages(bt_message_array_const msgs, uint64_t count)
15fe47e0 913{
4164020e 914 uint64_t i;
15fe47e0 915
4164020e
SM
916 for (i = 0; i < count; i++) {
917 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
918 }
15fe47e0
PP
919}
920
921BT_HIDDEN
4164020e 922bt_component_class_sink_consume_method_status ctf_fs_sink_consume(bt_self_component_sink *self_comp)
15fe47e0 923{
4164020e
SM
924 bt_component_class_sink_consume_method_status status =
925 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
926 struct fs_sink_comp *fs_sink;
927 bt_message_iterator_next_status next_status;
928 uint64_t msg_count = 0;
929 bt_message_array_const msgs;
930
931 fs_sink = (fs_sink_comp *) bt_self_component_get_data(
932 bt_self_component_sink_as_self_component(self_comp));
933 BT_ASSERT_DBG(fs_sink);
934 BT_ASSERT_DBG(fs_sink->upstream_iter);
935
936 /* Consume messages */
937 next_status = bt_message_iterator_next(fs_sink->upstream_iter, &msgs, &msg_count);
938 if (next_status < 0) {
939 status = (bt_component_class_sink_consume_method_status) next_status;
940 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
941 "Failed to get next message from upstream iterator.");
942 goto end;
943 }
944
945 switch (next_status) {
946 case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK:
947 {
948 uint64_t i;
949
950 for (i = 0; i < msg_count; i++) {
951 const bt_message *msg = msgs[i];
952
953 BT_ASSERT_DBG(msg);
954
955 switch (bt_message_get_type(msg)) {
956 case BT_MESSAGE_TYPE_EVENT:
957 status = handle_event_msg(fs_sink, msg);
958 break;
959 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
960 status = handle_packet_beginning_msg(fs_sink, msg);
961 break;
962 case BT_MESSAGE_TYPE_PACKET_END:
963 status = handle_packet_end_msg(fs_sink, msg);
964 break;
965 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
966 /* Ignore */
967 BT_COMP_LOGD_STR("Ignoring message iterator inactivity message.");
968 break;
969 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
970 status = handle_stream_beginning_msg(fs_sink, msg);
971 break;
972 case BT_MESSAGE_TYPE_STREAM_END:
973 status = handle_stream_end_msg(fs_sink, msg);
974 break;
975 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
976 status = handle_discarded_events_msg(fs_sink, msg);
977 break;
978 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
979 status = handle_discarded_packets_msg(fs_sink, msg);
980 break;
981 default:
982 bt_common_abort();
983 }
984
985 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
986
987 if (status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK) {
988 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
989 "Failed to handle message: "
990 "generated CTF traces could be incomplete: "
991 "output-dir-path=\"%s\"",
992 fs_sink->output_dir_path->str);
993 goto error;
994 }
995 }
996
997 break;
998 }
999 case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN:
1000 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN;
1001 break;
1002 case BT_MESSAGE_ITERATOR_NEXT_STATUS_END:
1003 /* TODO: Finalize all traces (should already be done?) */
1004 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END;
1005 break;
1006 default:
1007 break;
1008 }
1009
1010 goto end;
15fe47e0
PP
1011
1012error:
4164020e
SM
1013 BT_ASSERT(status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK);
1014 put_messages(msgs, msg_count);
15fe47e0
PP
1015
1016end:
4164020e 1017 return status;
15fe47e0
PP
1018}
1019
1020BT_HIDDEN
e803df70 1021bt_component_class_sink_graph_is_configured_method_status
4164020e 1022ctf_fs_sink_graph_is_configured(bt_self_component_sink *self_comp)
15fe47e0 1023{
4164020e
SM
1024 bt_component_class_sink_graph_is_configured_method_status status;
1025 bt_message_iterator_create_from_sink_component_status msg_iter_status;
1026 fs_sink_comp *fs_sink = (fs_sink_comp *) bt_self_component_get_data(
1027 bt_self_component_sink_as_self_component(self_comp));
1028
1029 msg_iter_status = bt_message_iterator_create_from_sink_component(
1030 self_comp, bt_self_component_sink_borrow_input_port_by_name(self_comp, in_port_name),
1031 &fs_sink->upstream_iter);
1032 if (msg_iter_status != BT_MESSAGE_ITERATOR_CREATE_FROM_SINK_COMPONENT_STATUS_OK) {
1033 status = (bt_component_class_sink_graph_is_configured_method_status) msg_iter_status;
1034 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to create upstream iterator.");
1035 goto end;
1036 }
1037
1038 status = BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK;
15fe47e0 1039end:
4164020e 1040 return status;
15fe47e0
PP
1041}
1042
1043BT_HIDDEN
1044void ctf_fs_sink_finalize(bt_self_component_sink *self_comp)
1045{
4164020e
SM
1046 fs_sink_comp *fs_sink = (fs_sink_comp *) bt_self_component_get_data(
1047 bt_self_component_sink_as_self_component(self_comp));
15fe47e0 1048
4164020e 1049 destroy_fs_sink_comp(fs_sink);
15fe47e0 1050}
This page took 0.1045 seconds and 4 git commands to generate.