sink.ctf.fs: honor component's initial log level
[babeltrace.git] / src / plugins / ctf / fs-sink / fs-sink.c
1 /*
2 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 * SOFTWARE.
21 */
22
23 #define BT_LOG_OUTPUT_LEVEL (fs_sink->log_level)
24 #define BT_LOG_TAG "PLUGIN/SINK.CTF.FS"
25 #include "logging/log.h"
26
27 #include <babeltrace2/babeltrace.h>
28 #include <stdio.h>
29 #include <stdbool.h>
30 #include <glib.h>
31 #include "common/assert.h"
32 #include "ctfser/ctfser.h"
33
34 #include "fs-sink.h"
35 #include "fs-sink-trace.h"
36 #include "fs-sink-stream.h"
37 #include "fs-sink-ctf-meta.h"
38 #include "translate-trace-ir-to-ctf-ir.h"
39 #include "translate-ctf-ir-to-tsdl.h"
40
41 static
42 const char * const in_port_name = "in";
43
44 static
45 bt_self_component_status ensure_output_dir_exists(
46 struct fs_sink_comp *fs_sink)
47 {
48 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
49 int ret;
50
51 ret = g_mkdir_with_parents(fs_sink->output_dir_path->str, 0755);
52 if (ret) {
53 BT_LOGE_ERRNO("Cannot create directories for output directory",
54 ": output-dir-path=\"%s\"",
55 fs_sink->output_dir_path->str);
56 status = BT_SELF_COMPONENT_STATUS_ERROR;
57 goto end;
58 }
59
60 end:
61 return status;
62 }
63
64 static
65 bt_self_component_status configure_component(struct fs_sink_comp *fs_sink,
66 const bt_value *params)
67 {
68 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
69 const bt_value *value;
70
71 value = bt_value_map_borrow_entry_value_const(params, "path");
72 if (!value) {
73 BT_LOGE_STR("Missing mandatory `path` parameter.");
74 status = BT_SELF_COMPONENT_STATUS_ERROR;
75 goto end;
76 }
77
78 if (!bt_value_is_string(value)) {
79 BT_LOGE_STR("`path` parameter: expecting a string.");
80 status = BT_SELF_COMPONENT_STATUS_ERROR;
81 goto end;
82 }
83
84 g_string_assign(fs_sink->output_dir_path,
85 bt_value_string_get(value));
86 value = bt_value_map_borrow_entry_value_const(params,
87 "assume-single-trace");
88 if (value) {
89 if (!bt_value_is_bool(value)) {
90 BT_LOGE_STR("`assume-single-trace` parameter: expecting a boolean.");
91 status = BT_SELF_COMPONENT_STATUS_ERROR;
92 goto end;
93 }
94
95 fs_sink->assume_single_trace = (bool) bt_value_bool_get(value);
96 }
97
98 value = bt_value_map_borrow_entry_value_const(params,
99 "ignore-discarded-events");
100 if (value) {
101 if (!bt_value_is_bool(value)) {
102 BT_LOGE_STR("`ignore-discarded-events` parameter: expecting a boolean.");
103 status = BT_SELF_COMPONENT_STATUS_ERROR;
104 goto end;
105 }
106
107 fs_sink->ignore_discarded_events =
108 (bool) bt_value_bool_get(value);
109 }
110
111 value = bt_value_map_borrow_entry_value_const(params,
112 "ignore-discarded-packets");
113 if (value) {
114 if (!bt_value_is_bool(value)) {
115 BT_LOGE_STR("`ignore-discarded-packets` parameter: expecting a boolean.");
116 status = BT_SELF_COMPONENT_STATUS_ERROR;
117 goto end;
118 }
119
120 fs_sink->ignore_discarded_packets =
121 (bool) bt_value_bool_get(value);
122 }
123
124 value = bt_value_map_borrow_entry_value_const(params,
125 "quiet");
126 if (value) {
127 if (!bt_value_is_bool(value)) {
128 BT_LOGE_STR("`quiet` parameter: expecting a boolean.");
129 status = BT_SELF_COMPONENT_STATUS_ERROR;
130 goto end;
131 }
132
133 fs_sink->quiet = (bool) bt_value_bool_get(value);
134 }
135
136 end:
137 return status;
138 }
139
140 static
141 void destroy_fs_sink_comp(struct fs_sink_comp *fs_sink)
142 {
143 if (!fs_sink) {
144 goto end;
145 }
146
147 if (fs_sink->output_dir_path) {
148 g_string_free(fs_sink->output_dir_path, TRUE);
149 fs_sink->output_dir_path = NULL;
150 }
151
152 if (fs_sink->traces) {
153 g_hash_table_destroy(fs_sink->traces);
154 fs_sink->traces = NULL;
155 }
156
157 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(
158 fs_sink->upstream_iter);
159 g_free(fs_sink);
160
161 end:
162 return;
163 }
164
165 BT_HIDDEN
166 bt_self_component_status ctf_fs_sink_init(
167 bt_self_component_sink *self_comp_sink, const bt_value *params,
168 void *init_method_data)
169 {
170 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
171 struct fs_sink_comp *fs_sink = NULL;
172 bt_self_component *self_comp =
173 bt_self_component_sink_as_self_component(self_comp_sink);
174 bt_logging_level log_level = bt_component_get_logging_level(
175 bt_self_component_as_component(self_comp));
176
177 fs_sink = g_new0(struct fs_sink_comp, 1);
178 if (!fs_sink) {
179 BT_LOG_WRITE_CUR_LVL(BT_LOG_ERROR, log_level, BT_LOG_TAG,
180 "Failed to allocate one CTF FS sink structure.");
181 status = BT_SELF_COMPONENT_STATUS_NOMEM;
182 goto end;
183 }
184
185 fs_sink->log_level = log_level;
186 fs_sink->output_dir_path = g_string_new(NULL);
187 fs_sink->self_comp = self_comp_sink;
188 status = configure_component(fs_sink, params);
189 if (status != BT_SELF_COMPONENT_STATUS_OK) {
190 /* configure_component() logs errors */
191 goto end;
192 }
193
194 if (fs_sink->assume_single_trace &&
195 g_file_test(fs_sink->output_dir_path->str,
196 G_FILE_TEST_EXISTS)) {
197 BT_LOGE("Single trace mode, but output path exists: "
198 "output-path=\"%s\"", fs_sink->output_dir_path->str);
199 status = BT_SELF_COMPONENT_STATUS_ERROR;
200 goto end;
201 }
202
203 status = ensure_output_dir_exists(fs_sink);
204 if (status != BT_SELF_COMPONENT_STATUS_OK) {
205 /* ensure_output_dir_exists() logs errors */
206 goto end;
207 }
208
209 fs_sink->traces = g_hash_table_new_full(g_direct_hash, g_direct_equal,
210 NULL, (GDestroyNotify) fs_sink_trace_destroy);
211 if (!fs_sink->traces) {
212 BT_LOGE_STR("Failed to allocate one GHashTable.");
213 status = BT_SELF_COMPONENT_STATUS_NOMEM;
214 goto end;
215 }
216
217 status = bt_self_component_sink_add_input_port(self_comp_sink,
218 in_port_name, NULL, NULL);
219 if (status != BT_SELF_COMPONENT_STATUS_OK) {
220 goto end;
221 }
222
223 bt_self_component_set_data(self_comp, fs_sink);
224
225 end:
226 if (status != BT_SELF_COMPONENT_STATUS_OK) {
227 destroy_fs_sink_comp(fs_sink);
228 }
229
230 return status;
231 }
232
233 static inline
234 struct fs_sink_stream *borrow_stream(struct fs_sink_comp *fs_sink,
235 const bt_stream *ir_stream)
236 {
237 const bt_trace *ir_trace = bt_stream_borrow_trace_const(ir_stream);
238 struct fs_sink_trace *trace;
239 struct fs_sink_stream *stream = NULL;
240
241 trace = g_hash_table_lookup(fs_sink->traces, ir_trace);
242 if (G_UNLIKELY(!trace)) {
243 if (fs_sink->assume_single_trace &&
244 g_hash_table_size(fs_sink->traces) > 0) {
245 BT_LOGE("Single trace mode, but getting more than one trace: "
246 "stream-name=\"%s\"",
247 bt_stream_get_name(ir_stream));
248 goto end;
249 }
250
251 trace = fs_sink_trace_create(fs_sink, ir_trace);
252 if (!trace) {
253 goto end;
254 }
255 }
256
257 stream = g_hash_table_lookup(trace->streams, ir_stream);
258 if (G_UNLIKELY(!stream)) {
259 stream = fs_sink_stream_create(trace, ir_stream);
260 if (!stream) {
261 goto end;
262 }
263 }
264
265 end:
266 return stream;
267 }
268
269 static inline
270 bt_self_component_status handle_event_msg(struct fs_sink_comp *fs_sink,
271 const bt_message *msg)
272 {
273 int ret;
274 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
275 const bt_event *ir_event = bt_message_event_borrow_event_const(msg);
276 const bt_stream *ir_stream = bt_event_borrow_stream_const(ir_event);
277 struct fs_sink_stream *stream;
278 struct fs_sink_ctf_event_class *ec = NULL;
279 const bt_clock_snapshot *cs = NULL;
280
281 stream = borrow_stream(fs_sink, ir_stream);
282 if (G_UNLIKELY(!stream)) {
283 status = BT_SELF_COMPONENT_STATUS_ERROR;
284 goto end;
285 }
286
287 ret = try_translate_event_class_trace_ir_to_ctf_ir(stream->sc,
288 bt_event_borrow_class_const(ir_event), &ec,
289 fs_sink->log_level);
290 if (ret) {
291 status = BT_SELF_COMPONENT_STATUS_ERROR;
292 goto end;
293 }
294
295 BT_ASSERT(ec);
296
297 if (stream->sc->default_clock_class) {
298 cs = bt_message_event_borrow_default_clock_snapshot_const(
299 msg);
300 }
301
302 ret = fs_sink_stream_write_event(stream, cs, ir_event, ec);
303 if (G_UNLIKELY(ret)) {
304 status = BT_SELF_COMPONENT_STATUS_ERROR;
305 goto end;
306 }
307
308 end:
309 return status;
310 }
311
312 static inline
313 bt_self_component_status handle_packet_beginning_msg(
314 struct fs_sink_comp *fs_sink, const bt_message *msg)
315 {
316 int ret;
317 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
318 const bt_packet *ir_packet =
319 bt_message_packet_beginning_borrow_packet_const(msg);
320 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
321 struct fs_sink_stream *stream;
322 const bt_clock_snapshot *cs = NULL;
323
324 stream = borrow_stream(fs_sink, ir_stream);
325 if (G_UNLIKELY(!stream)) {
326 status = BT_SELF_COMPONENT_STATUS_ERROR;
327 goto end;
328 }
329
330 if (stream->sc->packets_have_ts_begin) {
331 cs = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
332 msg);
333 BT_ASSERT(cs);
334 }
335
336 /*
337 * If we previously received a discarded events message with
338 * a time range, make sure that its beginning time matches what's
339 * expected for CTF 1.8, that is:
340 *
341 * * Its beginning time is the previous packet's end
342 * time (or the current packet's beginning time if
343 * this is the first packet).
344 *
345 * We check this here instead of in handle_packet_end_msg()
346 * because we want to catch any incompatible message as early as
347 * possible to report the error.
348 *
349 * Validation of the discarded events message's end time is
350 * performed in handle_packet_end_msg().
351 */
352 if (stream->discarded_events_state.in_range) {
353 uint64_t expected_cs;
354
355 /*
356 * `stream->discarded_events_state.in_range` is only set
357 * when the stream class's discarded events have a time
358 * range.
359 *
360 * It is required that the packet beginning and end
361 * messages for this stream class have times when
362 * discarded events have a time range.
363 */
364 BT_ASSERT(stream->sc->discarded_events_has_ts);
365 BT_ASSERT(stream->sc->packets_have_ts_begin);
366 BT_ASSERT(stream->sc->packets_have_ts_end);
367
368 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
369 /* We're opening the first packet */
370 expected_cs = bt_clock_snapshot_get_value(cs);
371 } else {
372 expected_cs = stream->prev_packet_state.end_cs;
373 }
374
375 if (stream->discarded_events_state.beginning_cs !=
376 expected_cs) {
377 BT_LOGE("Incompatible discarded events message: "
378 "unexpected beginning time: "
379 "beginning-cs-val=%" PRIu64 ", "
380 "expected-beginning-cs-val=%" PRIu64 ", "
381 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
382 "trace-name=\"%s\", path=\"%s/%s\"",
383 stream->discarded_events_state.beginning_cs,
384 expected_cs,
385 bt_stream_get_id(ir_stream),
386 bt_stream_get_name(ir_stream),
387 bt_trace_get_name(
388 bt_stream_borrow_trace_const(ir_stream)),
389 stream->trace->path->str, stream->file_name->str);
390 status = BT_SELF_COMPONENT_STATUS_ERROR;
391 goto end;
392 }
393 }
394
395 /*
396 * If we previously received a discarded packets message with a
397 * time range, make sure that its beginning and end times match
398 * what's expected for CTF 1.8, that is:
399 *
400 * * Its beginning time is the previous packet's end time.
401 *
402 * * Its end time is the current packet's beginning time.
403 */
404 if (stream->discarded_packets_state.in_range) {
405 uint64_t expected_end_cs;
406
407 /*
408 * `stream->discarded_packets_state.in_range` is only
409 * set when the stream class's discarded packets have a
410 * time range.
411 *
412 * It is required that the packet beginning and end
413 * messages for this stream class have times when
414 * discarded packets have a time range.
415 */
416 BT_ASSERT(stream->sc->discarded_packets_has_ts);
417 BT_ASSERT(stream->sc->packets_have_ts_begin);
418 BT_ASSERT(stream->sc->packets_have_ts_end);
419
420 /*
421 * It is not supported to have a discarded packets
422 * message _before_ the first packet: we cannot validate
423 * that its beginning time is compatible with CTF 1.8 in
424 * this case.
425 */
426 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
427 BT_LOGE("Incompatible discarded packets message "
428 "occuring before the stream's first packet: "
429 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
430 "trace-name=\"%s\", path=\"%s/%s\"",
431 bt_stream_get_id(ir_stream),
432 bt_stream_get_name(ir_stream),
433 bt_trace_get_name(
434 bt_stream_borrow_trace_const(ir_stream)),
435 stream->trace->path->str, stream->file_name->str);
436 status = BT_SELF_COMPONENT_STATUS_ERROR;
437 goto end;
438 }
439
440 if (stream->discarded_packets_state.beginning_cs !=
441 stream->prev_packet_state.end_cs) {
442 BT_LOGE("Incompatible discarded packets message: "
443 "unexpected beginning time: "
444 "beginning-cs-val=%" PRIu64 ", "
445 "expected-beginning-cs-val=%" PRIu64 ", "
446 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
447 "trace-name=\"%s\", path=\"%s/%s\"",
448 stream->discarded_packets_state.beginning_cs,
449 stream->prev_packet_state.end_cs,
450 bt_stream_get_id(ir_stream),
451 bt_stream_get_name(ir_stream),
452 bt_trace_get_name(
453 bt_stream_borrow_trace_const(ir_stream)),
454 stream->trace->path->str, stream->file_name->str);
455 status = BT_SELF_COMPONENT_STATUS_ERROR;
456 goto end;
457 }
458
459 expected_end_cs = bt_clock_snapshot_get_value(cs);
460
461 if (stream->discarded_packets_state.end_cs !=
462 expected_end_cs) {
463 BT_LOGE("Incompatible discarded packets message: "
464 "unexpected end time: "
465 "end-cs-val=%" PRIu64 ", "
466 "expected-end-cs-val=%" PRIu64 ", "
467 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
468 "trace-name=\"%s\", path=\"%s/%s\"",
469 stream->discarded_packets_state.end_cs,
470 expected_end_cs,
471 bt_stream_get_id(ir_stream),
472 bt_stream_get_name(ir_stream),
473 bt_trace_get_name(
474 bt_stream_borrow_trace_const(ir_stream)),
475 stream->trace->path->str, stream->file_name->str);
476 status = BT_SELF_COMPONENT_STATUS_ERROR;
477 goto end;
478 }
479 }
480
481 /*
482 * We're not in a discarded packets time range anymore since we
483 * require that the discarded packets time ranges go from one
484 * packet's end time to the next packet's beginning time, and
485 * we're handling a packet beginning message here.
486 */
487 stream->discarded_packets_state.in_range = false;
488
489 ret = fs_sink_stream_open_packet(stream, cs, ir_packet);
490 if (ret) {
491 status = BT_SELF_COMPONENT_STATUS_ERROR;
492 goto end;
493 }
494
495 end:
496 return status;
497 }
498
499 static inline
500 bt_self_component_status handle_packet_end_msg(
501 struct fs_sink_comp *fs_sink, const bt_message *msg)
502 {
503 int ret;
504 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
505 const bt_packet *ir_packet =
506 bt_message_packet_end_borrow_packet_const(msg);
507 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
508 struct fs_sink_stream *stream;
509 const bt_clock_snapshot *cs = NULL;
510
511 stream = borrow_stream(fs_sink, ir_stream);
512 if (G_UNLIKELY(!stream)) {
513 status = BT_SELF_COMPONENT_STATUS_ERROR;
514 goto end;
515 }
516
517 if (stream->sc->packets_have_ts_end) {
518 cs = bt_message_packet_end_borrow_default_clock_snapshot_const(
519 msg);
520 BT_ASSERT(cs);
521 }
522
523 /*
524 * If we previously received a discarded events message with
525 * a time range, make sure that its end time matches what's
526 * expected for CTF 1.8, that is:
527 *
528 * * Its end time is the current packet's end time.
529 *
530 * Validation of the discarded events message's beginning time
531 * is performed in handle_packet_beginning_msg().
532 */
533 if (stream->discarded_events_state.in_range) {
534 uint64_t expected_cs;
535
536 /*
537 * `stream->discarded_events_state.in_range` is only set
538 * when the stream class's discarded events have a time
539 * range.
540 *
541 * It is required that the packet beginning and end
542 * messages for this stream class have times when
543 * discarded events have a time range.
544 */
545 BT_ASSERT(stream->sc->discarded_events_has_ts);
546 BT_ASSERT(stream->sc->packets_have_ts_begin);
547 BT_ASSERT(stream->sc->packets_have_ts_end);
548
549 expected_cs = bt_clock_snapshot_get_value(cs);
550
551 if (stream->discarded_events_state.end_cs != expected_cs) {
552 BT_LOGE("Incompatible discarded events message: "
553 "unexpected end time: "
554 "end-cs-val=%" PRIu64 ", "
555 "expected-end-cs-val=%" PRIu64 ", "
556 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
557 "trace-name=\"%s\", path=\"%s/%s\"",
558 stream->discarded_events_state.end_cs,
559 expected_cs,
560 bt_stream_get_id(ir_stream),
561 bt_stream_get_name(ir_stream),
562 bt_trace_get_name(
563 bt_stream_borrow_trace_const(ir_stream)),
564 stream->trace->path->str, stream->file_name->str);
565 status = BT_SELF_COMPONENT_STATUS_ERROR;
566 goto end;
567 }
568 }
569
570 ret = fs_sink_stream_close_packet(stream, cs);
571 if (ret) {
572 status = BT_SELF_COMPONENT_STATUS_ERROR;
573 goto end;
574 }
575
576 /*
577 * We're not in a discarded events time range anymore since we
578 * require that the discarded events time ranges go from one
579 * packet's end time to the next packet's end time, and we're
580 * handling a packet end message here.
581 */
582 stream->discarded_events_state.in_range = false;
583
584 end:
585 return status;
586 }
587
588 static inline
589 bt_self_component_status handle_stream_beginning_msg(
590 struct fs_sink_comp *fs_sink, const bt_message *msg)
591 {
592 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
593 const bt_stream *ir_stream =
594 bt_message_stream_beginning_borrow_stream_const(msg);
595 const bt_stream_class *ir_sc =
596 bt_stream_borrow_class_const(ir_stream);
597 struct fs_sink_stream *stream;
598 bool packets_have_beginning_end_cs =
599 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc) &&
600 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc);
601
602 /*
603 * Not supported: discarded events with default clock snapshots,
604 * but packet beginning/end without default clock snapshot.
605 */
606 if (!fs_sink->ignore_discarded_events &&
607 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc) &&
608 !packets_have_beginning_end_cs) {
609 BT_LOGE("Unsupported stream: discarded events have "
610 "default clock snapshots, but packets have no "
611 "beginning and/or end default clock snapshots: "
612 "stream-addr=%p, "
613 "stream-id=%" PRIu64 ", "
614 "stream-name=\"%s\"",
615 ir_stream, bt_stream_get_id(ir_stream),
616 bt_stream_get_name(ir_stream));
617 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
618 goto end;
619 }
620
621 /*
622 * Not supported: discarded packets with default clock
623 * snapshots, but packet beginning/end without default clock
624 * snapshot.
625 */
626 if (!fs_sink->ignore_discarded_packets &&
627 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc) &&
628 !packets_have_beginning_end_cs) {
629 BT_LOGE("Unsupported stream: discarded packets have "
630 "default clock snapshots, but packets have no "
631 "beginning and/or end default clock snapshots: "
632 "stream-addr=%p, "
633 "stream-id=%" PRIu64 ", "
634 "stream-name=\"%s\"",
635 ir_stream, bt_stream_get_id(ir_stream),
636 bt_stream_get_name(ir_stream));
637 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
638 goto end;
639 }
640
641 stream = borrow_stream(fs_sink, ir_stream);
642 if (!stream) {
643 status = BT_SELF_COMPONENT_STATUS_ERROR;
644 goto end;
645 }
646
647 BT_LOGI("Created new, empty stream file: "
648 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
649 "trace-name=\"%s\", path=\"%s/%s\"",
650 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
651 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
652 stream->trace->path->str, stream->file_name->str);
653
654 end:
655 return status;
656 }
657
658 static inline
659 bt_self_component_status handle_stream_end_msg(struct fs_sink_comp *fs_sink,
660 const bt_message *msg)
661 {
662 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
663 const bt_stream *ir_stream =
664 bt_message_stream_end_borrow_stream_const(msg);
665 struct fs_sink_stream *stream;
666
667 stream = borrow_stream(fs_sink, ir_stream);
668 if (!stream) {
669 status = BT_SELF_COMPONENT_STATUS_ERROR;
670 goto end;
671 }
672
673 BT_LOGI("Closing stream file: "
674 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
675 "trace-name=\"%s\", path=\"%s/%s\"",
676 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
677 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
678 stream->trace->path->str, stream->file_name->str);
679
680 /*
681 * This destroys the stream object and frees all its resources,
682 * closing the stream file.
683 */
684 g_hash_table_remove(stream->trace->streams, ir_stream);
685
686 end:
687 return status;
688 }
689
690 static inline
691 bt_self_component_status handle_discarded_events_msg(
692 struct fs_sink_comp *fs_sink, const bt_message *msg)
693 {
694 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
695 const bt_stream *ir_stream =
696 bt_message_discarded_events_borrow_stream_const(msg);
697 struct fs_sink_stream *stream;
698 const bt_clock_snapshot *cs = NULL;
699 bt_property_availability avail;
700 uint64_t count;
701
702 stream = borrow_stream(fs_sink, ir_stream);
703 if (!stream) {
704 status = BT_SELF_COMPONENT_STATUS_ERROR;
705 goto end;
706 }
707
708 if (fs_sink->ignore_discarded_events) {
709 BT_LOGI("Ignoring discarded events message: "
710 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
711 "trace-name=\"%s\", path=\"%s/%s\"",
712 bt_stream_get_id(ir_stream),
713 bt_stream_get_name(ir_stream),
714 bt_trace_get_name(
715 bt_stream_borrow_trace_const(ir_stream)),
716 stream->trace->path->str, stream->file_name->str);
717 goto end;
718 }
719
720 if (stream->discarded_events_state.in_range) {
721 BT_LOGE("Unsupported contiguous discarded events message: "
722 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
723 "trace-name=\"%s\", path=\"%s/%s\"",
724 bt_stream_get_id(ir_stream),
725 bt_stream_get_name(ir_stream),
726 bt_trace_get_name(
727 bt_stream_borrow_trace_const(ir_stream)),
728 stream->trace->path->str, stream->file_name->str);
729 status = BT_SELF_COMPONENT_STATUS_ERROR;
730 goto end;
731 }
732
733 /*
734 * If we're currently in an opened packet (got a packet
735 * beginning message, but no packet end message yet), we do not
736 * support having a discarded events message with a time range
737 * because we require that the discarded events message's time
738 * range go from a packet's end time to the next packet's end
739 * time.
740 */
741 if (stream->packet_state.is_open &&
742 stream->sc->discarded_events_has_ts) {
743 BT_LOGE("Unsupported discarded events message with "
744 "default clock snapshots occuring within a packet: "
745 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
746 "trace-name=\"%s\", path=\"%s/%s\"",
747 bt_stream_get_id(ir_stream),
748 bt_stream_get_name(ir_stream),
749 bt_trace_get_name(
750 bt_stream_borrow_trace_const(ir_stream)),
751 stream->trace->path->str, stream->file_name->str);
752 status = BT_SELF_COMPONENT_STATUS_ERROR;
753 goto end;
754 }
755
756 if (stream->sc->discarded_events_has_ts) {
757 /*
758 * Make the stream's state be in the time range of a
759 * discarded events message since we have the message's
760 * time range (`stream->sc->discarded_events_has_ts`).
761 */
762 stream->discarded_events_state.in_range = true;
763
764 /*
765 * The clock snapshot values will be validated when
766 * handling the next packet beginning and end messages
767 * (next calls to handle_packet_beginning_msg() and
768 * handle_packet_end_msg()).
769 */
770 cs = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
771 msg);
772 BT_ASSERT(cs);
773 stream->discarded_events_state.beginning_cs =
774 bt_clock_snapshot_get_value(cs);
775 cs = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(
776 msg);
777 BT_ASSERT(cs);
778 stream->discarded_events_state.end_cs = bt_clock_snapshot_get_value(cs);
779 }
780
781 avail = bt_message_discarded_events_get_count(msg, &count);
782 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
783 /*
784 * There's no specific count of discarded events: set it
785 * to 1 so that we know that we at least discarded
786 * something.
787 */
788 count = 1;
789 }
790
791 stream->packet_state.discarded_events_counter += count;
792
793 end:
794 return status;
795 }
796
797 static inline
798 bt_self_component_status handle_discarded_packets_msg(
799 struct fs_sink_comp *fs_sink, const bt_message *msg)
800 {
801 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
802 const bt_stream *ir_stream =
803 bt_message_discarded_packets_borrow_stream_const(msg);
804 struct fs_sink_stream *stream;
805 const bt_clock_snapshot *cs = NULL;
806 bt_property_availability avail;
807 uint64_t count;
808
809 stream = borrow_stream(fs_sink, ir_stream);
810 if (!stream) {
811 status = BT_SELF_COMPONENT_STATUS_ERROR;
812 goto end;
813 }
814
815 if (fs_sink->ignore_discarded_packets) {
816 BT_LOGI("Ignoring discarded packets message: "
817 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
818 "trace-name=\"%s\", path=\"%s/%s\"",
819 bt_stream_get_id(ir_stream),
820 bt_stream_get_name(ir_stream),
821 bt_trace_get_name(
822 bt_stream_borrow_trace_const(ir_stream)),
823 stream->trace->path->str, stream->file_name->str);
824 goto end;
825 }
826
827 if (stream->discarded_packets_state.in_range) {
828 BT_LOGE("Unsupported contiguous discarded packets message: "
829 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
830 "trace-name=\"%s\", path=\"%s/%s\"",
831 bt_stream_get_id(ir_stream),
832 bt_stream_get_name(ir_stream),
833 bt_trace_get_name(
834 bt_stream_borrow_trace_const(ir_stream)),
835 stream->trace->path->str, stream->file_name->str);
836 status = BT_SELF_COMPONENT_STATUS_ERROR;
837 goto end;
838 }
839
840 /*
841 * Discarded packets messages are guaranteed to occur between
842 * packets.
843 */
844 BT_ASSERT(!stream->packet_state.is_open);
845
846 if (stream->sc->discarded_packets_has_ts) {
847 /*
848 * Make the stream's state be in the time range of a
849 * discarded packets message since we have the message's
850 * time range (`stream->sc->discarded_packets_has_ts`).
851 */
852 stream->discarded_packets_state.in_range = true;
853
854 /*
855 * The clock snapshot values will be validated when
856 * handling the next packet beginning message (next call
857 * to handle_packet_beginning_msg()).
858 */
859 cs = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
860 msg);
861 BT_ASSERT(cs);
862 stream->discarded_packets_state.beginning_cs =
863 bt_clock_snapshot_get_value(cs);
864 cs = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(
865 msg);
866 BT_ASSERT(cs);
867 stream->discarded_packets_state.end_cs =
868 bt_clock_snapshot_get_value(cs);
869 }
870
871 avail = bt_message_discarded_packets_get_count(msg, &count);
872 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
873 /*
874 * There's no specific count of discarded packets: set
875 * it to 1 so that we know that we at least discarded
876 * something.
877 */
878 count = 1;
879 }
880
881 stream->packet_state.seq_num += count;
882
883 end:
884 return status;
885 }
886
887 static inline
888 void put_messages(bt_message_array_const msgs, uint64_t count)
889 {
890 uint64_t i;
891
892 for (i = 0; i < count; i++) {
893 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
894 }
895 }
896
897 BT_HIDDEN
898 bt_self_component_status ctf_fs_sink_consume(bt_self_component_sink *self_comp)
899 {
900 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
901 struct fs_sink_comp *fs_sink;
902 bt_message_iterator_status it_status;
903 uint64_t msg_count = 0;
904 bt_message_array_const msgs;
905
906 fs_sink = bt_self_component_get_data(
907 bt_self_component_sink_as_self_component(self_comp));
908 BT_ASSERT(fs_sink);
909 BT_ASSERT(fs_sink->upstream_iter);
910
911 /* Consume messages */
912 it_status = bt_self_component_port_input_message_iterator_next(
913 fs_sink->upstream_iter, &msgs, &msg_count);
914 if (it_status < 0) {
915 status = BT_SELF_COMPONENT_STATUS_ERROR;
916 goto end;
917 }
918
919 switch (it_status) {
920 case BT_MESSAGE_ITERATOR_STATUS_OK:
921 {
922 uint64_t i;
923
924 for (i = 0; i < msg_count; i++) {
925 const bt_message *msg = msgs[i];
926
927 BT_ASSERT(msg);
928
929 switch (bt_message_get_type(msg)) {
930 case BT_MESSAGE_TYPE_EVENT:
931 status = handle_event_msg(fs_sink, msg);
932 break;
933 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
934 status = handle_packet_beginning_msg(
935 fs_sink, msg);
936 break;
937 case BT_MESSAGE_TYPE_PACKET_END:
938 status = handle_packet_end_msg(
939 fs_sink, msg);
940 break;
941 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
942 /* Ignore */
943 BT_LOGD_STR("Ignoring message iterator inactivity message.");
944 break;
945 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
946 status = handle_stream_beginning_msg(
947 fs_sink, msg);
948 break;
949 case BT_MESSAGE_TYPE_STREAM_END:
950 status = handle_stream_end_msg(
951 fs_sink, msg);
952 break;
953 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
954 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
955 /* Not supported by CTF 1.8 */
956 BT_LOGD_STR("Ignoring stream activity message.");
957 break;
958 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
959 status = handle_discarded_events_msg(
960 fs_sink, msg);
961 break;
962 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
963 status = handle_discarded_packets_msg(
964 fs_sink, msg);
965 break;
966 default:
967 abort();
968 }
969
970 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
971
972 if (status != BT_SELF_COMPONENT_STATUS_OK) {
973 BT_LOGE("Failed to handle message: "
974 "generated CTF traces could be incomplete: "
975 "output-dir-path=\"%s\"",
976 fs_sink->output_dir_path->str);
977 goto error;
978 }
979 }
980
981 break;
982 }
983 case BT_MESSAGE_ITERATOR_STATUS_AGAIN:
984 status = BT_SELF_COMPONENT_STATUS_AGAIN;
985 break;
986 case BT_MESSAGE_ITERATOR_STATUS_END:
987 /* TODO: Finalize all traces (should already be done?) */
988 status = BT_SELF_COMPONENT_STATUS_END;
989 break;
990 case BT_MESSAGE_ITERATOR_STATUS_NOMEM:
991 status = BT_SELF_COMPONENT_STATUS_NOMEM;
992 break;
993 case BT_MESSAGE_ITERATOR_STATUS_ERROR:
994 status = BT_SELF_COMPONENT_STATUS_NOMEM;
995 break;
996 default:
997 break;
998 }
999
1000 goto end;
1001
1002 error:
1003 BT_ASSERT(status != BT_SELF_COMPONENT_STATUS_OK);
1004 put_messages(msgs, msg_count);
1005
1006 end:
1007 return status;
1008 }
1009
1010 BT_HIDDEN
1011 bt_self_component_status ctf_fs_sink_graph_is_configured(
1012 bt_self_component_sink *self_comp)
1013 {
1014 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
1015 struct fs_sink_comp *fs_sink = bt_self_component_get_data(
1016 bt_self_component_sink_as_self_component(self_comp));
1017
1018 fs_sink->upstream_iter =
1019 bt_self_component_port_input_message_iterator_create(
1020 bt_self_component_sink_borrow_input_port_by_name(
1021 self_comp, in_port_name));
1022 if (!fs_sink->upstream_iter) {
1023 status = BT_SELF_COMPONENT_STATUS_NOMEM;
1024 goto end;
1025 }
1026
1027 end:
1028 return status;
1029 }
1030
1031 BT_HIDDEN
1032 void ctf_fs_sink_finalize(bt_self_component_sink *self_comp)
1033 {
1034 struct fs_sink_comp *fs_sink = bt_self_component_get_data(
1035 bt_self_component_sink_as_self_component(self_comp));
1036
1037 destroy_fs_sink_comp(fs_sink);
1038 }
This page took 0.051184 seconds and 5 git commands to generate.