e19624e07903dac0cc59c799a3d29267624aad37
[babeltrace.git] / src / plugins / ctf / fs-sink / fs-sink.c
1 /*
2 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 * SOFTWARE.
21 */
22
23 #define BT_COMP_LOG_SELF_COMP (fs_sink->self_comp)
24 #define BT_LOG_OUTPUT_LEVEL (fs_sink->log_level)
25 #define BT_LOG_TAG "PLUGIN/SINK.CTF.FS"
26 #include "logging/comp-logging.h"
27
28 #include <babeltrace2/babeltrace.h>
29 #include <stdio.h>
30 #include <stdbool.h>
31 #include <glib.h>
32 #include "common/assert.h"
33 #include "ctfser/ctfser.h"
34 #include "plugins/common/param-validation/param-validation.h"
35
36 #include "fs-sink.h"
37 #include "fs-sink-trace.h"
38 #include "fs-sink-stream.h"
39 #include "fs-sink-ctf-meta.h"
40 #include "translate-trace-ir-to-ctf-ir.h"
41 #include "translate-ctf-ir-to-tsdl.h"
42
43 static
44 const char * const in_port_name = "in";
45
46 static
47 bt_component_class_initialize_method_status ensure_output_dir_exists(
48 struct fs_sink_comp *fs_sink)
49 {
50 bt_component_class_initialize_method_status status =
51 BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
52 int ret;
53
54 ret = g_mkdir_with_parents(fs_sink->output_dir_path->str, 0755);
55 if (ret) {
56 BT_COMP_LOGE_ERRNO(
57 "Cannot create directories for output directory",
58 ": output-dir-path=\"%s\"",
59 fs_sink->output_dir_path->str);
60 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
61 goto end;
62 }
63
64 end:
65 return status;
66 }
67
68 static struct bt_param_validation_map_value_entry_descr fs_sink_params_descr[] = {
69 { "path", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY, { .type = BT_VALUE_TYPE_STRING } },
70 { "assume-single-trace", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { .type = BT_VALUE_TYPE_BOOL } },
71 { "ignore-discarded-events", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { .type = BT_VALUE_TYPE_BOOL } },
72 { "ignore-discarded-packets", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { .type = BT_VALUE_TYPE_BOOL } },
73 { "quiet", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { .type = BT_VALUE_TYPE_BOOL } },
74 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END
75 };
76
77 static
78 bt_component_class_initialize_method_status
79 configure_component(struct fs_sink_comp *fs_sink, const bt_value *params)
80 {
81 bt_component_class_initialize_method_status status;
82 const bt_value *value;
83 enum bt_param_validation_status validation_status;
84 gchar *validation_error;
85
86 validation_status = bt_param_validation_validate(params,
87 fs_sink_params_descr, &validation_error);
88 if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
89 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
90 goto end;
91 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
92 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
93 goto end;
94 }
95
96 value = bt_value_map_borrow_entry_value_const(params, "path");
97 g_string_assign(fs_sink->output_dir_path,
98 bt_value_string_get(value));
99
100 value = bt_value_map_borrow_entry_value_const(params,
101 "assume-single-trace");
102 if (value) {
103 fs_sink->assume_single_trace = (bool) bt_value_bool_get(value);
104 }
105
106 value = bt_value_map_borrow_entry_value_const(params,
107 "ignore-discarded-events");
108 if (value) {
109 fs_sink->ignore_discarded_events =
110 (bool) bt_value_bool_get(value);
111 }
112
113 value = bt_value_map_borrow_entry_value_const(params,
114 "ignore-discarded-packets");
115 if (value) {
116 fs_sink->ignore_discarded_packets =
117 (bool) bt_value_bool_get(value);
118 }
119
120 value = bt_value_map_borrow_entry_value_const(params,
121 "quiet");
122 if (value) {
123 fs_sink->quiet = (bool) bt_value_bool_get(value);
124 }
125
126 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
127
128 end:
129 g_free(validation_error);
130 return status;
131 }
132
133 static
134 void destroy_fs_sink_comp(struct fs_sink_comp *fs_sink)
135 {
136 if (!fs_sink) {
137 goto end;
138 }
139
140 if (fs_sink->output_dir_path) {
141 g_string_free(fs_sink->output_dir_path, TRUE);
142 fs_sink->output_dir_path = NULL;
143 }
144
145 if (fs_sink->traces) {
146 g_hash_table_destroy(fs_sink->traces);
147 fs_sink->traces = NULL;
148 }
149
150 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(
151 fs_sink->upstream_iter);
152 g_free(fs_sink);
153
154 end:
155 return;
156 }
157
158 BT_HIDDEN
159 bt_component_class_initialize_method_status ctf_fs_sink_init(
160 bt_self_component_sink *self_comp_sink,
161 bt_self_component_sink_configuration *config,
162 const bt_value *params,
163 void *init_method_data)
164 {
165 bt_component_class_initialize_method_status status =
166 BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
167 bt_self_component_add_port_status add_port_status;
168 struct fs_sink_comp *fs_sink = NULL;
169 bt_self_component *self_comp =
170 bt_self_component_sink_as_self_component(self_comp_sink);
171 bt_logging_level log_level = bt_component_get_logging_level(
172 bt_self_component_as_component(self_comp));
173
174 fs_sink = g_new0(struct fs_sink_comp, 1);
175 if (!fs_sink) {
176 BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR, log_level, self_comp,
177 "Failed to allocate one CTF FS sink structure.");
178 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
179 goto end;
180 }
181
182 fs_sink->log_level = log_level;
183 fs_sink->self_comp = self_comp;
184 fs_sink->output_dir_path = g_string_new(NULL);
185 status = configure_component(fs_sink, params);
186 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
187 /* configure_component() logs errors */
188 goto end;
189 }
190
191 if (fs_sink->assume_single_trace &&
192 g_file_test(fs_sink->output_dir_path->str,
193 G_FILE_TEST_EXISTS)) {
194 BT_COMP_LOGE("Single trace mode, but output path exists: "
195 "output-path=\"%s\"", fs_sink->output_dir_path->str);
196 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
197 goto end;
198 }
199
200 status = ensure_output_dir_exists(fs_sink);
201 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
202 /* ensure_output_dir_exists() logs errors */
203 goto end;
204 }
205
206 fs_sink->traces = g_hash_table_new_full(g_direct_hash, g_direct_equal,
207 NULL, (GDestroyNotify) fs_sink_trace_destroy);
208 if (!fs_sink->traces) {
209 BT_COMP_LOGE_STR("Failed to allocate one GHashTable.");
210 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
211 goto end;
212 }
213
214 add_port_status = bt_self_component_sink_add_input_port(
215 self_comp_sink, in_port_name, NULL, NULL);
216 switch (add_port_status) {
217 case BT_SELF_COMPONENT_ADD_PORT_STATUS_ERROR:
218 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
219 goto end;
220 case BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR:
221 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
222 goto end;
223 default:
224 break;
225 }
226
227 bt_self_component_set_data(self_comp, fs_sink);
228
229 end:
230 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
231 destroy_fs_sink_comp(fs_sink);
232 }
233
234 return status;
235 }
236
237 static inline
238 struct fs_sink_stream *borrow_stream(struct fs_sink_comp *fs_sink,
239 const bt_stream *ir_stream)
240 {
241 const bt_trace *ir_trace = bt_stream_borrow_trace_const(ir_stream);
242 struct fs_sink_trace *trace;
243 struct fs_sink_stream *stream = NULL;
244
245 trace = g_hash_table_lookup(fs_sink->traces, ir_trace);
246 if (G_UNLIKELY(!trace)) {
247 if (fs_sink->assume_single_trace &&
248 g_hash_table_size(fs_sink->traces) > 0) {
249 BT_COMP_LOGE("Single trace mode, but getting more than one trace: "
250 "stream-name=\"%s\"",
251 bt_stream_get_name(ir_stream));
252 goto end;
253 }
254
255 trace = fs_sink_trace_create(fs_sink, ir_trace);
256 if (!trace) {
257 goto end;
258 }
259 }
260
261 stream = g_hash_table_lookup(trace->streams, ir_stream);
262 if (G_UNLIKELY(!stream)) {
263 stream = fs_sink_stream_create(trace, ir_stream);
264 if (!stream) {
265 goto end;
266 }
267 }
268
269 end:
270 return stream;
271 }
272
273 static inline
274 bt_component_class_sink_consume_method_status handle_event_msg(
275 struct fs_sink_comp *fs_sink, const bt_message *msg)
276 {
277 int ret;
278 bt_component_class_sink_consume_method_status status =
279 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
280 const bt_event *ir_event = bt_message_event_borrow_event_const(msg);
281 const bt_stream *ir_stream = bt_event_borrow_stream_const(ir_event);
282 struct fs_sink_stream *stream;
283 struct fs_sink_ctf_event_class *ec = NULL;
284 const bt_clock_snapshot *cs = NULL;
285
286 stream = borrow_stream(fs_sink, ir_stream);
287 if (G_UNLIKELY(!stream)) {
288 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
289 goto end;
290 }
291
292 ret = try_translate_event_class_trace_ir_to_ctf_ir(fs_sink,
293 stream->sc, bt_event_borrow_class_const(ir_event), &ec);
294 if (ret) {
295 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
296 goto end;
297 }
298
299 BT_ASSERT(ec);
300
301 if (stream->sc->default_clock_class) {
302 cs = bt_message_event_borrow_default_clock_snapshot_const(
303 msg);
304 }
305
306 /*
307 * If this event's stream does not support packets, then we
308 * lazily create artificial packets.
309 *
310 * The size of an artificial packet is arbitrarily at least
311 * 4 MiB (it usually is greater because we close it when
312 * comes the time to write a new event and the packet's content
313 * size is >= 4 MiB), except the last one which can be smaller.
314 */
315 if (G_UNLIKELY(!stream->sc->has_packets)) {
316 if (stream->packet_state.is_open &&
317 bt_ctfser_get_offset_in_current_packet_bits(&stream->ctfser) / 8 >=
318 4 * 1024 * 1024) {
319 /*
320 * Stream's current packet is larger than 4 MiB:
321 * close it. A new packet will be opened just
322 * below.
323 */
324 ret = fs_sink_stream_close_packet(stream, NULL);
325 if (ret) {
326 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
327 goto end;
328 }
329 }
330
331 if (!stream->packet_state.is_open) {
332 /* Stream's packet is not currently opened: open it */
333 ret = fs_sink_stream_open_packet(stream, NULL, NULL);
334 if (ret) {
335 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
336 goto end;
337 }
338 }
339 }
340
341 BT_ASSERT(stream->packet_state.is_open);
342 ret = fs_sink_stream_write_event(stream, cs, ir_event, ec);
343 if (G_UNLIKELY(ret)) {
344 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
345 goto end;
346 }
347
348 end:
349 return status;
350 }
351
352 static inline
353 bt_component_class_sink_consume_method_status handle_packet_beginning_msg(
354 struct fs_sink_comp *fs_sink, const bt_message *msg)
355 {
356 int ret;
357 bt_component_class_sink_consume_method_status status =
358 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
359 const bt_packet *ir_packet =
360 bt_message_packet_beginning_borrow_packet_const(msg);
361 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
362 struct fs_sink_stream *stream;
363 const bt_clock_snapshot *cs = NULL;
364
365 stream = borrow_stream(fs_sink, ir_stream);
366 if (G_UNLIKELY(!stream)) {
367 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
368 goto end;
369 }
370
371 if (stream->sc->packets_have_ts_begin) {
372 cs = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
373 msg);
374 BT_ASSERT(cs);
375 }
376
377 /*
378 * If we previously received a discarded events message with
379 * a time range, make sure that its beginning time matches what's
380 * expected for CTF 1.8, that is:
381 *
382 * * Its beginning time is the previous packet's end
383 * time (or the current packet's beginning time if
384 * this is the first packet).
385 *
386 * We check this here instead of in handle_packet_end_msg()
387 * because we want to catch any incompatible message as early as
388 * possible to report the error.
389 *
390 * Validation of the discarded events message's end time is
391 * performed in handle_packet_end_msg().
392 */
393 if (stream->discarded_events_state.in_range) {
394 uint64_t expected_cs;
395
396 /*
397 * `stream->discarded_events_state.in_range` is only set
398 * when the stream class's discarded events have a time
399 * range.
400 *
401 * It is required that the packet beginning and end
402 * messages for this stream class have times when
403 * discarded events have a time range.
404 */
405 BT_ASSERT(stream->sc->discarded_events_has_ts);
406 BT_ASSERT(stream->sc->packets_have_ts_begin);
407 BT_ASSERT(stream->sc->packets_have_ts_end);
408
409 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
410 /* We're opening the first packet */
411 expected_cs = bt_clock_snapshot_get_value(cs);
412 } else {
413 expected_cs = stream->prev_packet_state.end_cs;
414 }
415
416 if (stream->discarded_events_state.beginning_cs !=
417 expected_cs) {
418 BT_COMP_LOGE("Incompatible discarded events message: "
419 "unexpected beginning time: "
420 "beginning-cs-val=%" PRIu64 ", "
421 "expected-beginning-cs-val=%" PRIu64 ", "
422 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
423 "trace-name=\"%s\", path=\"%s/%s\"",
424 stream->discarded_events_state.beginning_cs,
425 expected_cs,
426 bt_stream_get_id(ir_stream),
427 bt_stream_get_name(ir_stream),
428 bt_trace_get_name(
429 bt_stream_borrow_trace_const(ir_stream)),
430 stream->trace->path->str, stream->file_name->str);
431 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
432 goto end;
433 }
434 }
435
436 /*
437 * If we previously received a discarded packets message with a
438 * time range, make sure that its beginning and end times match
439 * what's expected for CTF 1.8, that is:
440 *
441 * * Its beginning time is the previous packet's end time.
442 *
443 * * Its end time is the current packet's beginning time.
444 */
445 if (stream->discarded_packets_state.in_range) {
446 uint64_t expected_end_cs;
447
448 /*
449 * `stream->discarded_packets_state.in_range` is only
450 * set when the stream class's discarded packets have a
451 * time range.
452 *
453 * It is required that the packet beginning and end
454 * messages for this stream class have times when
455 * discarded packets have a time range.
456 */
457 BT_ASSERT(stream->sc->discarded_packets_has_ts);
458 BT_ASSERT(stream->sc->packets_have_ts_begin);
459 BT_ASSERT(stream->sc->packets_have_ts_end);
460
461 /*
462 * It is not supported to have a discarded packets
463 * message _before_ the first packet: we cannot validate
464 * that its beginning time is compatible with CTF 1.8 in
465 * this case.
466 */
467 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
468 BT_COMP_LOGE("Incompatible discarded packets message "
469 "occuring before the stream's first packet: "
470 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
471 "trace-name=\"%s\", path=\"%s/%s\"",
472 bt_stream_get_id(ir_stream),
473 bt_stream_get_name(ir_stream),
474 bt_trace_get_name(
475 bt_stream_borrow_trace_const(ir_stream)),
476 stream->trace->path->str, stream->file_name->str);
477 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
478 goto end;
479 }
480
481 if (stream->discarded_packets_state.beginning_cs !=
482 stream->prev_packet_state.end_cs) {
483 BT_COMP_LOGE("Incompatible discarded packets message: "
484 "unexpected beginning time: "
485 "beginning-cs-val=%" PRIu64 ", "
486 "expected-beginning-cs-val=%" PRIu64 ", "
487 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
488 "trace-name=\"%s\", path=\"%s/%s\"",
489 stream->discarded_packets_state.beginning_cs,
490 stream->prev_packet_state.end_cs,
491 bt_stream_get_id(ir_stream),
492 bt_stream_get_name(ir_stream),
493 bt_trace_get_name(
494 bt_stream_borrow_trace_const(ir_stream)),
495 stream->trace->path->str, stream->file_name->str);
496 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
497 goto end;
498 }
499
500 expected_end_cs = bt_clock_snapshot_get_value(cs);
501
502 if (stream->discarded_packets_state.end_cs !=
503 expected_end_cs) {
504 BT_COMP_LOGE("Incompatible discarded packets message: "
505 "unexpected end time: "
506 "end-cs-val=%" PRIu64 ", "
507 "expected-end-cs-val=%" PRIu64 ", "
508 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
509 "trace-name=\"%s\", path=\"%s/%s\"",
510 stream->discarded_packets_state.end_cs,
511 expected_end_cs,
512 bt_stream_get_id(ir_stream),
513 bt_stream_get_name(ir_stream),
514 bt_trace_get_name(
515 bt_stream_borrow_trace_const(ir_stream)),
516 stream->trace->path->str, stream->file_name->str);
517 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
518 goto end;
519 }
520 }
521
522 /*
523 * We're not in a discarded packets time range anymore since we
524 * require that the discarded packets time ranges go from one
525 * packet's end time to the next packet's beginning time, and
526 * we're handling a packet beginning message here.
527 */
528 stream->discarded_packets_state.in_range = false;
529
530 ret = fs_sink_stream_open_packet(stream, cs, ir_packet);
531 if (ret) {
532 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
533 goto end;
534 }
535
536 end:
537 return status;
538 }
539
540 static inline
541 bt_component_class_sink_consume_method_status handle_packet_end_msg(
542 struct fs_sink_comp *fs_sink, const bt_message *msg)
543 {
544 int ret;
545 bt_component_class_sink_consume_method_status status =
546 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
547 const bt_packet *ir_packet =
548 bt_message_packet_end_borrow_packet_const(msg);
549 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
550 struct fs_sink_stream *stream;
551 const bt_clock_snapshot *cs = NULL;
552
553 stream = borrow_stream(fs_sink, ir_stream);
554 if (G_UNLIKELY(!stream)) {
555 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
556 goto end;
557 }
558
559 if (stream->sc->packets_have_ts_end) {
560 cs = bt_message_packet_end_borrow_default_clock_snapshot_const(
561 msg);
562 BT_ASSERT(cs);
563 }
564
565 /*
566 * If we previously received a discarded events message with
567 * a time range, make sure that its end time matches what's
568 * expected for CTF 1.8, that is:
569 *
570 * * Its end time is the current packet's end time.
571 *
572 * Validation of the discarded events message's beginning time
573 * is performed in handle_packet_beginning_msg().
574 */
575 if (stream->discarded_events_state.in_range) {
576 uint64_t expected_cs;
577
578 /*
579 * `stream->discarded_events_state.in_range` is only set
580 * when the stream class's discarded events have a time
581 * range.
582 *
583 * It is required that the packet beginning and end
584 * messages for this stream class have times when
585 * discarded events have a time range.
586 */
587 BT_ASSERT(stream->sc->discarded_events_has_ts);
588 BT_ASSERT(stream->sc->packets_have_ts_begin);
589 BT_ASSERT(stream->sc->packets_have_ts_end);
590
591 expected_cs = bt_clock_snapshot_get_value(cs);
592
593 if (stream->discarded_events_state.end_cs != expected_cs) {
594 BT_COMP_LOGE("Incompatible discarded events message: "
595 "unexpected end time: "
596 "end-cs-val=%" PRIu64 ", "
597 "expected-end-cs-val=%" PRIu64 ", "
598 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
599 "trace-name=\"%s\", path=\"%s/%s\"",
600 stream->discarded_events_state.end_cs,
601 expected_cs,
602 bt_stream_get_id(ir_stream),
603 bt_stream_get_name(ir_stream),
604 bt_trace_get_name(
605 bt_stream_borrow_trace_const(ir_stream)),
606 stream->trace->path->str, stream->file_name->str);
607 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
608 goto end;
609 }
610 }
611
612 ret = fs_sink_stream_close_packet(stream, cs);
613 if (ret) {
614 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
615 goto end;
616 }
617
618 /*
619 * We're not in a discarded events time range anymore since we
620 * require that the discarded events time ranges go from one
621 * packet's end time to the next packet's end time, and we're
622 * handling a packet end message here.
623 */
624 stream->discarded_events_state.in_range = false;
625
626 end:
627 return status;
628 }
629
630 static inline
631 bt_component_class_sink_consume_method_status handle_stream_beginning_msg(
632 struct fs_sink_comp *fs_sink, const bt_message *msg)
633 {
634 bt_component_class_sink_consume_method_status status =
635 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
636 const bt_stream *ir_stream =
637 bt_message_stream_beginning_borrow_stream_const(msg);
638 const bt_stream_class *ir_sc =
639 bt_stream_borrow_class_const(ir_stream);
640 struct fs_sink_stream *stream;
641 bool packets_have_beginning_end_cs =
642 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc) &&
643 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc);
644
645 /*
646 * Not supported: discarded events or discarded packets support
647 * without packets support. Packets are the way to know where
648 * discarded events/packets occured in CTF 1.8.
649 */
650 if (!bt_stream_class_supports_packets(ir_sc)) {
651 BT_ASSERT(!bt_stream_class_supports_discarded_packets(ir_sc));
652
653 if (!fs_sink->ignore_discarded_events &&
654 bt_stream_class_supports_discarded_events(ir_sc)) {
655 BT_COMP_LOGE("Unsupported stream: "
656 "stream does not support packets, "
657 "but supports discarded events: "
658 "stream-addr=%p, "
659 "stream-id=%" PRIu64 ", "
660 "stream-name=\"%s\"",
661 ir_stream, bt_stream_get_id(ir_stream),
662 bt_stream_get_name(ir_stream));
663 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
664 goto end;
665 }
666 }
667
668 /*
669 * Not supported: discarded events with default clock snapshots,
670 * but packet beginning/end without default clock snapshot.
671 */
672 if (!fs_sink->ignore_discarded_events &&
673 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc) &&
674 !packets_have_beginning_end_cs) {
675 BT_COMP_LOGE("Unsupported stream: discarded events have "
676 "default clock snapshots, but packets have no "
677 "beginning and/or end default clock snapshots: "
678 "stream-addr=%p, "
679 "stream-id=%" PRIu64 ", "
680 "stream-name=\"%s\"",
681 ir_stream, bt_stream_get_id(ir_stream),
682 bt_stream_get_name(ir_stream));
683 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
684 goto end;
685 }
686
687 /*
688 * Not supported: discarded packets with default clock
689 * snapshots, but packet beginning/end without default clock
690 * snapshot.
691 */
692 if (!fs_sink->ignore_discarded_packets &&
693 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc) &&
694 !packets_have_beginning_end_cs) {
695 BT_COMP_LOGE("Unsupported stream: discarded packets have "
696 "default clock snapshots, but packets have no "
697 "beginning and/or end default clock snapshots: "
698 "stream-addr=%p, "
699 "stream-id=%" PRIu64 ", "
700 "stream-name=\"%s\"",
701 ir_stream, bt_stream_get_id(ir_stream),
702 bt_stream_get_name(ir_stream));
703 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
704 goto end;
705 }
706
707 stream = borrow_stream(fs_sink, ir_stream);
708 if (!stream) {
709 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
710 goto end;
711 }
712
713 BT_COMP_LOGI("Created new, empty stream file: "
714 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
715 "trace-name=\"%s\", path=\"%s/%s\"",
716 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
717 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
718 stream->trace->path->str, stream->file_name->str);
719
720 end:
721 return status;
722 }
723
724 static inline
725 bt_component_class_sink_consume_method_status handle_stream_end_msg(
726 struct fs_sink_comp *fs_sink, const bt_message *msg)
727 {
728 bt_component_class_sink_consume_method_status status =
729 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
730 const bt_stream *ir_stream =
731 bt_message_stream_end_borrow_stream_const(msg);
732 struct fs_sink_stream *stream;
733
734 stream = borrow_stream(fs_sink, ir_stream);
735 if (!stream) {
736 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
737 goto end;
738 }
739
740 if (G_UNLIKELY(!stream->sc->has_packets &&
741 stream->packet_state.is_open)) {
742 /* Close stream's current artificial packet */
743 int ret = fs_sink_stream_close_packet(stream, NULL);
744
745 if (ret) {
746 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
747 goto end;
748 }
749 }
750
751 BT_COMP_LOGI("Closing stream file: "
752 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
753 "trace-name=\"%s\", path=\"%s/%s\"",
754 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
755 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
756 stream->trace->path->str, stream->file_name->str);
757
758 /*
759 * This destroys the stream object and frees all its resources,
760 * closing the stream file.
761 */
762 g_hash_table_remove(stream->trace->streams, ir_stream);
763
764 end:
765 return status;
766 }
767
768 static inline
769 bt_component_class_sink_consume_method_status handle_discarded_events_msg(
770 struct fs_sink_comp *fs_sink, const bt_message *msg)
771 {
772 bt_component_class_sink_consume_method_status status =
773 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
774 const bt_stream *ir_stream =
775 bt_message_discarded_events_borrow_stream_const(msg);
776 struct fs_sink_stream *stream;
777 const bt_clock_snapshot *cs = NULL;
778 bt_property_availability avail;
779 uint64_t count;
780
781 stream = borrow_stream(fs_sink, ir_stream);
782 if (!stream) {
783 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
784 goto end;
785 }
786
787 if (fs_sink->ignore_discarded_events) {
788 BT_COMP_LOGI("Ignoring discarded events message: "
789 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
790 "trace-name=\"%s\", path=\"%s/%s\"",
791 bt_stream_get_id(ir_stream),
792 bt_stream_get_name(ir_stream),
793 bt_trace_get_name(
794 bt_stream_borrow_trace_const(ir_stream)),
795 stream->trace->path->str, stream->file_name->str);
796 goto end;
797 }
798
799 if (stream->discarded_events_state.in_range) {
800 BT_COMP_LOGE("Unsupported contiguous discarded events message: "
801 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
802 "trace-name=\"%s\", path=\"%s/%s\"",
803 bt_stream_get_id(ir_stream),
804 bt_stream_get_name(ir_stream),
805 bt_trace_get_name(
806 bt_stream_borrow_trace_const(ir_stream)),
807 stream->trace->path->str, stream->file_name->str);
808 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
809 goto end;
810 }
811
812 /*
813 * If we're currently in an opened packet (got a packet
814 * beginning message, but no packet end message yet), we do not
815 * support having a discarded events message with a time range
816 * because we require that the discarded events message's time
817 * range go from a packet's end time to the next packet's end
818 * time.
819 */
820 if (stream->packet_state.is_open &&
821 stream->sc->discarded_events_has_ts) {
822 BT_COMP_LOGE("Unsupported discarded events message with "
823 "default clock snapshots occuring within a packet: "
824 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
825 "trace-name=\"%s\", path=\"%s/%s\"",
826 bt_stream_get_id(ir_stream),
827 bt_stream_get_name(ir_stream),
828 bt_trace_get_name(
829 bt_stream_borrow_trace_const(ir_stream)),
830 stream->trace->path->str, stream->file_name->str);
831 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
832 goto end;
833 }
834
835 if (stream->sc->discarded_events_has_ts) {
836 /*
837 * Make the stream's state be in the time range of a
838 * discarded events message since we have the message's
839 * time range (`stream->sc->discarded_events_has_ts`).
840 */
841 stream->discarded_events_state.in_range = true;
842
843 /*
844 * The clock snapshot values will be validated when
845 * handling the next packet beginning and end messages
846 * (next calls to handle_packet_beginning_msg() and
847 * handle_packet_end_msg()).
848 */
849 cs = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
850 msg);
851 BT_ASSERT(cs);
852 stream->discarded_events_state.beginning_cs =
853 bt_clock_snapshot_get_value(cs);
854 cs = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(
855 msg);
856 BT_ASSERT(cs);
857 stream->discarded_events_state.end_cs = bt_clock_snapshot_get_value(cs);
858 }
859
860 avail = bt_message_discarded_events_get_count(msg, &count);
861 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
862 /*
863 * There's no specific count of discarded events: set it
864 * to 1 so that we know that we at least discarded
865 * something.
866 */
867 count = 1;
868 }
869
870 stream->packet_state.discarded_events_counter += count;
871
872 end:
873 return status;
874 }
875
876 static inline
877 bt_component_class_sink_consume_method_status handle_discarded_packets_msg(
878 struct fs_sink_comp *fs_sink, const bt_message *msg)
879 {
880 bt_component_class_sink_consume_method_status status =
881 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
882 const bt_stream *ir_stream =
883 bt_message_discarded_packets_borrow_stream_const(msg);
884 struct fs_sink_stream *stream;
885 const bt_clock_snapshot *cs = NULL;
886 bt_property_availability avail;
887 uint64_t count;
888
889 stream = borrow_stream(fs_sink, ir_stream);
890 if (!stream) {
891 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
892 goto end;
893 }
894
895 if (fs_sink->ignore_discarded_packets) {
896 BT_COMP_LOGI("Ignoring discarded packets message: "
897 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
898 "trace-name=\"%s\", path=\"%s/%s\"",
899 bt_stream_get_id(ir_stream),
900 bt_stream_get_name(ir_stream),
901 bt_trace_get_name(
902 bt_stream_borrow_trace_const(ir_stream)),
903 stream->trace->path->str, stream->file_name->str);
904 goto end;
905 }
906
907 if (stream->discarded_packets_state.in_range) {
908 BT_COMP_LOGE("Unsupported contiguous discarded packets message: "
909 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
910 "trace-name=\"%s\", path=\"%s/%s\"",
911 bt_stream_get_id(ir_stream),
912 bt_stream_get_name(ir_stream),
913 bt_trace_get_name(
914 bt_stream_borrow_trace_const(ir_stream)),
915 stream->trace->path->str, stream->file_name->str);
916 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
917 goto end;
918 }
919
920 /*
921 * Discarded packets messages are guaranteed to occur between
922 * packets.
923 */
924 BT_ASSERT(!stream->packet_state.is_open);
925
926 if (stream->sc->discarded_packets_has_ts) {
927 /*
928 * Make the stream's state be in the time range of a
929 * discarded packets message since we have the message's
930 * time range (`stream->sc->discarded_packets_has_ts`).
931 */
932 stream->discarded_packets_state.in_range = true;
933
934 /*
935 * The clock snapshot values will be validated when
936 * handling the next packet beginning message (next call
937 * to handle_packet_beginning_msg()).
938 */
939 cs = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
940 msg);
941 BT_ASSERT(cs);
942 stream->discarded_packets_state.beginning_cs =
943 bt_clock_snapshot_get_value(cs);
944 cs = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(
945 msg);
946 BT_ASSERT(cs);
947 stream->discarded_packets_state.end_cs =
948 bt_clock_snapshot_get_value(cs);
949 }
950
951 avail = bt_message_discarded_packets_get_count(msg, &count);
952 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
953 /*
954 * There's no specific count of discarded packets: set
955 * it to 1 so that we know that we at least discarded
956 * something.
957 */
958 count = 1;
959 }
960
961 stream->packet_state.seq_num += count;
962
963 end:
964 return status;
965 }
966
967 static inline
968 void put_messages(bt_message_array_const msgs, uint64_t count)
969 {
970 uint64_t i;
971
972 for (i = 0; i < count; i++) {
973 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
974 }
975 }
976
977 BT_HIDDEN
978 bt_component_class_sink_consume_method_status ctf_fs_sink_consume(
979 bt_self_component_sink *self_comp)
980 {
981 bt_component_class_sink_consume_method_status status =
982 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
983 struct fs_sink_comp *fs_sink;
984 bt_message_iterator_next_status next_status;
985 uint64_t msg_count = 0;
986 bt_message_array_const msgs;
987
988 fs_sink = bt_self_component_get_data(
989 bt_self_component_sink_as_self_component(self_comp));
990 BT_ASSERT(fs_sink);
991 BT_ASSERT(fs_sink->upstream_iter);
992
993 /* Consume messages */
994 next_status = bt_self_component_port_input_message_iterator_next(
995 fs_sink->upstream_iter, &msgs, &msg_count);
996 if (next_status < 0) {
997 status = (int) next_status;
998 goto end;
999 }
1000
1001 switch (next_status) {
1002 case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK:
1003 {
1004 uint64_t i;
1005
1006 for (i = 0; i < msg_count; i++) {
1007 const bt_message *msg = msgs[i];
1008
1009 BT_ASSERT(msg);
1010
1011 switch (bt_message_get_type(msg)) {
1012 case BT_MESSAGE_TYPE_EVENT:
1013 status = handle_event_msg(fs_sink, msg);
1014 break;
1015 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
1016 status = handle_packet_beginning_msg(
1017 fs_sink, msg);
1018 break;
1019 case BT_MESSAGE_TYPE_PACKET_END:
1020 status = handle_packet_end_msg(
1021 fs_sink, msg);
1022 break;
1023 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
1024 /* Ignore */
1025 BT_COMP_LOGD_STR("Ignoring message iterator inactivity message.");
1026 break;
1027 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
1028 status = handle_stream_beginning_msg(
1029 fs_sink, msg);
1030 break;
1031 case BT_MESSAGE_TYPE_STREAM_END:
1032 status = handle_stream_end_msg(
1033 fs_sink, msg);
1034 break;
1035 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1036 status = handle_discarded_events_msg(
1037 fs_sink, msg);
1038 break;
1039 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1040 status = handle_discarded_packets_msg(
1041 fs_sink, msg);
1042 break;
1043 default:
1044 abort();
1045 }
1046
1047 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
1048
1049 if (status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK) {
1050 BT_COMP_LOGE("Failed to handle message: "
1051 "generated CTF traces could be incomplete: "
1052 "output-dir-path=\"%s\"",
1053 fs_sink->output_dir_path->str);
1054 goto error;
1055 }
1056 }
1057
1058 break;
1059 }
1060 case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN:
1061 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN;
1062 break;
1063 case BT_MESSAGE_ITERATOR_NEXT_STATUS_END:
1064 /* TODO: Finalize all traces (should already be done?) */
1065 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END;
1066 break;
1067 case BT_MESSAGE_ITERATOR_NEXT_STATUS_MEMORY_ERROR:
1068 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_MEMORY_ERROR;
1069 break;
1070 case BT_MESSAGE_ITERATOR_NEXT_STATUS_ERROR:
1071 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_MEMORY_ERROR;
1072 break;
1073 default:
1074 break;
1075 }
1076
1077 goto end;
1078
1079 error:
1080 BT_ASSERT(status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK);
1081 put_messages(msgs, msg_count);
1082
1083 end:
1084 return status;
1085 }
1086
1087 BT_HIDDEN
1088 bt_component_class_sink_graph_is_configured_method_status
1089 ctf_fs_sink_graph_is_configured(
1090 bt_self_component_sink *self_comp)
1091 {
1092 bt_component_class_sink_graph_is_configured_method_status status;
1093 bt_self_component_port_input_message_iterator_create_from_sink_component_status
1094 msg_iter_status;
1095 struct fs_sink_comp *fs_sink = bt_self_component_get_data(
1096 bt_self_component_sink_as_self_component(self_comp));
1097
1098 msg_iter_status =
1099 bt_self_component_port_input_message_iterator_create_from_sink_component(
1100 self_comp,
1101 bt_self_component_sink_borrow_input_port_by_name(
1102 self_comp, in_port_name), &fs_sink->upstream_iter);
1103 if (msg_iter_status != BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_CREATE_FROM_SINK_COMPONENT_STATUS_OK) {
1104 status = (int) msg_iter_status;
1105 goto end;
1106 }
1107
1108 status = BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK;
1109 end:
1110 return status;
1111 }
1112
1113 BT_HIDDEN
1114 void ctf_fs_sink_finalize(bt_self_component_sink *self_comp)
1115 {
1116 struct fs_sink_comp *fs_sink = bt_self_component_get_data(
1117 bt_self_component_sink_as_self_component(self_comp));
1118
1119 destroy_fs_sink_comp(fs_sink);
1120 }
This page took 0.057233 seconds and 3 git commands to generate.