Logging: standardize logging tags
[babeltrace.git] / src / plugins / ctf / fs-sink / fs-sink.c
1 /*
2 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 * SOFTWARE.
21 */
22
23 #define BT_LOG_TAG "PLUGIN/SINK.CTF.FS"
24 #include "logging.h"
25
26 #include <babeltrace2/babeltrace.h>
27 #include <stdio.h>
28 #include <stdbool.h>
29 #include <glib.h>
30 #include "common/assert.h"
31 #include "ctfser/ctfser.h"
32
33 #include "fs-sink.h"
34 #include "fs-sink-trace.h"
35 #include "fs-sink-stream.h"
36 #include "fs-sink-ctf-meta.h"
37 #include "translate-trace-ir-to-ctf-ir.h"
38 #include "translate-ctf-ir-to-tsdl.h"
39
/* Name of the input port which this sink component adds and consumes from. */
static const char * const in_port_name = "in";
42
43 static
44 bt_self_component_status ensure_output_dir_exists(
45 struct fs_sink_comp *fs_sink)
46 {
47 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
48 int ret;
49
50 ret = g_mkdir_with_parents(fs_sink->output_dir_path->str, 0755);
51 if (ret) {
52 BT_LOGE_ERRNO("Cannot create directories for output directory",
53 ": output-dir-path=\"%s\"",
54 fs_sink->output_dir_path->str);
55 status = BT_SELF_COMPONENT_STATUS_ERROR;
56 goto end;
57 }
58
59 end:
60 return status;
61 }
62
63 static
64 bt_self_component_status configure_component(struct fs_sink_comp *fs_sink,
65 const bt_value *params)
66 {
67 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
68 const bt_value *value;
69
70 value = bt_value_map_borrow_entry_value_const(params, "path");
71 if (!value) {
72 BT_LOGE_STR("Missing mandatory `path` parameter.");
73 status = BT_SELF_COMPONENT_STATUS_ERROR;
74 goto end;
75 }
76
77 if (!bt_value_is_string(value)) {
78 BT_LOGE_STR("`path` parameter: expecting a string.");
79 status = BT_SELF_COMPONENT_STATUS_ERROR;
80 goto end;
81 }
82
83 g_string_assign(fs_sink->output_dir_path,
84 bt_value_string_get(value));
85 value = bt_value_map_borrow_entry_value_const(params,
86 "assume-single-trace");
87 if (value) {
88 if (!bt_value_is_bool(value)) {
89 BT_LOGE_STR("`assume-single-trace` parameter: expecting a boolean.");
90 status = BT_SELF_COMPONENT_STATUS_ERROR;
91 goto end;
92 }
93
94 fs_sink->assume_single_trace = (bool) bt_value_bool_get(value);
95 }
96
97 value = bt_value_map_borrow_entry_value_const(params,
98 "ignore-discarded-events");
99 if (value) {
100 if (!bt_value_is_bool(value)) {
101 BT_LOGE_STR("`ignore-discarded-events` parameter: expecting a boolean.");
102 status = BT_SELF_COMPONENT_STATUS_ERROR;
103 goto end;
104 }
105
106 fs_sink->ignore_discarded_events =
107 (bool) bt_value_bool_get(value);
108 }
109
110 value = bt_value_map_borrow_entry_value_const(params,
111 "ignore-discarded-packets");
112 if (value) {
113 if (!bt_value_is_bool(value)) {
114 BT_LOGE_STR("`ignore-discarded-packets` parameter: expecting a boolean.");
115 status = BT_SELF_COMPONENT_STATUS_ERROR;
116 goto end;
117 }
118
119 fs_sink->ignore_discarded_packets =
120 (bool) bt_value_bool_get(value);
121 }
122
123 value = bt_value_map_borrow_entry_value_const(params,
124 "quiet");
125 if (value) {
126 if (!bt_value_is_bool(value)) {
127 BT_LOGE_STR("`quiet` parameter: expecting a boolean.");
128 status = BT_SELF_COMPONENT_STATUS_ERROR;
129 goto end;
130 }
131
132 fs_sink->quiet = (bool) bt_value_bool_get(value);
133 }
134
135 end:
136 return status;
137 }
138
139 static
140 void destroy_fs_sink_comp(struct fs_sink_comp *fs_sink)
141 {
142 if (!fs_sink) {
143 goto end;
144 }
145
146 if (fs_sink->output_dir_path) {
147 g_string_free(fs_sink->output_dir_path, TRUE);
148 fs_sink->output_dir_path = NULL;
149 }
150
151 if (fs_sink->traces) {
152 g_hash_table_destroy(fs_sink->traces);
153 fs_sink->traces = NULL;
154 }
155
156 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(
157 fs_sink->upstream_iter);
158 g_free(fs_sink);
159
160 end:
161 return;
162 }
163
164 BT_HIDDEN
165 bt_self_component_status ctf_fs_sink_init(
166 bt_self_component_sink *self_comp, const bt_value *params,
167 void *init_method_data)
168 {
169 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
170 struct fs_sink_comp *fs_sink = NULL;
171
172 fs_sink = g_new0(struct fs_sink_comp, 1);
173 if (!fs_sink) {
174 BT_LOGE_STR("Failed to allocate one CTF FS sink structure.");
175 status = BT_SELF_COMPONENT_STATUS_NOMEM;
176 goto end;
177 }
178
179 fs_sink->output_dir_path = g_string_new(NULL);
180 fs_sink->self_comp = self_comp;
181 status = configure_component(fs_sink, params);
182 if (status != BT_SELF_COMPONENT_STATUS_OK) {
183 /* configure_component() logs errors */
184 goto end;
185 }
186
187 if (fs_sink->assume_single_trace &&
188 g_file_test(fs_sink->output_dir_path->str,
189 G_FILE_TEST_EXISTS)) {
190 BT_LOGE("Single trace mode, but output path exists: "
191 "output-path=\"%s\"", fs_sink->output_dir_path->str);
192 status = BT_SELF_COMPONENT_STATUS_ERROR;
193 goto end;
194 }
195
196 status = ensure_output_dir_exists(fs_sink);
197 if (status != BT_SELF_COMPONENT_STATUS_OK) {
198 /* ensure_output_dir_exists() logs errors */
199 goto end;
200 }
201
202 fs_sink->traces = g_hash_table_new_full(g_direct_hash, g_direct_equal,
203 NULL, (GDestroyNotify) fs_sink_trace_destroy);
204 if (!fs_sink->traces) {
205 BT_LOGE_STR("Failed to allocate one GHashTable.");
206 status = BT_SELF_COMPONENT_STATUS_NOMEM;
207 goto end;
208 }
209
210 status = bt_self_component_sink_add_input_port(self_comp, in_port_name,
211 NULL, NULL);
212 if (status != BT_SELF_COMPONENT_STATUS_OK) {
213 goto end;
214 }
215
216 bt_self_component_set_data(
217 bt_self_component_sink_as_self_component(self_comp), fs_sink);
218
219 end:
220 if (status != BT_SELF_COMPONENT_STATUS_OK) {
221 destroy_fs_sink_comp(fs_sink);
222 }
223
224 return status;
225 }
226
227 static inline
228 struct fs_sink_stream *borrow_stream(struct fs_sink_comp *fs_sink,
229 const bt_stream *ir_stream)
230 {
231 const bt_trace *ir_trace = bt_stream_borrow_trace_const(ir_stream);
232 struct fs_sink_trace *trace;
233 struct fs_sink_stream *stream = NULL;
234
235 trace = g_hash_table_lookup(fs_sink->traces, ir_trace);
236 if (G_UNLIKELY(!trace)) {
237 if (fs_sink->assume_single_trace &&
238 g_hash_table_size(fs_sink->traces) > 0) {
239 BT_LOGE("Single trace mode, but getting more than one trace: "
240 "stream-name=\"%s\"",
241 bt_stream_get_name(ir_stream));
242 goto end;
243 }
244
245 trace = fs_sink_trace_create(fs_sink, ir_trace);
246 if (!trace) {
247 goto end;
248 }
249 }
250
251 stream = g_hash_table_lookup(trace->streams, ir_stream);
252 if (G_UNLIKELY(!stream)) {
253 stream = fs_sink_stream_create(trace, ir_stream);
254 if (!stream) {
255 goto end;
256 }
257 }
258
259 end:
260 return stream;
261 }
262
263 static inline
264 bt_self_component_status handle_event_msg(struct fs_sink_comp *fs_sink,
265 const bt_message *msg)
266 {
267 int ret;
268 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
269 const bt_event *ir_event = bt_message_event_borrow_event_const(msg);
270 const bt_stream *ir_stream = bt_event_borrow_stream_const(ir_event);
271 struct fs_sink_stream *stream;
272 struct fs_sink_ctf_event_class *ec = NULL;
273 const bt_clock_snapshot *cs = NULL;
274
275 stream = borrow_stream(fs_sink, ir_stream);
276 if (G_UNLIKELY(!stream)) {
277 status = BT_SELF_COMPONENT_STATUS_ERROR;
278 goto end;
279 }
280
281 ret = try_translate_event_class_trace_ir_to_ctf_ir(stream->sc,
282 bt_event_borrow_class_const(ir_event), &ec);
283 if (ret) {
284 status = BT_SELF_COMPONENT_STATUS_ERROR;
285 goto end;
286 }
287
288 BT_ASSERT(ec);
289
290 if (stream->sc->default_clock_class) {
291 cs = bt_message_event_borrow_default_clock_snapshot_const(
292 msg);
293 }
294
295 ret = fs_sink_stream_write_event(stream, cs, ir_event, ec);
296 if (G_UNLIKELY(ret)) {
297 status = BT_SELF_COMPONENT_STATUS_ERROR;
298 goto end;
299 }
300
301 end:
302 return status;
303 }
304
/*
 * Handles a packet beginning message.
 *
 * Before opening the packet in the stream file, this function
 * validates any pending discarded events/packets time range (recorded
 * by handle_discarded_events_msg() and handle_discarded_packets_msg())
 * against what CTF 1.8 can represent. On mismatch, it logs an error
 * and returns `BT_SELF_COMPONENT_STATUS_ERROR` without opening the
 * packet.
 *
 * Also returns `BT_SELF_COMPONENT_STATUS_ERROR` if the stream cannot be
 * borrowed or if opening the packet fails.
 */
static inline
bt_self_component_status handle_packet_beginning_msg(
		struct fs_sink_comp *fs_sink, const bt_message *msg)
{
	int ret;
	bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
	const bt_packet *ir_packet =
		bt_message_packet_beginning_borrow_packet_const(msg);
	const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
	struct fs_sink_stream *stream;
	const bt_clock_snapshot *cs = NULL;

	stream = borrow_stream(fs_sink, ir_stream);
	if (G_UNLIKELY(!stream)) {
		status = BT_SELF_COMPONENT_STATUS_ERROR;
		goto end;
	}

	/* `cs` remains `NULL` when packets have no beginning time */
	if (stream->sc->packets_have_ts_begin) {
		cs = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
			msg);
		BT_ASSERT(cs);
	}

	/*
	 * If we previously received a discarded events message with
	 * a time range, make sure that its beginning time matches what's
	 * expected for CTF 1.8, that is:
	 *
	 * * Its beginning time is the previous packet's end
	 *   time (or the current packet's beginning time if
	 *   this is the first packet).
	 *
	 * We check this here instead of in handle_packet_end_msg()
	 * because we want to catch any incompatible message as early as
	 * possible to report the error.
	 *
	 * Validation of the discarded events message's end time is
	 * performed in handle_packet_end_msg().
	 */
	if (stream->discarded_events_state.in_range) {
		uint64_t expected_cs;

		/*
		 * `stream->discarded_events_state.in_range` is only set
		 * when the stream class's discarded events have a time
		 * range.
		 *
		 * It is required that the packet beginning and end
		 * messages for this stream class have times when
		 * discarded events have a time range.
		 */
		BT_ASSERT(stream->sc->discarded_events_has_ts);
		BT_ASSERT(stream->sc->packets_have_ts_begin);
		BT_ASSERT(stream->sc->packets_have_ts_end);

		/* `UINT64_C(-1)` means: no previous packet end time */
		if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
			/* We're opening the first packet */
			expected_cs = bt_clock_snapshot_get_value(cs);
		} else {
			expected_cs = stream->prev_packet_state.end_cs;
		}

		if (stream->discarded_events_state.beginning_cs !=
				expected_cs) {
			BT_LOGE("Incompatible discarded events message: "
				"unexpected beginning time: "
				"beginning-cs-val=%" PRIu64 ", "
				"expected-beginning-cs-val=%" PRIu64 ", "
				"stream-id=%" PRIu64 ", stream-name=\"%s\", "
				"trace-name=\"%s\", path=\"%s/%s\"",
				stream->discarded_events_state.beginning_cs,
				expected_cs,
				bt_stream_get_id(ir_stream),
				bt_stream_get_name(ir_stream),
				bt_trace_get_name(
					bt_stream_borrow_trace_const(ir_stream)),
				stream->trace->path->str, stream->file_name->str);
			status = BT_SELF_COMPONENT_STATUS_ERROR;
			goto end;
		}
	}

	/*
	 * If we previously received a discarded packets message with a
	 * time range, make sure that its beginning and end times match
	 * what's expected for CTF 1.8, that is:
	 *
	 * * Its beginning time is the previous packet's end time.
	 *
	 * * Its end time is the current packet's beginning time.
	 */
	if (stream->discarded_packets_state.in_range) {
		uint64_t expected_end_cs;

		/*
		 * `stream->discarded_packets_state.in_range` is only
		 * set when the stream class's discarded packets have a
		 * time range.
		 *
		 * It is required that the packet beginning and end
		 * messages for this stream class have times when
		 * discarded packets have a time range.
		 */
		BT_ASSERT(stream->sc->discarded_packets_has_ts);
		BT_ASSERT(stream->sc->packets_have_ts_begin);
		BT_ASSERT(stream->sc->packets_have_ts_end);

		/*
		 * It is not supported to have a discarded packets
		 * message _before_ the first packet: we cannot validate
		 * that its beginning time is compatible with CTF 1.8 in
		 * this case.
		 */
		if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
			BT_LOGE("Incompatible discarded packets message "
				"occuring before the stream's first packet: "
				"stream-id=%" PRIu64 ", stream-name=\"%s\", "
				"trace-name=\"%s\", path=\"%s/%s\"",
				bt_stream_get_id(ir_stream),
				bt_stream_get_name(ir_stream),
				bt_trace_get_name(
					bt_stream_borrow_trace_const(ir_stream)),
				stream->trace->path->str, stream->file_name->str);
			status = BT_SELF_COMPONENT_STATUS_ERROR;
			goto end;
		}

		if (stream->discarded_packets_state.beginning_cs !=
				stream->prev_packet_state.end_cs) {
			BT_LOGE("Incompatible discarded packets message: "
				"unexpected beginning time: "
				"beginning-cs-val=%" PRIu64 ", "
				"expected-beginning-cs-val=%" PRIu64 ", "
				"stream-id=%" PRIu64 ", stream-name=\"%s\", "
				"trace-name=\"%s\", path=\"%s/%s\"",
				stream->discarded_packets_state.beginning_cs,
				stream->prev_packet_state.end_cs,
				bt_stream_get_id(ir_stream),
				bt_stream_get_name(ir_stream),
				bt_trace_get_name(
					bt_stream_borrow_trace_const(ir_stream)),
				stream->trace->path->str, stream->file_name->str);
			status = BT_SELF_COMPONENT_STATUS_ERROR;
			goto end;
		}

		expected_end_cs = bt_clock_snapshot_get_value(cs);

		if (stream->discarded_packets_state.end_cs !=
				expected_end_cs) {
			BT_LOGE("Incompatible discarded packets message: "
				"unexpected end time: "
				"end-cs-val=%" PRIu64 ", "
				"expected-end-cs-val=%" PRIu64 ", "
				"stream-id=%" PRIu64 ", stream-name=\"%s\", "
				"trace-name=\"%s\", path=\"%s/%s\"",
				stream->discarded_packets_state.end_cs,
				expected_end_cs,
				bt_stream_get_id(ir_stream),
				bt_stream_get_name(ir_stream),
				bt_trace_get_name(
					bt_stream_borrow_trace_const(ir_stream)),
				stream->trace->path->str, stream->file_name->str);
			status = BT_SELF_COMPONENT_STATUS_ERROR;
			goto end;
		}
	}

	/*
	 * We're not in a discarded packets time range anymore since we
	 * require that the discarded packets time ranges go from one
	 * packet's end time to the next packet's beginning time, and
	 * we're handling a packet beginning message here.
	 */
	stream->discarded_packets_state.in_range = false;

	ret = fs_sink_stream_open_packet(stream, cs, ir_packet);
	if (ret) {
		status = BT_SELF_COMPONENT_STATUS_ERROR;
		goto end;
	}

end:
	return status;
}
491
492 static inline
493 bt_self_component_status handle_packet_end_msg(
494 struct fs_sink_comp *fs_sink, const bt_message *msg)
495 {
496 int ret;
497 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
498 const bt_packet *ir_packet =
499 bt_message_packet_end_borrow_packet_const(msg);
500 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
501 struct fs_sink_stream *stream;
502 const bt_clock_snapshot *cs = NULL;
503
504 stream = borrow_stream(fs_sink, ir_stream);
505 if (G_UNLIKELY(!stream)) {
506 status = BT_SELF_COMPONENT_STATUS_ERROR;
507 goto end;
508 }
509
510 if (stream->sc->packets_have_ts_end) {
511 cs = bt_message_packet_end_borrow_default_clock_snapshot_const(
512 msg);
513 BT_ASSERT(cs);
514 }
515
516 /*
517 * If we previously received a discarded events message with
518 * a time range, make sure that its end time matches what's
519 * expected for CTF 1.8, that is:
520 *
521 * * Its end time is the current packet's end time.
522 *
523 * Validation of the discarded events message's beginning time
524 * is performed in handle_packet_beginning_msg().
525 */
526 if (stream->discarded_events_state.in_range) {
527 uint64_t expected_cs;
528
529 /*
530 * `stream->discarded_events_state.in_range` is only set
531 * when the stream class's discarded events have a time
532 * range.
533 *
534 * It is required that the packet beginning and end
535 * messages for this stream class have times when
536 * discarded events have a time range.
537 */
538 BT_ASSERT(stream->sc->discarded_events_has_ts);
539 BT_ASSERT(stream->sc->packets_have_ts_begin);
540 BT_ASSERT(stream->sc->packets_have_ts_end);
541
542 expected_cs = bt_clock_snapshot_get_value(cs);
543
544 if (stream->discarded_events_state.end_cs != expected_cs) {
545 BT_LOGE("Incompatible discarded events message: "
546 "unexpected end time: "
547 "end-cs-val=%" PRIu64 ", "
548 "expected-end-cs-val=%" PRIu64 ", "
549 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
550 "trace-name=\"%s\", path=\"%s/%s\"",
551 stream->discarded_events_state.end_cs,
552 expected_cs,
553 bt_stream_get_id(ir_stream),
554 bt_stream_get_name(ir_stream),
555 bt_trace_get_name(
556 bt_stream_borrow_trace_const(ir_stream)),
557 stream->trace->path->str, stream->file_name->str);
558 status = BT_SELF_COMPONENT_STATUS_ERROR;
559 goto end;
560 }
561 }
562
563 ret = fs_sink_stream_close_packet(stream, cs);
564 if (ret) {
565 status = BT_SELF_COMPONENT_STATUS_ERROR;
566 goto end;
567 }
568
569 /*
570 * We're not in a discarded events time range anymore since we
571 * require that the discarded events time ranges go from one
572 * packet's end time to the next packet's end time, and we're
573 * handling a packet end message here.
574 */
575 stream->discarded_events_state.in_range = false;
576
577 end:
578 return status;
579 }
580
581 static inline
582 bt_self_component_status handle_stream_beginning_msg(
583 struct fs_sink_comp *fs_sink, const bt_message *msg)
584 {
585 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
586 const bt_stream *ir_stream =
587 bt_message_stream_beginning_borrow_stream_const(msg);
588 const bt_stream_class *ir_sc =
589 bt_stream_borrow_class_const(ir_stream);
590 struct fs_sink_stream *stream;
591 bool packets_have_beginning_end_cs =
592 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc) &&
593 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc);
594
595 /*
596 * Not supported: discarded events with default clock snapshots,
597 * but packet beginning/end without default clock snapshot.
598 */
599 if (!fs_sink->ignore_discarded_events &&
600 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc) &&
601 !packets_have_beginning_end_cs) {
602 BT_LOGE("Unsupported stream: discarded events have "
603 "default clock snapshots, but packets have no "
604 "beginning and/or end default clock snapshots: "
605 "stream-addr=%p, "
606 "stream-id=%" PRIu64 ", "
607 "stream-name=\"%s\"",
608 ir_stream, bt_stream_get_id(ir_stream),
609 bt_stream_get_name(ir_stream));
610 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
611 goto end;
612 }
613
614 /*
615 * Not supported: discarded packets with default clock
616 * snapshots, but packet beginning/end without default clock
617 * snapshot.
618 */
619 if (!fs_sink->ignore_discarded_packets &&
620 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc) &&
621 !packets_have_beginning_end_cs) {
622 BT_LOGE("Unsupported stream: discarded packets have "
623 "default clock snapshots, but packets have no "
624 "beginning and/or end default clock snapshots: "
625 "stream-addr=%p, "
626 "stream-id=%" PRIu64 ", "
627 "stream-name=\"%s\"",
628 ir_stream, bt_stream_get_id(ir_stream),
629 bt_stream_get_name(ir_stream));
630 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
631 goto end;
632 }
633
634 stream = borrow_stream(fs_sink, ir_stream);
635 if (!stream) {
636 status = BT_SELF_COMPONENT_STATUS_ERROR;
637 goto end;
638 }
639
640 BT_LOGI("Created new, empty stream file: "
641 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
642 "trace-name=\"%s\", path=\"%s/%s\"",
643 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
644 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
645 stream->trace->path->str, stream->file_name->str);
646
647 end:
648 return status;
649 }
650
651 static inline
652 bt_self_component_status handle_stream_end_msg(struct fs_sink_comp *fs_sink,
653 const bt_message *msg)
654 {
655 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
656 const bt_stream *ir_stream =
657 bt_message_stream_end_borrow_stream_const(msg);
658 struct fs_sink_stream *stream;
659
660 stream = borrow_stream(fs_sink, ir_stream);
661 if (!stream) {
662 status = BT_SELF_COMPONENT_STATUS_ERROR;
663 goto end;
664 }
665
666 BT_LOGI("Closing stream file: "
667 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
668 "trace-name=\"%s\", path=\"%s/%s\"",
669 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
670 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
671 stream->trace->path->str, stream->file_name->str);
672
673 /*
674 * This destroys the stream object and frees all its resources,
675 * closing the stream file.
676 */
677 g_hash_table_remove(stream->trace->streams, ir_stream);
678
679 end:
680 return status;
681 }
682
683 static inline
684 bt_self_component_status handle_discarded_events_msg(
685 struct fs_sink_comp *fs_sink, const bt_message *msg)
686 {
687 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
688 const bt_stream *ir_stream =
689 bt_message_discarded_events_borrow_stream_const(msg);
690 struct fs_sink_stream *stream;
691 const bt_clock_snapshot *cs = NULL;
692 bt_property_availability avail;
693 uint64_t count;
694
695 stream = borrow_stream(fs_sink, ir_stream);
696 if (!stream) {
697 status = BT_SELF_COMPONENT_STATUS_ERROR;
698 goto end;
699 }
700
701 if (fs_sink->ignore_discarded_events) {
702 BT_LOGI("Ignoring discarded events message: "
703 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
704 "trace-name=\"%s\", path=\"%s/%s\"",
705 bt_stream_get_id(ir_stream),
706 bt_stream_get_name(ir_stream),
707 bt_trace_get_name(
708 bt_stream_borrow_trace_const(ir_stream)),
709 stream->trace->path->str, stream->file_name->str);
710 goto end;
711 }
712
713 if (stream->discarded_events_state.in_range) {
714 BT_LOGE("Unsupported contiguous discarded events message: "
715 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
716 "trace-name=\"%s\", path=\"%s/%s\"",
717 bt_stream_get_id(ir_stream),
718 bt_stream_get_name(ir_stream),
719 bt_trace_get_name(
720 bt_stream_borrow_trace_const(ir_stream)),
721 stream->trace->path->str, stream->file_name->str);
722 status = BT_SELF_COMPONENT_STATUS_ERROR;
723 goto end;
724 }
725
726 /*
727 * If we're currently in an opened packet (got a packet
728 * beginning message, but no packet end message yet), we do not
729 * support having a discarded events message with a time range
730 * because we require that the discarded events message's time
731 * range go from a packet's end time to the next packet's end
732 * time.
733 */
734 if (stream->packet_state.is_open &&
735 stream->sc->discarded_events_has_ts) {
736 BT_LOGE("Unsupported discarded events message with "
737 "default clock snapshots occuring within a packet: "
738 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
739 "trace-name=\"%s\", path=\"%s/%s\"",
740 bt_stream_get_id(ir_stream),
741 bt_stream_get_name(ir_stream),
742 bt_trace_get_name(
743 bt_stream_borrow_trace_const(ir_stream)),
744 stream->trace->path->str, stream->file_name->str);
745 status = BT_SELF_COMPONENT_STATUS_ERROR;
746 goto end;
747 }
748
749 if (stream->sc->discarded_events_has_ts) {
750 /*
751 * Make the stream's state be in the time range of a
752 * discarded events message since we have the message's
753 * time range (`stream->sc->discarded_events_has_ts`).
754 */
755 stream->discarded_events_state.in_range = true;
756
757 /*
758 * The clock snapshot values will be validated when
759 * handling the next packet beginning and end messages
760 * (next calls to handle_packet_beginning_msg() and
761 * handle_packet_end_msg()).
762 */
763 cs = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
764 msg);
765 BT_ASSERT(cs);
766 stream->discarded_events_state.beginning_cs =
767 bt_clock_snapshot_get_value(cs);
768 cs = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(
769 msg);
770 BT_ASSERT(cs);
771 stream->discarded_events_state.end_cs = bt_clock_snapshot_get_value(cs);
772 }
773
774 avail = bt_message_discarded_events_get_count(msg, &count);
775 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
776 /*
777 * There's no specific count of discarded events: set it
778 * to 1 so that we know that we at least discarded
779 * something.
780 */
781 count = 1;
782 }
783
784 stream->packet_state.discarded_events_counter += count;
785
786 end:
787 return status;
788 }
789
790 static inline
791 bt_self_component_status handle_discarded_packets_msg(
792 struct fs_sink_comp *fs_sink, const bt_message *msg)
793 {
794 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
795 const bt_stream *ir_stream =
796 bt_message_discarded_packets_borrow_stream_const(msg);
797 struct fs_sink_stream *stream;
798 const bt_clock_snapshot *cs = NULL;
799 bt_property_availability avail;
800 uint64_t count;
801
802 stream = borrow_stream(fs_sink, ir_stream);
803 if (!stream) {
804 status = BT_SELF_COMPONENT_STATUS_ERROR;
805 goto end;
806 }
807
808 if (fs_sink->ignore_discarded_packets) {
809 BT_LOGI("Ignoring discarded packets message: "
810 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
811 "trace-name=\"%s\", path=\"%s/%s\"",
812 bt_stream_get_id(ir_stream),
813 bt_stream_get_name(ir_stream),
814 bt_trace_get_name(
815 bt_stream_borrow_trace_const(ir_stream)),
816 stream->trace->path->str, stream->file_name->str);
817 goto end;
818 }
819
820 if (stream->discarded_packets_state.in_range) {
821 BT_LOGE("Unsupported contiguous discarded packets message: "
822 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
823 "trace-name=\"%s\", path=\"%s/%s\"",
824 bt_stream_get_id(ir_stream),
825 bt_stream_get_name(ir_stream),
826 bt_trace_get_name(
827 bt_stream_borrow_trace_const(ir_stream)),
828 stream->trace->path->str, stream->file_name->str);
829 status = BT_SELF_COMPONENT_STATUS_ERROR;
830 goto end;
831 }
832
833 /*
834 * Discarded packets messages are guaranteed to occur between
835 * packets.
836 */
837 BT_ASSERT(!stream->packet_state.is_open);
838
839 if (stream->sc->discarded_packets_has_ts) {
840 /*
841 * Make the stream's state be in the time range of a
842 * discarded packets message since we have the message's
843 * time range (`stream->sc->discarded_packets_has_ts`).
844 */
845 stream->discarded_packets_state.in_range = true;
846
847 /*
848 * The clock snapshot values will be validated when
849 * handling the next packet beginning message (next call
850 * to handle_packet_beginning_msg()).
851 */
852 cs = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
853 msg);
854 BT_ASSERT(cs);
855 stream->discarded_packets_state.beginning_cs =
856 bt_clock_snapshot_get_value(cs);
857 cs = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(
858 msg);
859 BT_ASSERT(cs);
860 stream->discarded_packets_state.end_cs =
861 bt_clock_snapshot_get_value(cs);
862 }
863
864 avail = bt_message_discarded_packets_get_count(msg, &count);
865 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
866 /*
867 * There's no specific count of discarded packets: set
868 * it to 1 so that we know that we at least discarded
869 * something.
870 */
871 count = 1;
872 }
873
874 stream->packet_state.seq_num += count;
875
876 end:
877 return status;
878 }
879
880 static inline
881 void put_messages(bt_message_array_const msgs, uint64_t count)
882 {
883 uint64_t i;
884
885 for (i = 0; i < count; i++) {
886 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
887 }
888 }
889
890 BT_HIDDEN
891 bt_self_component_status ctf_fs_sink_consume(bt_self_component_sink *self_comp)
892 {
893 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
894 struct fs_sink_comp *fs_sink;
895 bt_message_iterator_status it_status;
896 uint64_t msg_count = 0;
897 bt_message_array_const msgs;
898
899 fs_sink = bt_self_component_get_data(
900 bt_self_component_sink_as_self_component(self_comp));
901 BT_ASSERT(fs_sink);
902 BT_ASSERT(fs_sink->upstream_iter);
903
904 /* Consume messages */
905 it_status = bt_self_component_port_input_message_iterator_next(
906 fs_sink->upstream_iter, &msgs, &msg_count);
907 if (it_status < 0) {
908 status = BT_SELF_COMPONENT_STATUS_ERROR;
909 goto end;
910 }
911
912 switch (it_status) {
913 case BT_MESSAGE_ITERATOR_STATUS_OK:
914 {
915 uint64_t i;
916
917 for (i = 0; i < msg_count; i++) {
918 const bt_message *msg = msgs[i];
919
920 BT_ASSERT(msg);
921
922 switch (bt_message_get_type(msg)) {
923 case BT_MESSAGE_TYPE_EVENT:
924 status = handle_event_msg(fs_sink, msg);
925 break;
926 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
927 status = handle_packet_beginning_msg(
928 fs_sink, msg);
929 break;
930 case BT_MESSAGE_TYPE_PACKET_END:
931 status = handle_packet_end_msg(
932 fs_sink, msg);
933 break;
934 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
935 /* Ignore */
936 BT_LOGD_STR("Ignoring message iterator inactivity message.");
937 break;
938 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
939 status = handle_stream_beginning_msg(
940 fs_sink, msg);
941 break;
942 case BT_MESSAGE_TYPE_STREAM_END:
943 status = handle_stream_end_msg(
944 fs_sink, msg);
945 break;
946 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
947 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
948 /* Not supported by CTF 1.8 */
949 BT_LOGD_STR("Ignoring stream activity message.");
950 break;
951 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
952 status = handle_discarded_events_msg(
953 fs_sink, msg);
954 break;
955 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
956 status = handle_discarded_packets_msg(
957 fs_sink, msg);
958 break;
959 default:
960 abort();
961 }
962
963 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
964
965 if (status != BT_SELF_COMPONENT_STATUS_OK) {
966 BT_LOGE("Failed to handle message: "
967 "generated CTF traces could be incomplete: "
968 "output-dir-path=\"%s\"",
969 fs_sink->output_dir_path->str);
970 goto error;
971 }
972 }
973
974 break;
975 }
976 case BT_MESSAGE_ITERATOR_STATUS_AGAIN:
977 status = BT_SELF_COMPONENT_STATUS_AGAIN;
978 break;
979 case BT_MESSAGE_ITERATOR_STATUS_END:
980 /* TODO: Finalize all traces (should already be done?) */
981 status = BT_SELF_COMPONENT_STATUS_END;
982 break;
983 case BT_MESSAGE_ITERATOR_STATUS_NOMEM:
984 status = BT_SELF_COMPONENT_STATUS_NOMEM;
985 break;
986 case BT_MESSAGE_ITERATOR_STATUS_ERROR:
987 status = BT_SELF_COMPONENT_STATUS_NOMEM;
988 break;
989 default:
990 break;
991 }
992
993 goto end;
994
995 error:
996 BT_ASSERT(status != BT_SELF_COMPONENT_STATUS_OK);
997 put_messages(msgs, msg_count);
998
999 end:
1000 return status;
1001 }
1002
1003 BT_HIDDEN
1004 bt_self_component_status ctf_fs_sink_graph_is_configured(
1005 bt_self_component_sink *self_comp)
1006 {
1007 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
1008 struct fs_sink_comp *fs_sink = bt_self_component_get_data(
1009 bt_self_component_sink_as_self_component(self_comp));
1010
1011 fs_sink->upstream_iter =
1012 bt_self_component_port_input_message_iterator_create(
1013 bt_self_component_sink_borrow_input_port_by_name(
1014 self_comp, in_port_name));
1015 if (!fs_sink->upstream_iter) {
1016 status = BT_SELF_COMPONENT_STATUS_NOMEM;
1017 goto end;
1018 }
1019
1020 end:
1021 return status;
1022 }
1023
1024 BT_HIDDEN
1025 void ctf_fs_sink_finalize(bt_self_component_sink *self_comp)
1026 {
1027 struct fs_sink_comp *fs_sink = bt_self_component_get_data(
1028 bt_self_component_sink_as_self_component(self_comp));
1029
1030 destroy_fs_sink_comp(fs_sink);
1031 }
This page took 0.05064 seconds and 4 git commands to generate.