sink.ctf.fs: initialize structure fields
[babeltrace.git] / src / plugins / ctf / fs-sink / fs-sink.cpp
CommitLineData
15fe47e0 1/*
0235b0db 2 * SPDX-License-Identifier: MIT
15fe47e0 3 *
0235b0db 4 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
15fe47e0
PP
5 */
6
c802cacb 7#include <glib.h>
c802cacb
SM
8#include <stdio.h>
9
10#include <babeltrace2/babeltrace.h>
11
aa1a7452 12#define BT_COMP_LOG_SELF_COMP (fs_sink->self_comp)
4164020e
SM
13#define BT_LOG_OUTPUT_LEVEL (fs_sink->log_level)
14#define BT_LOG_TAG "PLUGIN/SINK.CTF.FS"
d9c39b0a 15#include "logging/comp-logging.h"
15fe47e0 16
578e048b
MJ
17#include "common/assert.h"
18#include "ctfser/ctfser.h"
c802cacb 19
ce67f561 20#include "plugins/common/param-validation/param-validation.h"
15fe47e0 21
087cd0f5 22#include "fs-sink-ctf-meta.hpp"
c802cacb
SM
23#include "fs-sink-stream.hpp"
24#include "fs-sink-trace.hpp"
25#include "fs-sink.hpp"
c802cacb 26#include "translate-trace-ir-to-ctf-ir.hpp"
15fe47e0 27
/* Name of the single input port which ctf_fs_sink_init() adds to the component. */
static const char * const in_port_name = "in";
15fe47e0 29
4164020e
SM
30static bt_component_class_initialize_method_status
31ensure_output_dir_exists(struct fs_sink_comp *fs_sink)
15fe47e0 32{
4164020e
SM
33 bt_component_class_initialize_method_status status =
34 BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
35 int ret;
36
37 ret = g_mkdir_with_parents(fs_sink->output_dir_path->str, 0755);
38 if (ret) {
39 BT_COMP_LOGE_APPEND_CAUSE_ERRNO(fs_sink->self_comp,
40 "Cannot create directories for output directory",
41 ": output-dir-path=\"%s\"", fs_sink->output_dir_path->str);
42 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
43 goto end;
44 }
15fe47e0
PP
45
46end:
4164020e 47 return status;
15fe47e0
PP
48}
49
087cd0f5 50static bt_param_validation_map_value_entry_descr fs_sink_params_descr[] = {
88730e42
SM
51 {"path", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
52 bt_param_validation_value_descr::makeString()},
53 {"assume-single-trace", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
54 bt_param_validation_value_descr::makeBool()},
55 {"ignore-discarded-events", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
56 bt_param_validation_value_descr::makeBool()},
57 {"ignore-discarded-packets", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
58 bt_param_validation_value_descr::makeBool()},
59 {"quiet", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
60 bt_param_validation_value_descr::makeBool()},
4164020e
SM
61 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
62
63static bt_component_class_initialize_method_status configure_component(struct fs_sink_comp *fs_sink,
64 const bt_value *params)
15fe47e0 65{
4164020e
SM
66 bt_component_class_initialize_method_status status;
67 const bt_value *value;
68 enum bt_param_validation_status validation_status;
69 gchar *validation_error;
70
71 validation_status =
72 bt_param_validation_validate(params, fs_sink_params_descr, &validation_error);
73 if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
74 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
75 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "%s", validation_error);
76 goto end;
77 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
78 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
79 goto end;
80 }
81
82 value = bt_value_map_borrow_entry_value_const(params, "path");
83 g_string_assign(fs_sink->output_dir_path, bt_value_string_get(value));
84
85 value = bt_value_map_borrow_entry_value_const(params, "assume-single-trace");
86 if (value) {
87 fs_sink->assume_single_trace = (bool) bt_value_bool_get(value);
88 }
89
90 value = bt_value_map_borrow_entry_value_const(params, "ignore-discarded-events");
91 if (value) {
92 fs_sink->ignore_discarded_events = (bool) bt_value_bool_get(value);
93 }
94
95 value = bt_value_map_borrow_entry_value_const(params, "ignore-discarded-packets");
96 if (value) {
97 fs_sink->ignore_discarded_packets = (bool) bt_value_bool_get(value);
98 }
99
100 value = bt_value_map_borrow_entry_value_const(params, "quiet");
101 if (value) {
102 fs_sink->quiet = (bool) bt_value_bool_get(value);
103 }
104
105 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
ce67f561 106
15fe47e0 107end:
4164020e
SM
108 g_free(validation_error);
109 return status;
15fe47e0
PP
110}
111
4164020e 112static void destroy_fs_sink_comp(struct fs_sink_comp *fs_sink)
15fe47e0 113{
4164020e
SM
114 if (!fs_sink) {
115 goto end;
116 }
15fe47e0 117
4164020e
SM
118 if (fs_sink->output_dir_path) {
119 g_string_free(fs_sink->output_dir_path, TRUE);
120 fs_sink->output_dir_path = NULL;
121 }
15fe47e0 122
4164020e
SM
123 if (fs_sink->traces) {
124 g_hash_table_destroy(fs_sink->traces);
125 fs_sink->traces = NULL;
126 }
15fe47e0 127
4164020e 128 BT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(fs_sink->upstream_iter);
150640e8 129 delete fs_sink;
15fe47e0
PP
130
131end:
4164020e 132 return;
15fe47e0
PP
133}
134
5bcd9f04
SM
135bt_component_class_initialize_method_status ctf_fs_sink_init(bt_self_component_sink *self_comp_sink,
136 bt_self_component_sink_configuration *,
137 const bt_value *params, void *)
15fe47e0 138{
4164020e
SM
139 bt_component_class_initialize_method_status status;
140 bt_self_component_add_port_status add_port_status;
141 struct fs_sink_comp *fs_sink = NULL;
142 bt_self_component *self_comp = bt_self_component_sink_as_self_component(self_comp_sink);
143 bt_logging_level log_level =
144 bt_component_get_logging_level(bt_self_component_as_component(self_comp));
145
150640e8 146 fs_sink = new fs_sink_comp;
4164020e
SM
147 fs_sink->log_level = log_level;
148 fs_sink->self_comp = self_comp;
149 fs_sink->output_dir_path = g_string_new(NULL);
150 status = configure_component(fs_sink, params);
151 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
152 /* configure_component() logs errors */
153 goto end;
154 }
155
156 if (fs_sink->assume_single_trace &&
157 g_file_test(fs_sink->output_dir_path->str, G_FILE_TEST_EXISTS)) {
158 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
159 "Single trace mode, but output path exists: output-path=\"%s\"",
160 fs_sink->output_dir_path->str);
161 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
162 goto end;
163 }
164
165 status = ensure_output_dir_exists(fs_sink);
166 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
167 /* ensure_output_dir_exists() logs errors */
168 goto end;
169 }
170
171 fs_sink->traces = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL,
172 (GDestroyNotify) fs_sink_trace_destroy);
173 if (!fs_sink->traces) {
174 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate one GHashTable.");
175 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
176 goto end;
177 }
178
179 add_port_status =
180 bt_self_component_sink_add_input_port(self_comp_sink, in_port_name, NULL, NULL);
181 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
182 status = (bt_component_class_initialize_method_status) add_port_status;
183 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to add input port.");
184 goto end;
185 }
186
187 bt_self_component_set_data(self_comp, fs_sink);
15fe47e0
PP
188
189end:
4164020e
SM
190 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
191 destroy_fs_sink_comp(fs_sink);
192 }
15fe47e0 193
4164020e 194 return status;
15fe47e0
PP
195}
196
4164020e
SM
197static inline struct fs_sink_stream *borrow_stream(struct fs_sink_comp *fs_sink,
198 const bt_stream *ir_stream)
15fe47e0 199{
4164020e
SM
200 const bt_trace *ir_trace = bt_stream_borrow_trace_const(ir_stream);
201 struct fs_sink_trace *trace;
202 struct fs_sink_stream *stream = NULL;
203
204 trace = (fs_sink_trace *) g_hash_table_lookup(fs_sink->traces, ir_trace);
205 if (G_UNLIKELY(!trace)) {
206 if (fs_sink->assume_single_trace && g_hash_table_size(fs_sink->traces) > 0) {
207 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
208 "Single trace mode, but getting more than one trace: "
209 "stream-name=\"%s\"",
210 bt_stream_get_name(ir_stream));
211 goto end;
212 }
213
214 trace = fs_sink_trace_create(fs_sink, ir_trace);
215 if (!trace) {
216 goto end;
217 }
218 }
219
220 stream = (fs_sink_stream *) g_hash_table_lookup(trace->streams, ir_stream);
221 if (G_UNLIKELY(!stream)) {
222 stream = fs_sink_stream_create(trace, ir_stream);
223 if (!stream) {
224 goto end;
225 }
226 }
15fe47e0
PP
227
228end:
4164020e 229 return stream;
15fe47e0
PP
230}
231
4164020e
SM
232static inline bt_component_class_sink_consume_method_status
233handle_event_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 234{
4164020e
SM
235 int ret;
236 bt_component_class_sink_consume_method_status status =
237 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
238 const bt_event *ir_event = bt_message_event_borrow_event_const(msg);
239 const bt_stream *ir_stream = bt_event_borrow_stream_const(ir_event);
240 struct fs_sink_stream *stream;
241 struct fs_sink_ctf_event_class *ec = NULL;
242 const bt_clock_snapshot *cs = NULL;
243
244 stream = borrow_stream(fs_sink, ir_stream);
245 if (G_UNLIKELY(!stream)) {
246 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
247 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
248 goto end;
249 }
250
251 ret = try_translate_event_class_trace_ir_to_ctf_ir(fs_sink, stream->sc,
252 bt_event_borrow_class_const(ir_event), &ec);
253 if (ret) {
254 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to translate event class to CTF IR.");
255 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
256 goto end;
257 }
258
259 BT_ASSERT_DBG(ec);
260
261 if (stream->sc->default_clock_class) {
262 cs = bt_message_event_borrow_default_clock_snapshot_const(msg);
263 }
264
265 /*
266 * If this event's stream does not support packets, then we
267 * lazily create artificial packets.
268 *
269 * The size of an artificial packet is arbitrarily at least
270 * 4 MiB (it usually is greater because we close it when
271 * comes the time to write a new event and the packet's content
272 * size is >= 4 MiB), except the last one which can be smaller.
273 */
274 if (G_UNLIKELY(!stream->sc->has_packets)) {
275 if (stream->packet_state.is_open &&
276 bt_ctfser_get_offset_in_current_packet_bits(&stream->ctfser) / 8 >= 4 * 1024 * 1024) {
277 /*
278 * Stream's current packet is larger than 4 MiB:
279 * close it. A new packet will be opened just
280 * below.
281 */
282 ret = fs_sink_stream_close_packet(stream, NULL);
283 if (ret) {
284 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to close packet.");
285 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
286 goto end;
287 }
288 }
289
290 if (!stream->packet_state.is_open) {
291 /* Stream's packet is not currently opened: open it */
292 ret = fs_sink_stream_open_packet(stream, NULL, NULL);
293 if (ret) {
294 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to open packet.");
295 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
296 goto end;
297 }
298 }
299 }
300
301 BT_ASSERT_DBG(stream->packet_state.is_open);
302 ret = fs_sink_stream_write_event(stream, cs, ir_event, ec);
303 if (G_UNLIKELY(ret)) {
304 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to write event.");
305 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
306 goto end;
307 }
15fe47e0
PP
308
309end:
4164020e 310 return status;
15fe47e0
PP
311}
312
4164020e
SM
313static inline bt_component_class_sink_consume_method_status
314handle_packet_beginning_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 315{
4164020e
SM
316 int ret;
317 bt_component_class_sink_consume_method_status status =
318 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
319 const bt_packet *ir_packet = bt_message_packet_beginning_borrow_packet_const(msg);
320 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
321 struct fs_sink_stream *stream;
322 const bt_clock_snapshot *cs = NULL;
323
324 stream = borrow_stream(fs_sink, ir_stream);
325 if (G_UNLIKELY(!stream)) {
326 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
327 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
328 goto end;
329 }
330
331 if (stream->sc->packets_have_ts_begin) {
332 cs = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg);
333 BT_ASSERT(cs);
334 }
335
336 /*
337 * If we previously received a discarded events message with
338 * a time range, make sure that its beginning time matches what's
339 * expected for CTF 1.8, that is:
340 *
341 * * Its beginning time is the previous packet's end
342 * time (or the current packet's beginning time if
343 * this is the first packet).
344 *
345 * We check this here instead of in handle_packet_end_msg()
346 * because we want to catch any incompatible message as early as
347 * possible to report the error.
348 *
349 * Validation of the discarded events message's end time is
350 * performed in handle_packet_end_msg().
351 */
352 if (stream->discarded_events_state.in_range) {
353 uint64_t expected_cs;
354
355 /*
356 * `stream->discarded_events_state.in_range` is only set
357 * when the stream class's discarded events have a time
358 * range.
359 *
360 * It is required that the packet beginning and end
361 * messages for this stream class have times when
362 * discarded events have a time range.
363 */
364 BT_ASSERT(stream->sc->discarded_events_has_ts);
365 BT_ASSERT(stream->sc->packets_have_ts_begin);
366 BT_ASSERT(stream->sc->packets_have_ts_end);
367
368 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
369 /* We're opening the first packet */
370 expected_cs = bt_clock_snapshot_get_value(cs);
371 } else {
372 expected_cs = stream->prev_packet_state.end_cs;
373 }
374
375 if (stream->discarded_events_state.beginning_cs != expected_cs) {
376 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
377 "Incompatible discarded events message: "
378 "unexpected beginning time: "
379 "beginning-cs-val=%" PRIu64 ", "
380 "expected-beginning-cs-val=%" PRIu64 ", "
381 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
382 "trace-name=\"%s\", path=\"%s/%s\"",
383 stream->discarded_events_state.beginning_cs, expected_cs,
384 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
385 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
386 stream->trace->path->str, stream->file_name->str);
387 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
388 goto end;
389 }
390 }
391
392 /*
393 * If we previously received a discarded packets message with a
394 * time range, make sure that its beginning and end times match
395 * what's expected for CTF 1.8, that is:
396 *
397 * * Its beginning time is the previous packet's end time.
398 *
399 * * Its end time is the current packet's beginning time.
400 */
401 if (stream->discarded_packets_state.in_range) {
402 uint64_t expected_end_cs;
403
404 /*
405 * `stream->discarded_packets_state.in_range` is only
406 * set when the stream class's discarded packets have a
407 * time range.
408 *
409 * It is required that the packet beginning and end
410 * messages for this stream class have times when
411 * discarded packets have a time range.
412 */
413 BT_ASSERT(stream->sc->discarded_packets_has_ts);
414 BT_ASSERT(stream->sc->packets_have_ts_begin);
415 BT_ASSERT(stream->sc->packets_have_ts_end);
416
417 /*
418 * It is not supported to have a discarded packets
419 * message _before_ the first packet: we cannot validate
420 * that its beginning time is compatible with CTF 1.8 in
421 * this case.
422 */
423 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
424 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
425 "Incompatible discarded packets message "
426 "occurring before the stream's first packet: "
427 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
428 "trace-name=\"%s\", path=\"%s/%s\"",
429 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
430 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
431 stream->trace->path->str, stream->file_name->str);
432 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
433 goto end;
434 }
435
436 if (stream->discarded_packets_state.beginning_cs != stream->prev_packet_state.end_cs) {
437 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
438 "Incompatible discarded packets message: "
439 "unexpected beginning time: "
440 "beginning-cs-val=%" PRIu64 ", "
441 "expected-beginning-cs-val=%" PRIu64 ", "
442 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
443 "trace-name=\"%s\", path=\"%s/%s\"",
444 stream->discarded_packets_state.beginning_cs,
445 stream->prev_packet_state.end_cs, bt_stream_get_id(ir_stream),
446 bt_stream_get_name(ir_stream),
447 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
448 stream->trace->path->str, stream->file_name->str);
449 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
450 goto end;
451 }
452
453 expected_end_cs = bt_clock_snapshot_get_value(cs);
454
455 if (stream->discarded_packets_state.end_cs != expected_end_cs) {
456 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
457 "Incompatible discarded packets message: "
458 "unexpected end time: "
459 "end-cs-val=%" PRIu64 ", "
460 "expected-end-cs-val=%" PRIu64 ", "
461 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
462 "trace-name=\"%s\", path=\"%s/%s\"",
463 stream->discarded_packets_state.end_cs, expected_end_cs,
464 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
465 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
466 stream->trace->path->str, stream->file_name->str);
467 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
468 goto end;
469 }
470 }
471
472 /*
473 * We're not in a discarded packets time range anymore since we
474 * require that the discarded packets time ranges go from one
475 * packet's end time to the next packet's beginning time, and
476 * we're handling a packet beginning message here.
477 */
478 stream->discarded_packets_state.in_range = false;
479
480 ret = fs_sink_stream_open_packet(stream, cs, ir_packet);
481 if (ret) {
482 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to open packet.");
483 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
484 goto end;
485 }
15fe47e0
PP
486
487end:
4164020e 488 return status;
15fe47e0
PP
489}
490
4164020e
SM
491static inline bt_component_class_sink_consume_method_status
492handle_packet_end_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 493{
4164020e
SM
494 int ret;
495 bt_component_class_sink_consume_method_status status =
496 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
497 const bt_packet *ir_packet = bt_message_packet_end_borrow_packet_const(msg);
498 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
499 struct fs_sink_stream *stream;
500 const bt_clock_snapshot *cs = NULL;
501
502 stream = borrow_stream(fs_sink, ir_stream);
503 if (G_UNLIKELY(!stream)) {
504 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
505 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
506 goto end;
507 }
508
509 if (stream->sc->packets_have_ts_end) {
510 cs = bt_message_packet_end_borrow_default_clock_snapshot_const(msg);
511 BT_ASSERT(cs);
512 }
513
514 /*
515 * If we previously received a discarded events message with
516 * a time range, make sure that its end time matches what's
517 * expected for CTF 1.8, that is:
518 *
519 * * Its end time is the current packet's end time.
520 *
521 * Validation of the discarded events message's beginning time
522 * is performed in handle_packet_beginning_msg().
523 */
524 if (stream->discarded_events_state.in_range) {
525 uint64_t expected_cs;
526
527 /*
528 * `stream->discarded_events_state.in_range` is only set
529 * when the stream class's discarded events have a time
530 * range.
531 *
532 * It is required that the packet beginning and end
533 * messages for this stream class have times when
534 * discarded events have a time range.
535 */
536 BT_ASSERT(stream->sc->discarded_events_has_ts);
537 BT_ASSERT(stream->sc->packets_have_ts_begin);
538 BT_ASSERT(stream->sc->packets_have_ts_end);
539
540 expected_cs = bt_clock_snapshot_get_value(cs);
541
542 if (stream->discarded_events_state.end_cs != expected_cs) {
543 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
544 "Incompatible discarded events message: "
545 "unexpected end time: "
546 "end-cs-val=%" PRIu64 ", "
547 "expected-end-cs-val=%" PRIu64 ", "
548 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
549 "trace-name=\"%s\", path=\"%s/%s\"",
550 stream->discarded_events_state.end_cs, expected_cs,
551 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
552 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
553 stream->trace->path->str, stream->file_name->str);
554 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
555 goto end;
556 }
557 }
558
559 ret = fs_sink_stream_close_packet(stream, cs);
560 if (ret) {
561 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to close packet.");
562 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
563 goto end;
564 }
565
566 /*
567 * We're not in a discarded events time range anymore since we
568 * require that the discarded events time ranges go from one
569 * packet's end time to the next packet's end time, and we're
570 * handling a packet end message here.
571 */
572 stream->discarded_events_state.in_range = false;
15fe47e0
PP
573
574end:
4164020e 575 return status;
15fe47e0
PP
576}
577
4164020e
SM
578static inline bt_component_class_sink_consume_method_status
579handle_stream_beginning_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 580{
4164020e
SM
581 bt_component_class_sink_consume_method_status status =
582 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
583 const bt_stream *ir_stream = bt_message_stream_beginning_borrow_stream_const(msg);
584 const bt_stream_class *ir_sc = bt_stream_borrow_class_const(ir_stream);
585 struct fs_sink_stream *stream;
586 bool packets_have_beginning_end_cs =
587 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc) &&
588 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc);
589
590 /*
591 * Not supported: discarded events or discarded packets support
592 * without packets support. Packets are the way to know where
593 * discarded events/packets occurred in CTF 1.8.
594 */
595 if (!bt_stream_class_supports_packets(ir_sc)) {
596 BT_ASSERT(!bt_stream_class_supports_discarded_packets(ir_sc));
597
598 if (!fs_sink->ignore_discarded_events && bt_stream_class_supports_discarded_events(ir_sc)) {
599 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
600 "Unsupported stream: "
601 "stream does not support packets, "
602 "but supports discarded events: "
603 "stream-addr=%p, "
604 "stream-id=%" PRIu64 ", "
605 "stream-name=\"%s\"",
606 ir_stream, bt_stream_get_id(ir_stream),
607 bt_stream_get_name(ir_stream));
608 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
609 goto end;
610 }
611 }
612
613 /*
614 * Not supported: discarded events with default clock snapshots,
615 * but packet beginning/end without default clock snapshot.
616 */
617 if (!fs_sink->ignore_discarded_events &&
618 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc) &&
619 !packets_have_beginning_end_cs) {
620 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
621 "Unsupported stream: discarded events have "
622 "default clock snapshots, but packets have no "
623 "beginning and/or end default clock snapshots: "
624 "stream-addr=%p, "
625 "stream-id=%" PRIu64 ", "
626 "stream-name=\"%s\"",
627 ir_stream, bt_stream_get_id(ir_stream),
628 bt_stream_get_name(ir_stream));
629 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
630 goto end;
631 }
632
633 /*
634 * Not supported: discarded packets with default clock
635 * snapshots, but packet beginning/end without default clock
636 * snapshot.
637 */
638 if (!fs_sink->ignore_discarded_packets &&
639 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc) &&
640 !packets_have_beginning_end_cs) {
641 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
642 "Unsupported stream: discarded packets have "
643 "default clock snapshots, but packets have no "
644 "beginning and/or end default clock snapshots: "
645 "stream-addr=%p, "
646 "stream-id=%" PRIu64 ", "
647 "stream-name=\"%s\"",
648 ir_stream, bt_stream_get_id(ir_stream),
649 bt_stream_get_name(ir_stream));
650 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
651 goto end;
652 }
653
654 stream = borrow_stream(fs_sink, ir_stream);
655 if (!stream) {
656 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
657 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
658 goto end;
659 }
660
661 BT_COMP_LOGI("Created new, empty stream file: "
662 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
663 "trace-name=\"%s\", path=\"%s/%s\"",
664 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
665 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
666 stream->trace->path->str, stream->file_name->str);
15fe47e0
PP
667
668end:
4164020e 669 return status;
15fe47e0
PP
670}
671
4164020e
SM
672static inline bt_component_class_sink_consume_method_status
673handle_stream_end_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 674{
4164020e
SM
675 bt_component_class_sink_consume_method_status status =
676 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
677 const bt_stream *ir_stream = bt_message_stream_end_borrow_stream_const(msg);
678 struct fs_sink_stream *stream;
679
680 stream = borrow_stream(fs_sink, ir_stream);
681 if (!stream) {
682 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
683 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
684 goto end;
685 }
686
687 if (G_UNLIKELY(!stream->sc->has_packets && stream->packet_state.is_open)) {
688 /* Close stream's current artificial packet */
689 int ret = fs_sink_stream_close_packet(stream, NULL);
690
691 if (ret) {
692 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to close packet.");
693 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
694 goto end;
695 }
696 }
697
698 BT_COMP_LOGI("Closing stream file: "
699 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
700 "trace-name=\"%s\", path=\"%s/%s\"",
701 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
702 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
703 stream->trace->path->str, stream->file_name->str);
704
705 /*
706 * This destroys the stream object and frees all its resources,
707 * closing the stream file.
708 */
709 g_hash_table_remove(stream->trace->streams, ir_stream);
15fe47e0
PP
710
711end:
4164020e 712 return status;
15fe47e0
PP
713}
714
4164020e
SM
715static inline bt_component_class_sink_consume_method_status
716handle_discarded_events_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 717{
4164020e
SM
718 bt_component_class_sink_consume_method_status status =
719 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
720 const bt_stream *ir_stream = bt_message_discarded_events_borrow_stream_const(msg);
721 struct fs_sink_stream *stream;
722 const bt_clock_snapshot *cs = NULL;
723 bt_property_availability avail;
724 uint64_t count;
725
726 stream = borrow_stream(fs_sink, ir_stream);
727 if (!stream) {
728 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
729 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
730 goto end;
731 }
732
733 if (fs_sink->ignore_discarded_events) {
734 BT_COMP_LOGI("Ignoring discarded events message: "
735 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
736 "trace-name=\"%s\", path=\"%s/%s\"",
737 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
738 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
739 stream->trace->path->str, stream->file_name->str);
740 goto end;
741 }
742
743 if (stream->discarded_events_state.in_range) {
744 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
745 "Unsupported contiguous discarded events message: "
746 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
747 "trace-name=\"%s\", path=\"%s/%s\"",
748 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
749 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
750 stream->trace->path->str, stream->file_name->str);
751 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
752 goto end;
753 }
754
755 /*
756 * If we're currently in an opened packet (got a packet
757 * beginning message, but no packet end message yet), we do not
758 * support having a discarded events message with a time range
759 * because we require that the discarded events message's time
760 * range go from a packet's end time to the next packet's end
761 * time.
762 */
763 if (stream->packet_state.is_open && stream->sc->discarded_events_has_ts) {
764 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
765 "Unsupported discarded events message with "
766 "default clock snapshots occurring within a packet: "
767 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
768 "trace-name=\"%s\", path=\"%s/%s\"",
769 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
770 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
771 stream->trace->path->str, stream->file_name->str);
772 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
773 goto end;
774 }
775
776 if (stream->sc->discarded_events_has_ts) {
777 /*
778 * Make the stream's state be in the time range of a
779 * discarded events message since we have the message's
780 * time range (`stream->sc->discarded_events_has_ts`).
781 */
782 stream->discarded_events_state.in_range = true;
783
784 /*
785 * The clock snapshot values will be validated when
786 * handling the next packet beginning and end messages
787 * (next calls to handle_packet_beginning_msg() and
788 * handle_packet_end_msg()).
789 */
790 cs = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg);
791 BT_ASSERT(cs);
792 stream->discarded_events_state.beginning_cs = bt_clock_snapshot_get_value(cs);
793 cs = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg);
794 BT_ASSERT(cs);
795 stream->discarded_events_state.end_cs = bt_clock_snapshot_get_value(cs);
796 }
797
798 avail = bt_message_discarded_events_get_count(msg, &count);
799 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
800 /*
801 * There's no specific count of discarded events: set it
802 * to 1 so that we know that we at least discarded
803 * something.
804 */
805 count = 1;
806 }
807
808 stream->packet_state.discarded_events_counter += count;
15fe47e0
PP
809
810end:
4164020e 811 return status;
15fe47e0
PP
812}
813
4164020e
SM
814static inline bt_component_class_sink_consume_method_status
815handle_discarded_packets_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 816{
4164020e
SM
817 bt_component_class_sink_consume_method_status status =
818 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
819 const bt_stream *ir_stream = bt_message_discarded_packets_borrow_stream_const(msg);
820 struct fs_sink_stream *stream;
821 const bt_clock_snapshot *cs = NULL;
822 bt_property_availability avail;
823 uint64_t count;
824
825 stream = borrow_stream(fs_sink, ir_stream);
826 if (!stream) {
827 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
828 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
829 goto end;
830 }
831
832 if (fs_sink->ignore_discarded_packets) {
833 BT_COMP_LOGI("Ignoring discarded packets message: "
834 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
835 "trace-name=\"%s\", path=\"%s/%s\"",
836 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
837 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
838 stream->trace->path->str, stream->file_name->str);
839 goto end;
840 }
841
842 if (stream->discarded_packets_state.in_range) {
843 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
844 "Unsupported contiguous discarded packets message: "
845 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
846 "trace-name=\"%s\", path=\"%s/%s\"",
847 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
848 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
849 stream->trace->path->str, stream->file_name->str);
850 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
851 goto end;
852 }
853
854 /*
855 * Discarded packets messages are guaranteed to occur between
856 * packets.
857 */
858 BT_ASSERT(!stream->packet_state.is_open);
859
860 if (stream->sc->discarded_packets_has_ts) {
861 /*
862 * Make the stream's state be in the time range of a
863 * discarded packets message since we have the message's
864 * time range (`stream->sc->discarded_packets_has_ts`).
865 */
866 stream->discarded_packets_state.in_range = true;
867
868 /*
869 * The clock snapshot values will be validated when
870 * handling the next packet beginning message (next call
871 * to handle_packet_beginning_msg()).
872 */
873 cs = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg);
874 BT_ASSERT(cs);
875 stream->discarded_packets_state.beginning_cs = bt_clock_snapshot_get_value(cs);
876 cs = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg);
877 BT_ASSERT(cs);
878 stream->discarded_packets_state.end_cs = bt_clock_snapshot_get_value(cs);
879 }
880
881 avail = bt_message_discarded_packets_get_count(msg, &count);
882 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
883 /*
884 * There's no specific count of discarded packets: set
885 * it to 1 so that we know that we at least discarded
886 * something.
887 */
888 count = 1;
889 }
890
891 stream->packet_state.seq_num += count;
15fe47e0
PP
892
893end:
4164020e 894 return status;
15fe47e0
PP
895}
896
4164020e 897static inline void put_messages(bt_message_array_const msgs, uint64_t count)
15fe47e0 898{
4164020e 899 uint64_t i;
15fe47e0 900
4164020e
SM
901 for (i = 0; i < count; i++) {
902 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
903 }
15fe47e0
PP
904}
905
4164020e 906bt_component_class_sink_consume_method_status ctf_fs_sink_consume(bt_self_component_sink *self_comp)
15fe47e0 907{
4164020e
SM
908 bt_component_class_sink_consume_method_status status =
909 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
910 struct fs_sink_comp *fs_sink;
911 bt_message_iterator_next_status next_status;
912 uint64_t msg_count = 0;
913 bt_message_array_const msgs;
914
915 fs_sink = (fs_sink_comp *) bt_self_component_get_data(
916 bt_self_component_sink_as_self_component(self_comp));
917 BT_ASSERT_DBG(fs_sink);
918 BT_ASSERT_DBG(fs_sink->upstream_iter);
919
920 /* Consume messages */
921 next_status = bt_message_iterator_next(fs_sink->upstream_iter, &msgs, &msg_count);
922 if (next_status < 0) {
923 status = (bt_component_class_sink_consume_method_status) next_status;
924 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
925 "Failed to get next message from upstream iterator.");
926 goto end;
927 }
928
929 switch (next_status) {
930 case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK:
931 {
932 uint64_t i;
933
934 for (i = 0; i < msg_count; i++) {
935 const bt_message *msg = msgs[i];
936
937 BT_ASSERT_DBG(msg);
938
939 switch (bt_message_get_type(msg)) {
940 case BT_MESSAGE_TYPE_EVENT:
941 status = handle_event_msg(fs_sink, msg);
942 break;
943 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
944 status = handle_packet_beginning_msg(fs_sink, msg);
945 break;
946 case BT_MESSAGE_TYPE_PACKET_END:
947 status = handle_packet_end_msg(fs_sink, msg);
948 break;
949 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
950 /* Ignore */
951 BT_COMP_LOGD_STR("Ignoring message iterator inactivity message.");
952 break;
953 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
954 status = handle_stream_beginning_msg(fs_sink, msg);
955 break;
956 case BT_MESSAGE_TYPE_STREAM_END:
957 status = handle_stream_end_msg(fs_sink, msg);
958 break;
959 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
960 status = handle_discarded_events_msg(fs_sink, msg);
961 break;
962 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
963 status = handle_discarded_packets_msg(fs_sink, msg);
964 break;
965 default:
966 bt_common_abort();
967 }
968
969 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
970
971 if (status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK) {
972 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
973 "Failed to handle message: "
974 "generated CTF traces could be incomplete: "
975 "output-dir-path=\"%s\"",
976 fs_sink->output_dir_path->str);
977 goto error;
978 }
979 }
980
981 break;
982 }
983 case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN:
984 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN;
985 break;
986 case BT_MESSAGE_ITERATOR_NEXT_STATUS_END:
987 /* TODO: Finalize all traces (should already be done?) */
988 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END;
989 break;
990 default:
991 break;
992 }
993
994 goto end;
15fe47e0
PP
995
996error:
4164020e
SM
997 BT_ASSERT(status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK);
998 put_messages(msgs, msg_count);
15fe47e0
PP
999
1000end:
4164020e 1001 return status;
15fe47e0
PP
1002}
1003
e803df70 1004bt_component_class_sink_graph_is_configured_method_status
4164020e 1005ctf_fs_sink_graph_is_configured(bt_self_component_sink *self_comp)
15fe47e0 1006{
4164020e
SM
1007 bt_component_class_sink_graph_is_configured_method_status status;
1008 bt_message_iterator_create_from_sink_component_status msg_iter_status;
1009 fs_sink_comp *fs_sink = (fs_sink_comp *) bt_self_component_get_data(
1010 bt_self_component_sink_as_self_component(self_comp));
1011
1012 msg_iter_status = bt_message_iterator_create_from_sink_component(
1013 self_comp, bt_self_component_sink_borrow_input_port_by_name(self_comp, in_port_name),
1014 &fs_sink->upstream_iter);
1015 if (msg_iter_status != BT_MESSAGE_ITERATOR_CREATE_FROM_SINK_COMPONENT_STATUS_OK) {
1016 status = (bt_component_class_sink_graph_is_configured_method_status) msg_iter_status;
1017 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to create upstream iterator.");
1018 goto end;
1019 }
1020
1021 status = BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK;
15fe47e0 1022end:
4164020e 1023 return status;
15fe47e0
PP
1024}
1025
15fe47e0
PP
1026void ctf_fs_sink_finalize(bt_self_component_sink *self_comp)
1027{
4164020e
SM
1028 fs_sink_comp *fs_sink = (fs_sink_comp *) bt_self_component_get_data(
1029 bt_self_component_sink_as_self_component(self_comp));
15fe47e0 1030
4164020e 1031 destroy_fs_sink_comp(fs_sink);
15fe47e0 1032}
This page took 0.124834 seconds and 4 git commands to generate.