Visibility hidden by default
[babeltrace.git] / src / plugins / ctf / fs-sink / fs-sink.cpp
CommitLineData
15fe47e0 1/*
0235b0db 2 * SPDX-License-Identifier: MIT
15fe47e0 3 *
0235b0db 4 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
15fe47e0
PP
5 */
6
aa1a7452 7#define BT_COMP_LOG_SELF_COMP (fs_sink->self_comp)
4164020e
SM
8#define BT_LOG_OUTPUT_LEVEL (fs_sink->log_level)
9#define BT_LOG_TAG "PLUGIN/SINK.CTF.FS"
d9c39b0a 10#include "logging/comp-logging.h"
15fe47e0 11
3fadfbc0 12#include <babeltrace2/babeltrace.h>
15fe47e0
PP
13#include <stdio.h>
14#include <stdbool.h>
15#include <glib.h>
578e048b
MJ
16#include "common/assert.h"
17#include "ctfser/ctfser.h"
ce67f561 18#include "plugins/common/param-validation/param-validation.h"
15fe47e0 19
087cd0f5
SM
20#include "fs-sink.hpp"
21#include "fs-sink-trace.hpp"
22#include "fs-sink-stream.hpp"
23#include "fs-sink-ctf-meta.hpp"
24#include "translate-trace-ir-to-ctf-ir.hpp"
25#include "translate-ctf-ir-to-tsdl.hpp"
15fe47e0 26
/* Name of the input port which ctf_fs_sink_init() adds to the sink. */
static const char *const in_port_name = "in";
15fe47e0 28
4164020e
SM
29static bt_component_class_initialize_method_status
30ensure_output_dir_exists(struct fs_sink_comp *fs_sink)
15fe47e0 31{
4164020e
SM
32 bt_component_class_initialize_method_status status =
33 BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
34 int ret;
35
36 ret = g_mkdir_with_parents(fs_sink->output_dir_path->str, 0755);
37 if (ret) {
38 BT_COMP_LOGE_APPEND_CAUSE_ERRNO(fs_sink->self_comp,
39 "Cannot create directories for output directory",
40 ": output-dir-path=\"%s\"", fs_sink->output_dir_path->str);
41 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
42 goto end;
43 }
15fe47e0
PP
44
45end:
4164020e 46 return status;
15fe47e0
PP
47}
48
087cd0f5 49static bt_param_validation_map_value_entry_descr fs_sink_params_descr[] = {
88730e42
SM
50 {"path", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
51 bt_param_validation_value_descr::makeString()},
52 {"assume-single-trace", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
53 bt_param_validation_value_descr::makeBool()},
54 {"ignore-discarded-events", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
55 bt_param_validation_value_descr::makeBool()},
56 {"ignore-discarded-packets", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
57 bt_param_validation_value_descr::makeBool()},
58 {"quiet", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
59 bt_param_validation_value_descr::makeBool()},
4164020e
SM
60 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
61
62static bt_component_class_initialize_method_status configure_component(struct fs_sink_comp *fs_sink,
63 const bt_value *params)
15fe47e0 64{
4164020e
SM
65 bt_component_class_initialize_method_status status;
66 const bt_value *value;
67 enum bt_param_validation_status validation_status;
68 gchar *validation_error;
69
70 validation_status =
71 bt_param_validation_validate(params, fs_sink_params_descr, &validation_error);
72 if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
73 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
74 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "%s", validation_error);
75 goto end;
76 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
77 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
78 goto end;
79 }
80
81 value = bt_value_map_borrow_entry_value_const(params, "path");
82 g_string_assign(fs_sink->output_dir_path, bt_value_string_get(value));
83
84 value = bt_value_map_borrow_entry_value_const(params, "assume-single-trace");
85 if (value) {
86 fs_sink->assume_single_trace = (bool) bt_value_bool_get(value);
87 }
88
89 value = bt_value_map_borrow_entry_value_const(params, "ignore-discarded-events");
90 if (value) {
91 fs_sink->ignore_discarded_events = (bool) bt_value_bool_get(value);
92 }
93
94 value = bt_value_map_borrow_entry_value_const(params, "ignore-discarded-packets");
95 if (value) {
96 fs_sink->ignore_discarded_packets = (bool) bt_value_bool_get(value);
97 }
98
99 value = bt_value_map_borrow_entry_value_const(params, "quiet");
100 if (value) {
101 fs_sink->quiet = (bool) bt_value_bool_get(value);
102 }
103
104 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
ce67f561 105
15fe47e0 106end:
4164020e
SM
107 g_free(validation_error);
108 return status;
15fe47e0
PP
109}
110
4164020e 111static void destroy_fs_sink_comp(struct fs_sink_comp *fs_sink)
15fe47e0 112{
4164020e
SM
113 if (!fs_sink) {
114 goto end;
115 }
15fe47e0 116
4164020e
SM
117 if (fs_sink->output_dir_path) {
118 g_string_free(fs_sink->output_dir_path, TRUE);
119 fs_sink->output_dir_path = NULL;
120 }
15fe47e0 121
4164020e
SM
122 if (fs_sink->traces) {
123 g_hash_table_destroy(fs_sink->traces);
124 fs_sink->traces = NULL;
125 }
15fe47e0 126
4164020e
SM
127 BT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(fs_sink->upstream_iter);
128 g_free(fs_sink);
15fe47e0
PP
129
130end:
4164020e 131 return;
15fe47e0
PP
132}
133
4164020e
SM
134bt_component_class_initialize_method_status
135ctf_fs_sink_init(bt_self_component_sink *self_comp_sink,
136 bt_self_component_sink_configuration *config, const bt_value *params,
137 void *init_method_data)
15fe47e0 138{
4164020e
SM
139 bt_component_class_initialize_method_status status;
140 bt_self_component_add_port_status add_port_status;
141 struct fs_sink_comp *fs_sink = NULL;
142 bt_self_component *self_comp = bt_self_component_sink_as_self_component(self_comp_sink);
143 bt_logging_level log_level =
144 bt_component_get_logging_level(bt_self_component_as_component(self_comp));
145
146 fs_sink = g_new0(struct fs_sink_comp, 1);
147 if (!fs_sink) {
148 BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR, log_level, self_comp,
149 "Failed to allocate one CTF FS sink structure.");
150 BT_CURRENT_THREAD_ERROR_APPEND_CAUSE_FROM_COMPONENT(
151 self_comp, "Failed to allocate one CTF FS sink structure.");
152 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
153 goto end;
154 }
155
156 fs_sink->log_level = log_level;
157 fs_sink->self_comp = self_comp;
158 fs_sink->output_dir_path = g_string_new(NULL);
159 status = configure_component(fs_sink, params);
160 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
161 /* configure_component() logs errors */
162 goto end;
163 }
164
165 if (fs_sink->assume_single_trace &&
166 g_file_test(fs_sink->output_dir_path->str, G_FILE_TEST_EXISTS)) {
167 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
168 "Single trace mode, but output path exists: output-path=\"%s\"",
169 fs_sink->output_dir_path->str);
170 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
171 goto end;
172 }
173
174 status = ensure_output_dir_exists(fs_sink);
175 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
176 /* ensure_output_dir_exists() logs errors */
177 goto end;
178 }
179
180 fs_sink->traces = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL,
181 (GDestroyNotify) fs_sink_trace_destroy);
182 if (!fs_sink->traces) {
183 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate one GHashTable.");
184 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
185 goto end;
186 }
187
188 add_port_status =
189 bt_self_component_sink_add_input_port(self_comp_sink, in_port_name, NULL, NULL);
190 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
191 status = (bt_component_class_initialize_method_status) add_port_status;
192 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to add input port.");
193 goto end;
194 }
195
196 bt_self_component_set_data(self_comp, fs_sink);
15fe47e0
PP
197
198end:
4164020e
SM
199 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
200 destroy_fs_sink_comp(fs_sink);
201 }
15fe47e0 202
4164020e 203 return status;
15fe47e0
PP
204}
205
4164020e
SM
206static inline struct fs_sink_stream *borrow_stream(struct fs_sink_comp *fs_sink,
207 const bt_stream *ir_stream)
15fe47e0 208{
4164020e
SM
209 const bt_trace *ir_trace = bt_stream_borrow_trace_const(ir_stream);
210 struct fs_sink_trace *trace;
211 struct fs_sink_stream *stream = NULL;
212
213 trace = (fs_sink_trace *) g_hash_table_lookup(fs_sink->traces, ir_trace);
214 if (G_UNLIKELY(!trace)) {
215 if (fs_sink->assume_single_trace && g_hash_table_size(fs_sink->traces) > 0) {
216 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
217 "Single trace mode, but getting more than one trace: "
218 "stream-name=\"%s\"",
219 bt_stream_get_name(ir_stream));
220 goto end;
221 }
222
223 trace = fs_sink_trace_create(fs_sink, ir_trace);
224 if (!trace) {
225 goto end;
226 }
227 }
228
229 stream = (fs_sink_stream *) g_hash_table_lookup(trace->streams, ir_stream);
230 if (G_UNLIKELY(!stream)) {
231 stream = fs_sink_stream_create(trace, ir_stream);
232 if (!stream) {
233 goto end;
234 }
235 }
15fe47e0
PP
236
237end:
4164020e 238 return stream;
15fe47e0
PP
239}
240
4164020e
SM
241static inline bt_component_class_sink_consume_method_status
242handle_event_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 243{
4164020e
SM
244 int ret;
245 bt_component_class_sink_consume_method_status status =
246 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
247 const bt_event *ir_event = bt_message_event_borrow_event_const(msg);
248 const bt_stream *ir_stream = bt_event_borrow_stream_const(ir_event);
249 struct fs_sink_stream *stream;
250 struct fs_sink_ctf_event_class *ec = NULL;
251 const bt_clock_snapshot *cs = NULL;
252
253 stream = borrow_stream(fs_sink, ir_stream);
254 if (G_UNLIKELY(!stream)) {
255 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
256 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
257 goto end;
258 }
259
260 ret = try_translate_event_class_trace_ir_to_ctf_ir(fs_sink, stream->sc,
261 bt_event_borrow_class_const(ir_event), &ec);
262 if (ret) {
263 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to translate event class to CTF IR.");
264 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
265 goto end;
266 }
267
268 BT_ASSERT_DBG(ec);
269
270 if (stream->sc->default_clock_class) {
271 cs = bt_message_event_borrow_default_clock_snapshot_const(msg);
272 }
273
274 /*
275 * If this event's stream does not support packets, then we
276 * lazily create artificial packets.
277 *
278 * The size of an artificial packet is arbitrarily at least
279 * 4 MiB (it usually is greater because we close it when
280 * comes the time to write a new event and the packet's content
281 * size is >= 4 MiB), except the last one which can be smaller.
282 */
283 if (G_UNLIKELY(!stream->sc->has_packets)) {
284 if (stream->packet_state.is_open &&
285 bt_ctfser_get_offset_in_current_packet_bits(&stream->ctfser) / 8 >= 4 * 1024 * 1024) {
286 /*
287 * Stream's current packet is larger than 4 MiB:
288 * close it. A new packet will be opened just
289 * below.
290 */
291 ret = fs_sink_stream_close_packet(stream, NULL);
292 if (ret) {
293 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to close packet.");
294 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
295 goto end;
296 }
297 }
298
299 if (!stream->packet_state.is_open) {
300 /* Stream's packet is not currently opened: open it */
301 ret = fs_sink_stream_open_packet(stream, NULL, NULL);
302 if (ret) {
303 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to open packet.");
304 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
305 goto end;
306 }
307 }
308 }
309
310 BT_ASSERT_DBG(stream->packet_state.is_open);
311 ret = fs_sink_stream_write_event(stream, cs, ir_event, ec);
312 if (G_UNLIKELY(ret)) {
313 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to write event.");
314 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
315 goto end;
316 }
15fe47e0
PP
317
318end:
4164020e 319 return status;
15fe47e0
PP
320}
321
4164020e
SM
322static inline bt_component_class_sink_consume_method_status
323handle_packet_beginning_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 324{
4164020e
SM
325 int ret;
326 bt_component_class_sink_consume_method_status status =
327 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
328 const bt_packet *ir_packet = bt_message_packet_beginning_borrow_packet_const(msg);
329 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
330 struct fs_sink_stream *stream;
331 const bt_clock_snapshot *cs = NULL;
332
333 stream = borrow_stream(fs_sink, ir_stream);
334 if (G_UNLIKELY(!stream)) {
335 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
336 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
337 goto end;
338 }
339
340 if (stream->sc->packets_have_ts_begin) {
341 cs = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg);
342 BT_ASSERT(cs);
343 }
344
345 /*
346 * If we previously received a discarded events message with
347 * a time range, make sure that its beginning time matches what's
348 * expected for CTF 1.8, that is:
349 *
350 * * Its beginning time is the previous packet's end
351 * time (or the current packet's beginning time if
352 * this is the first packet).
353 *
354 * We check this here instead of in handle_packet_end_msg()
355 * because we want to catch any incompatible message as early as
356 * possible to report the error.
357 *
358 * Validation of the discarded events message's end time is
359 * performed in handle_packet_end_msg().
360 */
361 if (stream->discarded_events_state.in_range) {
362 uint64_t expected_cs;
363
364 /*
365 * `stream->discarded_events_state.in_range` is only set
366 * when the stream class's discarded events have a time
367 * range.
368 *
369 * It is required that the packet beginning and end
370 * messages for this stream class have times when
371 * discarded events have a time range.
372 */
373 BT_ASSERT(stream->sc->discarded_events_has_ts);
374 BT_ASSERT(stream->sc->packets_have_ts_begin);
375 BT_ASSERT(stream->sc->packets_have_ts_end);
376
377 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
378 /* We're opening the first packet */
379 expected_cs = bt_clock_snapshot_get_value(cs);
380 } else {
381 expected_cs = stream->prev_packet_state.end_cs;
382 }
383
384 if (stream->discarded_events_state.beginning_cs != expected_cs) {
385 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
386 "Incompatible discarded events message: "
387 "unexpected beginning time: "
388 "beginning-cs-val=%" PRIu64 ", "
389 "expected-beginning-cs-val=%" PRIu64 ", "
390 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
391 "trace-name=\"%s\", path=\"%s/%s\"",
392 stream->discarded_events_state.beginning_cs, expected_cs,
393 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
394 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
395 stream->trace->path->str, stream->file_name->str);
396 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
397 goto end;
398 }
399 }
400
401 /*
402 * If we previously received a discarded packets message with a
403 * time range, make sure that its beginning and end times match
404 * what's expected for CTF 1.8, that is:
405 *
406 * * Its beginning time is the previous packet's end time.
407 *
408 * * Its end time is the current packet's beginning time.
409 */
410 if (stream->discarded_packets_state.in_range) {
411 uint64_t expected_end_cs;
412
413 /*
414 * `stream->discarded_packets_state.in_range` is only
415 * set when the stream class's discarded packets have a
416 * time range.
417 *
418 * It is required that the packet beginning and end
419 * messages for this stream class have times when
420 * discarded packets have a time range.
421 */
422 BT_ASSERT(stream->sc->discarded_packets_has_ts);
423 BT_ASSERT(stream->sc->packets_have_ts_begin);
424 BT_ASSERT(stream->sc->packets_have_ts_end);
425
426 /*
427 * It is not supported to have a discarded packets
428 * message _before_ the first packet: we cannot validate
429 * that its beginning time is compatible with CTF 1.8 in
430 * this case.
431 */
432 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
433 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
434 "Incompatible discarded packets message "
435 "occurring before the stream's first packet: "
436 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
437 "trace-name=\"%s\", path=\"%s/%s\"",
438 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
439 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
440 stream->trace->path->str, stream->file_name->str);
441 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
442 goto end;
443 }
444
445 if (stream->discarded_packets_state.beginning_cs != stream->prev_packet_state.end_cs) {
446 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
447 "Incompatible discarded packets message: "
448 "unexpected beginning time: "
449 "beginning-cs-val=%" PRIu64 ", "
450 "expected-beginning-cs-val=%" PRIu64 ", "
451 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
452 "trace-name=\"%s\", path=\"%s/%s\"",
453 stream->discarded_packets_state.beginning_cs,
454 stream->prev_packet_state.end_cs, bt_stream_get_id(ir_stream),
455 bt_stream_get_name(ir_stream),
456 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
457 stream->trace->path->str, stream->file_name->str);
458 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
459 goto end;
460 }
461
462 expected_end_cs = bt_clock_snapshot_get_value(cs);
463
464 if (stream->discarded_packets_state.end_cs != expected_end_cs) {
465 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
466 "Incompatible discarded packets message: "
467 "unexpected end time: "
468 "end-cs-val=%" PRIu64 ", "
469 "expected-end-cs-val=%" PRIu64 ", "
470 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
471 "trace-name=\"%s\", path=\"%s/%s\"",
472 stream->discarded_packets_state.end_cs, expected_end_cs,
473 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
474 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
475 stream->trace->path->str, stream->file_name->str);
476 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
477 goto end;
478 }
479 }
480
481 /*
482 * We're not in a discarded packets time range anymore since we
483 * require that the discarded packets time ranges go from one
484 * packet's end time to the next packet's beginning time, and
485 * we're handling a packet beginning message here.
486 */
487 stream->discarded_packets_state.in_range = false;
488
489 ret = fs_sink_stream_open_packet(stream, cs, ir_packet);
490 if (ret) {
491 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to open packet.");
492 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
493 goto end;
494 }
15fe47e0
PP
495
496end:
4164020e 497 return status;
15fe47e0
PP
498}
499
4164020e
SM
500static inline bt_component_class_sink_consume_method_status
501handle_packet_end_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 502{
4164020e
SM
503 int ret;
504 bt_component_class_sink_consume_method_status status =
505 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
506 const bt_packet *ir_packet = bt_message_packet_end_borrow_packet_const(msg);
507 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
508 struct fs_sink_stream *stream;
509 const bt_clock_snapshot *cs = NULL;
510
511 stream = borrow_stream(fs_sink, ir_stream);
512 if (G_UNLIKELY(!stream)) {
513 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
514 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
515 goto end;
516 }
517
518 if (stream->sc->packets_have_ts_end) {
519 cs = bt_message_packet_end_borrow_default_clock_snapshot_const(msg);
520 BT_ASSERT(cs);
521 }
522
523 /*
524 * If we previously received a discarded events message with
525 * a time range, make sure that its end time matches what's
526 * expected for CTF 1.8, that is:
527 *
528 * * Its end time is the current packet's end time.
529 *
530 * Validation of the discarded events message's beginning time
531 * is performed in handle_packet_beginning_msg().
532 */
533 if (stream->discarded_events_state.in_range) {
534 uint64_t expected_cs;
535
536 /*
537 * `stream->discarded_events_state.in_range` is only set
538 * when the stream class's discarded events have a time
539 * range.
540 *
541 * It is required that the packet beginning and end
542 * messages for this stream class have times when
543 * discarded events have a time range.
544 */
545 BT_ASSERT(stream->sc->discarded_events_has_ts);
546 BT_ASSERT(stream->sc->packets_have_ts_begin);
547 BT_ASSERT(stream->sc->packets_have_ts_end);
548
549 expected_cs = bt_clock_snapshot_get_value(cs);
550
551 if (stream->discarded_events_state.end_cs != expected_cs) {
552 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
553 "Incompatible discarded events message: "
554 "unexpected end time: "
555 "end-cs-val=%" PRIu64 ", "
556 "expected-end-cs-val=%" PRIu64 ", "
557 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
558 "trace-name=\"%s\", path=\"%s/%s\"",
559 stream->discarded_events_state.end_cs, expected_cs,
560 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
561 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
562 stream->trace->path->str, stream->file_name->str);
563 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
564 goto end;
565 }
566 }
567
568 ret = fs_sink_stream_close_packet(stream, cs);
569 if (ret) {
570 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to close packet.");
571 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
572 goto end;
573 }
574
575 /*
576 * We're not in a discarded events time range anymore since we
577 * require that the discarded events time ranges go from one
578 * packet's end time to the next packet's end time, and we're
579 * handling a packet end message here.
580 */
581 stream->discarded_events_state.in_range = false;
15fe47e0
PP
582
583end:
4164020e 584 return status;
15fe47e0
PP
585}
586
4164020e
SM
587static inline bt_component_class_sink_consume_method_status
588handle_stream_beginning_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 589{
4164020e
SM
590 bt_component_class_sink_consume_method_status status =
591 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
592 const bt_stream *ir_stream = bt_message_stream_beginning_borrow_stream_const(msg);
593 const bt_stream_class *ir_sc = bt_stream_borrow_class_const(ir_stream);
594 struct fs_sink_stream *stream;
595 bool packets_have_beginning_end_cs =
596 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc) &&
597 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc);
598
599 /*
600 * Not supported: discarded events or discarded packets support
601 * without packets support. Packets are the way to know where
602 * discarded events/packets occurred in CTF 1.8.
603 */
604 if (!bt_stream_class_supports_packets(ir_sc)) {
605 BT_ASSERT(!bt_stream_class_supports_discarded_packets(ir_sc));
606
607 if (!fs_sink->ignore_discarded_events && bt_stream_class_supports_discarded_events(ir_sc)) {
608 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
609 "Unsupported stream: "
610 "stream does not support packets, "
611 "but supports discarded events: "
612 "stream-addr=%p, "
613 "stream-id=%" PRIu64 ", "
614 "stream-name=\"%s\"",
615 ir_stream, bt_stream_get_id(ir_stream),
616 bt_stream_get_name(ir_stream));
617 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
618 goto end;
619 }
620 }
621
622 /*
623 * Not supported: discarded events with default clock snapshots,
624 * but packet beginning/end without default clock snapshot.
625 */
626 if (!fs_sink->ignore_discarded_events &&
627 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc) &&
628 !packets_have_beginning_end_cs) {
629 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
630 "Unsupported stream: discarded events have "
631 "default clock snapshots, but packets have no "
632 "beginning and/or end default clock snapshots: "
633 "stream-addr=%p, "
634 "stream-id=%" PRIu64 ", "
635 "stream-name=\"%s\"",
636 ir_stream, bt_stream_get_id(ir_stream),
637 bt_stream_get_name(ir_stream));
638 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
639 goto end;
640 }
641
642 /*
643 * Not supported: discarded packets with default clock
644 * snapshots, but packet beginning/end without default clock
645 * snapshot.
646 */
647 if (!fs_sink->ignore_discarded_packets &&
648 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc) &&
649 !packets_have_beginning_end_cs) {
650 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
651 "Unsupported stream: discarded packets have "
652 "default clock snapshots, but packets have no "
653 "beginning and/or end default clock snapshots: "
654 "stream-addr=%p, "
655 "stream-id=%" PRIu64 ", "
656 "stream-name=\"%s\"",
657 ir_stream, bt_stream_get_id(ir_stream),
658 bt_stream_get_name(ir_stream));
659 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
660 goto end;
661 }
662
663 stream = borrow_stream(fs_sink, ir_stream);
664 if (!stream) {
665 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
666 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
667 goto end;
668 }
669
670 BT_COMP_LOGI("Created new, empty stream file: "
671 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
672 "trace-name=\"%s\", path=\"%s/%s\"",
673 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
674 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
675 stream->trace->path->str, stream->file_name->str);
15fe47e0
PP
676
677end:
4164020e 678 return status;
15fe47e0
PP
679}
680
4164020e
SM
681static inline bt_component_class_sink_consume_method_status
682handle_stream_end_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 683{
4164020e
SM
684 bt_component_class_sink_consume_method_status status =
685 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
686 const bt_stream *ir_stream = bt_message_stream_end_borrow_stream_const(msg);
687 struct fs_sink_stream *stream;
688
689 stream = borrow_stream(fs_sink, ir_stream);
690 if (!stream) {
691 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
692 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
693 goto end;
694 }
695
696 if (G_UNLIKELY(!stream->sc->has_packets && stream->packet_state.is_open)) {
697 /* Close stream's current artificial packet */
698 int ret = fs_sink_stream_close_packet(stream, NULL);
699
700 if (ret) {
701 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to close packet.");
702 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
703 goto end;
704 }
705 }
706
707 BT_COMP_LOGI("Closing stream file: "
708 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
709 "trace-name=\"%s\", path=\"%s/%s\"",
710 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
711 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
712 stream->trace->path->str, stream->file_name->str);
713
714 /*
715 * This destroys the stream object and frees all its resources,
716 * closing the stream file.
717 */
718 g_hash_table_remove(stream->trace->streams, ir_stream);
15fe47e0
PP
719
720end:
4164020e 721 return status;
15fe47e0
PP
722}
723
4164020e
SM
724static inline bt_component_class_sink_consume_method_status
725handle_discarded_events_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 726{
4164020e
SM
727 bt_component_class_sink_consume_method_status status =
728 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
729 const bt_stream *ir_stream = bt_message_discarded_events_borrow_stream_const(msg);
730 struct fs_sink_stream *stream;
731 const bt_clock_snapshot *cs = NULL;
732 bt_property_availability avail;
733 uint64_t count;
734
735 stream = borrow_stream(fs_sink, ir_stream);
736 if (!stream) {
737 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
738 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
739 goto end;
740 }
741
742 if (fs_sink->ignore_discarded_events) {
743 BT_COMP_LOGI("Ignoring discarded events message: "
744 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
745 "trace-name=\"%s\", path=\"%s/%s\"",
746 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
747 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
748 stream->trace->path->str, stream->file_name->str);
749 goto end;
750 }
751
752 if (stream->discarded_events_state.in_range) {
753 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
754 "Unsupported contiguous discarded events message: "
755 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
756 "trace-name=\"%s\", path=\"%s/%s\"",
757 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
758 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
759 stream->trace->path->str, stream->file_name->str);
760 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
761 goto end;
762 }
763
764 /*
765 * If we're currently in an opened packet (got a packet
766 * beginning message, but no packet end message yet), we do not
767 * support having a discarded events message with a time range
768 * because we require that the discarded events message's time
769 * range go from a packet's end time to the next packet's end
770 * time.
771 */
772 if (stream->packet_state.is_open && stream->sc->discarded_events_has_ts) {
773 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
774 "Unsupported discarded events message with "
775 "default clock snapshots occurring within a packet: "
776 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
777 "trace-name=\"%s\", path=\"%s/%s\"",
778 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
779 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
780 stream->trace->path->str, stream->file_name->str);
781 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
782 goto end;
783 }
784
785 if (stream->sc->discarded_events_has_ts) {
786 /*
787 * Make the stream's state be in the time range of a
788 * discarded events message since we have the message's
789 * time range (`stream->sc->discarded_events_has_ts`).
790 */
791 stream->discarded_events_state.in_range = true;
792
793 /*
794 * The clock snapshot values will be validated when
795 * handling the next packet beginning and end messages
796 * (next calls to handle_packet_beginning_msg() and
797 * handle_packet_end_msg()).
798 */
799 cs = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg);
800 BT_ASSERT(cs);
801 stream->discarded_events_state.beginning_cs = bt_clock_snapshot_get_value(cs);
802 cs = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg);
803 BT_ASSERT(cs);
804 stream->discarded_events_state.end_cs = bt_clock_snapshot_get_value(cs);
805 }
806
807 avail = bt_message_discarded_events_get_count(msg, &count);
808 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
809 /*
810 * There's no specific count of discarded events: set it
811 * to 1 so that we know that we at least discarded
812 * something.
813 */
814 count = 1;
815 }
816
817 stream->packet_state.discarded_events_counter += count;
15fe47e0
PP
818
819end:
4164020e 820 return status;
15fe47e0
PP
821}
822
4164020e
SM
823static inline bt_component_class_sink_consume_method_status
824handle_discarded_packets_msg(struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 825{
4164020e
SM
826 bt_component_class_sink_consume_method_status status =
827 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
828 const bt_stream *ir_stream = bt_message_discarded_packets_borrow_stream_const(msg);
829 struct fs_sink_stream *stream;
830 const bt_clock_snapshot *cs = NULL;
831 bt_property_availability avail;
832 uint64_t count;
833
834 stream = borrow_stream(fs_sink, ir_stream);
835 if (!stream) {
836 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream.");
837 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
838 goto end;
839 }
840
841 if (fs_sink->ignore_discarded_packets) {
842 BT_COMP_LOGI("Ignoring discarded packets message: "
843 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
844 "trace-name=\"%s\", path=\"%s/%s\"",
845 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
846 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
847 stream->trace->path->str, stream->file_name->str);
848 goto end;
849 }
850
851 if (stream->discarded_packets_state.in_range) {
852 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
853 "Unsupported contiguous discarded packets message: "
854 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
855 "trace-name=\"%s\", path=\"%s/%s\"",
856 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
857 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
858 stream->trace->path->str, stream->file_name->str);
859 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
860 goto end;
861 }
862
863 /*
864 * Discarded packets messages are guaranteed to occur between
865 * packets.
866 */
867 BT_ASSERT(!stream->packet_state.is_open);
868
869 if (stream->sc->discarded_packets_has_ts) {
870 /*
871 * Make the stream's state be in the time range of a
872 * discarded packets message since we have the message's
873 * time range (`stream->sc->discarded_packets_has_ts`).
874 */
875 stream->discarded_packets_state.in_range = true;
876
877 /*
878 * The clock snapshot values will be validated when
879 * handling the next packet beginning message (next call
880 * to handle_packet_beginning_msg()).
881 */
882 cs = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg);
883 BT_ASSERT(cs);
884 stream->discarded_packets_state.beginning_cs = bt_clock_snapshot_get_value(cs);
885 cs = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg);
886 BT_ASSERT(cs);
887 stream->discarded_packets_state.end_cs = bt_clock_snapshot_get_value(cs);
888 }
889
890 avail = bt_message_discarded_packets_get_count(msg, &count);
891 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
892 /*
893 * There's no specific count of discarded packets: set
894 * it to 1 so that we know that we at least discarded
895 * something.
896 */
897 count = 1;
898 }
899
900 stream->packet_state.seq_num += count;
15fe47e0
PP
901
902end:
4164020e 903 return status;
15fe47e0
PP
904}
905
4164020e 906static inline void put_messages(bt_message_array_const msgs, uint64_t count)
15fe47e0 907{
4164020e 908 uint64_t i;
15fe47e0 909
4164020e
SM
910 for (i = 0; i < count; i++) {
911 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
912 }
15fe47e0
PP
913}
914
4164020e 915bt_component_class_sink_consume_method_status ctf_fs_sink_consume(bt_self_component_sink *self_comp)
15fe47e0 916{
4164020e
SM
917 bt_component_class_sink_consume_method_status status =
918 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
919 struct fs_sink_comp *fs_sink;
920 bt_message_iterator_next_status next_status;
921 uint64_t msg_count = 0;
922 bt_message_array_const msgs;
923
924 fs_sink = (fs_sink_comp *) bt_self_component_get_data(
925 bt_self_component_sink_as_self_component(self_comp));
926 BT_ASSERT_DBG(fs_sink);
927 BT_ASSERT_DBG(fs_sink->upstream_iter);
928
929 /* Consume messages */
930 next_status = bt_message_iterator_next(fs_sink->upstream_iter, &msgs, &msg_count);
931 if (next_status < 0) {
932 status = (bt_component_class_sink_consume_method_status) next_status;
933 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
934 "Failed to get next message from upstream iterator.");
935 goto end;
936 }
937
938 switch (next_status) {
939 case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK:
940 {
941 uint64_t i;
942
943 for (i = 0; i < msg_count; i++) {
944 const bt_message *msg = msgs[i];
945
946 BT_ASSERT_DBG(msg);
947
948 switch (bt_message_get_type(msg)) {
949 case BT_MESSAGE_TYPE_EVENT:
950 status = handle_event_msg(fs_sink, msg);
951 break;
952 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
953 status = handle_packet_beginning_msg(fs_sink, msg);
954 break;
955 case BT_MESSAGE_TYPE_PACKET_END:
956 status = handle_packet_end_msg(fs_sink, msg);
957 break;
958 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
959 /* Ignore */
960 BT_COMP_LOGD_STR("Ignoring message iterator inactivity message.");
961 break;
962 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
963 status = handle_stream_beginning_msg(fs_sink, msg);
964 break;
965 case BT_MESSAGE_TYPE_STREAM_END:
966 status = handle_stream_end_msg(fs_sink, msg);
967 break;
968 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
969 status = handle_discarded_events_msg(fs_sink, msg);
970 break;
971 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
972 status = handle_discarded_packets_msg(fs_sink, msg);
973 break;
974 default:
975 bt_common_abort();
976 }
977
978 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
979
980 if (status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK) {
981 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
982 "Failed to handle message: "
983 "generated CTF traces could be incomplete: "
984 "output-dir-path=\"%s\"",
985 fs_sink->output_dir_path->str);
986 goto error;
987 }
988 }
989
990 break;
991 }
992 case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN:
993 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN;
994 break;
995 case BT_MESSAGE_ITERATOR_NEXT_STATUS_END:
996 /* TODO: Finalize all traces (should already be done?) */
997 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END;
998 break;
999 default:
1000 break;
1001 }
1002
1003 goto end;
15fe47e0
PP
1004
1005error:
4164020e
SM
1006 BT_ASSERT(status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK);
1007 put_messages(msgs, msg_count);
15fe47e0
PP
1008
1009end:
4164020e 1010 return status;
15fe47e0
PP
1011}
1012
e803df70 1013bt_component_class_sink_graph_is_configured_method_status
4164020e 1014ctf_fs_sink_graph_is_configured(bt_self_component_sink *self_comp)
15fe47e0 1015{
4164020e
SM
1016 bt_component_class_sink_graph_is_configured_method_status status;
1017 bt_message_iterator_create_from_sink_component_status msg_iter_status;
1018 fs_sink_comp *fs_sink = (fs_sink_comp *) bt_self_component_get_data(
1019 bt_self_component_sink_as_self_component(self_comp));
1020
1021 msg_iter_status = bt_message_iterator_create_from_sink_component(
1022 self_comp, bt_self_component_sink_borrow_input_port_by_name(self_comp, in_port_name),
1023 &fs_sink->upstream_iter);
1024 if (msg_iter_status != BT_MESSAGE_ITERATOR_CREATE_FROM_SINK_COMPONENT_STATUS_OK) {
1025 status = (bt_component_class_sink_graph_is_configured_method_status) msg_iter_status;
1026 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to create upstream iterator.");
1027 goto end;
1028 }
1029
1030 status = BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK;
15fe47e0 1031end:
4164020e 1032 return status;
15fe47e0
PP
1033}
1034
15fe47e0
PP
1035void ctf_fs_sink_finalize(bt_self_component_sink *self_comp)
1036{
4164020e
SM
1037 fs_sink_comp *fs_sink = (fs_sink_comp *) bt_self_component_get_data(
1038 bt_self_component_sink_as_self_component(self_comp));
15fe47e0 1039
4164020e 1040 destroy_fs_sink_comp(fs_sink);
15fe47e0 1041}
This page took 0.112636 seconds and 4 git commands to generate.