sink.text.details: use BT_COMP_LOG*() instead of BT_LOG*()
[babeltrace.git] / src / plugins / ctf / fs-sink / fs-sink.c
CommitLineData
15fe47e0
PP
1/*
2 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 * SOFTWARE.
21 */
22
ffa3b2b3 23#define BT_LOG_OUTPUT_LEVEL (fs_sink->log_level)
350ad6c1 24#define BT_LOG_TAG "PLUGIN/SINK.CTF.FS"
ffa3b2b3 25#include "logging/log.h"
15fe47e0 26
3fadfbc0 27#include <babeltrace2/babeltrace.h>
15fe47e0
PP
28#include <stdio.h>
29#include <stdbool.h>
30#include <glib.h>
578e048b
MJ
31#include "common/assert.h"
32#include "ctfser/ctfser.h"
15fe47e0
PP
33
34#include "fs-sink.h"
35#include "fs-sink-trace.h"
36#include "fs-sink-stream.h"
37#include "fs-sink-ctf-meta.h"
38#include "translate-trace-ir-to-ctf-ir.h"
39#include "translate-ctf-ir-to-tsdl.h"
40
/* Name of the single input port which ctf_fs_sink_init() adds. */
static const char * const in_port_name = "in";
43
44static
45bt_self_component_status ensure_output_dir_exists(
46 struct fs_sink_comp *fs_sink)
47{
48 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
49 int ret;
50
51 ret = g_mkdir_with_parents(fs_sink->output_dir_path->str, 0755);
52 if (ret) {
53 BT_LOGE_ERRNO("Cannot create directories for output directory",
54 ": output-dir-path=\"%s\"",
55 fs_sink->output_dir_path->str);
56 status = BT_SELF_COMPONENT_STATUS_ERROR;
57 goto end;
58 }
59
60end:
61 return status;
62}
63
64static
65bt_self_component_status configure_component(struct fs_sink_comp *fs_sink,
66 const bt_value *params)
67{
68 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
69 const bt_value *value;
70
71 value = bt_value_map_borrow_entry_value_const(params, "path");
72 if (!value) {
73 BT_LOGE_STR("Missing mandatory `path` parameter.");
74 status = BT_SELF_COMPONENT_STATUS_ERROR;
75 goto end;
76 }
77
78 if (!bt_value_is_string(value)) {
79 BT_LOGE_STR("`path` parameter: expecting a string.");
80 status = BT_SELF_COMPONENT_STATUS_ERROR;
81 goto end;
82 }
83
84 g_string_assign(fs_sink->output_dir_path,
85 bt_value_string_get(value));
86 value = bt_value_map_borrow_entry_value_const(params,
87 "assume-single-trace");
88 if (value) {
89 if (!bt_value_is_bool(value)) {
90 BT_LOGE_STR("`assume-single-trace` parameter: expecting a boolean.");
91 status = BT_SELF_COMPONENT_STATUS_ERROR;
92 goto end;
93 }
94
95 fs_sink->assume_single_trace = (bool) bt_value_bool_get(value);
96 }
97
98 value = bt_value_map_borrow_entry_value_const(params,
99 "ignore-discarded-events");
100 if (value) {
101 if (!bt_value_is_bool(value)) {
102 BT_LOGE_STR("`ignore-discarded-events` parameter: expecting a boolean.");
103 status = BT_SELF_COMPONENT_STATUS_ERROR;
104 goto end;
105 }
106
107 fs_sink->ignore_discarded_events =
108 (bool) bt_value_bool_get(value);
109 }
110
111 value = bt_value_map_borrow_entry_value_const(params,
112 "ignore-discarded-packets");
113 if (value) {
114 if (!bt_value_is_bool(value)) {
115 BT_LOGE_STR("`ignore-discarded-packets` parameter: expecting a boolean.");
116 status = BT_SELF_COMPONENT_STATUS_ERROR;
117 goto end;
118 }
119
120 fs_sink->ignore_discarded_packets =
121 (bool) bt_value_bool_get(value);
122 }
123
124 value = bt_value_map_borrow_entry_value_const(params,
125 "quiet");
126 if (value) {
127 if (!bt_value_is_bool(value)) {
128 BT_LOGE_STR("`quiet` parameter: expecting a boolean.");
129 status = BT_SELF_COMPONENT_STATUS_ERROR;
130 goto end;
131 }
132
133 fs_sink->quiet = (bool) bt_value_bool_get(value);
134 }
135
136end:
137 return status;
138}
139
140static
141void destroy_fs_sink_comp(struct fs_sink_comp *fs_sink)
142{
143 if (!fs_sink) {
144 goto end;
145 }
146
147 if (fs_sink->output_dir_path) {
148 g_string_free(fs_sink->output_dir_path, TRUE);
149 fs_sink->output_dir_path = NULL;
150 }
151
152 if (fs_sink->traces) {
153 g_hash_table_destroy(fs_sink->traces);
154 fs_sink->traces = NULL;
155 }
156
157 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(
158 fs_sink->upstream_iter);
159 g_free(fs_sink);
160
161end:
162 return;
163}
164
165BT_HIDDEN
166bt_self_component_status ctf_fs_sink_init(
ffa3b2b3 167 bt_self_component_sink *self_comp_sink, const bt_value *params,
15fe47e0
PP
168 void *init_method_data)
169{
170 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
171 struct fs_sink_comp *fs_sink = NULL;
ffa3b2b3
PP
172 bt_self_component *self_comp =
173 bt_self_component_sink_as_self_component(self_comp_sink);
174 bt_logging_level log_level = bt_component_get_logging_level(
175 bt_self_component_as_component(self_comp));
15fe47e0
PP
176
177 fs_sink = g_new0(struct fs_sink_comp, 1);
178 if (!fs_sink) {
ffa3b2b3
PP
179 BT_LOG_WRITE_CUR_LVL(BT_LOG_ERROR, log_level, BT_LOG_TAG,
180 "Failed to allocate one CTF FS sink structure.");
15fe47e0
PP
181 status = BT_SELF_COMPONENT_STATUS_NOMEM;
182 goto end;
183 }
184
ffa3b2b3 185 fs_sink->log_level = log_level;
15fe47e0 186 fs_sink->output_dir_path = g_string_new(NULL);
ffa3b2b3 187 fs_sink->self_comp = self_comp_sink;
15fe47e0
PP
188 status = configure_component(fs_sink, params);
189 if (status != BT_SELF_COMPONENT_STATUS_OK) {
190 /* configure_component() logs errors */
191 goto end;
192 }
193
194 if (fs_sink->assume_single_trace &&
195 g_file_test(fs_sink->output_dir_path->str,
196 G_FILE_TEST_EXISTS)) {
197 BT_LOGE("Single trace mode, but output path exists: "
198 "output-path=\"%s\"", fs_sink->output_dir_path->str);
199 status = BT_SELF_COMPONENT_STATUS_ERROR;
200 goto end;
201 }
202
203 status = ensure_output_dir_exists(fs_sink);
204 if (status != BT_SELF_COMPONENT_STATUS_OK) {
205 /* ensure_output_dir_exists() logs errors */
206 goto end;
207 }
208
209 fs_sink->traces = g_hash_table_new_full(g_direct_hash, g_direct_equal,
210 NULL, (GDestroyNotify) fs_sink_trace_destroy);
211 if (!fs_sink->traces) {
212 BT_LOGE_STR("Failed to allocate one GHashTable.");
213 status = BT_SELF_COMPONENT_STATUS_NOMEM;
214 goto end;
215 }
216
ffa3b2b3
PP
217 status = bt_self_component_sink_add_input_port(self_comp_sink,
218 in_port_name, NULL, NULL);
15fe47e0
PP
219 if (status != BT_SELF_COMPONENT_STATUS_OK) {
220 goto end;
221 }
222
ffa3b2b3 223 bt_self_component_set_data(self_comp, fs_sink);
15fe47e0
PP
224
225end:
226 if (status != BT_SELF_COMPONENT_STATUS_OK) {
227 destroy_fs_sink_comp(fs_sink);
228 }
229
230 return status;
231}
232
233static inline
234struct fs_sink_stream *borrow_stream(struct fs_sink_comp *fs_sink,
235 const bt_stream *ir_stream)
236{
237 const bt_trace *ir_trace = bt_stream_borrow_trace_const(ir_stream);
238 struct fs_sink_trace *trace;
239 struct fs_sink_stream *stream = NULL;
240
241 trace = g_hash_table_lookup(fs_sink->traces, ir_trace);
91d81473 242 if (G_UNLIKELY(!trace)) {
15fe47e0
PP
243 if (fs_sink->assume_single_trace &&
244 g_hash_table_size(fs_sink->traces) > 0) {
245 BT_LOGE("Single trace mode, but getting more than one trace: "
246 "stream-name=\"%s\"",
247 bt_stream_get_name(ir_stream));
248 goto end;
249 }
250
251 trace = fs_sink_trace_create(fs_sink, ir_trace);
252 if (!trace) {
253 goto end;
254 }
255 }
256
257 stream = g_hash_table_lookup(trace->streams, ir_stream);
91d81473 258 if (G_UNLIKELY(!stream)) {
15fe47e0
PP
259 stream = fs_sink_stream_create(trace, ir_stream);
260 if (!stream) {
261 goto end;
262 }
263 }
264
265end:
266 return stream;
267}
268
269static inline
270bt_self_component_status handle_event_msg(struct fs_sink_comp *fs_sink,
271 const bt_message *msg)
272{
273 int ret;
274 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
275 const bt_event *ir_event = bt_message_event_borrow_event_const(msg);
276 const bt_stream *ir_stream = bt_event_borrow_stream_const(ir_event);
277 struct fs_sink_stream *stream;
278 struct fs_sink_ctf_event_class *ec = NULL;
279 const bt_clock_snapshot *cs = NULL;
280
281 stream = borrow_stream(fs_sink, ir_stream);
91d81473 282 if (G_UNLIKELY(!stream)) {
15fe47e0
PP
283 status = BT_SELF_COMPONENT_STATUS_ERROR;
284 goto end;
285 }
286
287 ret = try_translate_event_class_trace_ir_to_ctf_ir(stream->sc,
ffa3b2b3
PP
288 bt_event_borrow_class_const(ir_event), &ec,
289 fs_sink->log_level);
15fe47e0
PP
290 if (ret) {
291 status = BT_SELF_COMPONENT_STATUS_ERROR;
292 goto end;
293 }
294
295 BT_ASSERT(ec);
296
297 if (stream->sc->default_clock_class) {
0cbc2c33
PP
298 cs = bt_message_event_borrow_default_clock_snapshot_const(
299 msg);
15fe47e0
PP
300 }
301
302 ret = fs_sink_stream_write_event(stream, cs, ir_event, ec);
91d81473 303 if (G_UNLIKELY(ret)) {
15fe47e0
PP
304 status = BT_SELF_COMPONENT_STATUS_ERROR;
305 goto end;
306 }
307
308end:
309 return status;
310}
311
312static inline
313bt_self_component_status handle_packet_beginning_msg(
314 struct fs_sink_comp *fs_sink, const bt_message *msg)
315{
316 int ret;
317 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
318 const bt_packet *ir_packet =
319 bt_message_packet_beginning_borrow_packet_const(msg);
320 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
321 struct fs_sink_stream *stream;
322 const bt_clock_snapshot *cs = NULL;
323
324 stream = borrow_stream(fs_sink, ir_stream);
91d81473 325 if (G_UNLIKELY(!stream)) {
15fe47e0
PP
326 status = BT_SELF_COMPONENT_STATUS_ERROR;
327 goto end;
328 }
329
ffb5c13c 330 if (stream->sc->packets_have_ts_begin) {
0cbc2c33
PP
331 cs = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
332 msg);
15fe47e0
PP
333 BT_ASSERT(cs);
334 }
335
491c35cc
PP
336 /*
337 * If we previously received a discarded events message with
338 * a time range, make sure that its beginning time matches what's
339 * expected for CTF 1.8, that is:
340 *
341 * * Its beginning time is the previous packet's end
342 * time (or the current packet's beginning time if
343 * this is the first packet).
344 *
345 * We check this here instead of in handle_packet_end_msg()
346 * because we want to catch any incompatible message as early as
347 * possible to report the error.
348 *
349 * Validation of the discarded events message's end time is
350 * performed in handle_packet_end_msg().
351 */
15fe47e0 352 if (stream->discarded_events_state.in_range) {
ffb5c13c
PP
353 uint64_t expected_cs;
354
491c35cc
PP
355 /*
356 * `stream->discarded_events_state.in_range` is only set
357 * when the stream class's discarded events have a time
358 * range.
359 *
360 * It is required that the packet beginning and end
361 * messages for this stream class have times when
362 * discarded events have a time range.
363 */
ffb5c13c
PP
364 BT_ASSERT(stream->sc->discarded_events_has_ts);
365 BT_ASSERT(stream->sc->packets_have_ts_begin);
366 BT_ASSERT(stream->sc->packets_have_ts_end);
367
ffb5c13c
PP
368 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
369 /* We're opening the first packet */
370 expected_cs = bt_clock_snapshot_get_value(cs);
371 } else {
372 expected_cs = stream->prev_packet_state.end_cs;
15fe47e0 373 }
15fe47e0 374
ffb5c13c
PP
375 if (stream->discarded_events_state.beginning_cs !=
376 expected_cs) {
377 BT_LOGE("Incompatible discarded events message: "
378 "unexpected beginning time: "
379 "beginning-cs-val=%" PRIu64 ", "
380 "expected-beginning-cs-val=%" PRIu64 ", "
15fe47e0
PP
381 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
382 "trace-name=\"%s\", path=\"%s/%s\"",
ffb5c13c
PP
383 stream->discarded_events_state.beginning_cs,
384 expected_cs,
15fe47e0
PP
385 bt_stream_get_id(ir_stream),
386 bt_stream_get_name(ir_stream),
387 bt_trace_get_name(
388 bt_stream_borrow_trace_const(ir_stream)),
389 stream->trace->path->str, stream->file_name->str);
390 status = BT_SELF_COMPONENT_STATUS_ERROR;
391 goto end;
392 }
ffb5c13c
PP
393 }
394
491c35cc
PP
395 /*
396 * If we previously received a discarded packets message with a
397 * time range, make sure that its beginning and end times match
398 * what's expected for CTF 1.8, that is:
399 *
400 * * Its beginning time is the previous packet's end time.
401 *
402 * * Its end time is the current packet's beginning time.
403 */
ffb5c13c
PP
404 if (stream->discarded_packets_state.in_range) {
405 uint64_t expected_end_cs;
406
491c35cc
PP
407 /*
408 * `stream->discarded_packets_state.in_range` is only
409 * set when the stream class's discarded packets have a
410 * time range.
411 *
412 * It is required that the packet beginning and end
413 * messages for this stream class have times when
414 * discarded packets have a time range.
415 */
ffb5c13c
PP
416 BT_ASSERT(stream->sc->discarded_packets_has_ts);
417 BT_ASSERT(stream->sc->packets_have_ts_begin);
418 BT_ASSERT(stream->sc->packets_have_ts_end);
15fe47e0
PP
419
420 /*
491c35cc
PP
421 * It is not supported to have a discarded packets
422 * message _before_ the first packet: we cannot validate
423 * that its beginning time is compatible with CTF 1.8 in
424 * this case.
15fe47e0 425 */
ffb5c13c
PP
426 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
427 BT_LOGE("Incompatible discarded packets message "
428 "occuring before the stream's first packet: "
429 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
430 "trace-name=\"%s\", path=\"%s/%s\"",
431 bt_stream_get_id(ir_stream),
432 bt_stream_get_name(ir_stream),
433 bt_trace_get_name(
434 bt_stream_borrow_trace_const(ir_stream)),
435 stream->trace->path->str, stream->file_name->str);
436 status = BT_SELF_COMPONENT_STATUS_ERROR;
437 goto end;
438 }
439
440 if (stream->discarded_packets_state.beginning_cs !=
441 stream->prev_packet_state.end_cs) {
442 BT_LOGE("Incompatible discarded packets message: "
443 "unexpected beginning time: "
444 "beginning-cs-val=%" PRIu64 ", "
445 "expected-beginning-cs-val=%" PRIu64 ", "
446 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
447 "trace-name=\"%s\", path=\"%s/%s\"",
448 stream->discarded_packets_state.beginning_cs,
449 stream->prev_packet_state.end_cs,
450 bt_stream_get_id(ir_stream),
451 bt_stream_get_name(ir_stream),
452 bt_trace_get_name(
453 bt_stream_borrow_trace_const(ir_stream)),
454 stream->trace->path->str, stream->file_name->str);
455 status = BT_SELF_COMPONENT_STATUS_ERROR;
456 goto end;
15fe47e0 457 }
ffb5c13c
PP
458
459 expected_end_cs = bt_clock_snapshot_get_value(cs);
460
461 if (stream->discarded_packets_state.end_cs !=
462 expected_end_cs) {
463 BT_LOGE("Incompatible discarded packets message: "
464 "unexpected end time: "
465 "end-cs-val=%" PRIu64 ", "
466 "expected-end-cs-val=%" PRIu64 ", "
467 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
468 "trace-name=\"%s\", path=\"%s/%s\"",
469 stream->discarded_packets_state.end_cs,
470 expected_end_cs,
471 bt_stream_get_id(ir_stream),
472 bt_stream_get_name(ir_stream),
473 bt_trace_get_name(
474 bt_stream_borrow_trace_const(ir_stream)),
475 stream->trace->path->str, stream->file_name->str);
476 status = BT_SELF_COMPONENT_STATUS_ERROR;
477 goto end;
478 }
479 }
480
491c35cc
PP
481 /*
482 * We're not in a discarded packets time range anymore since we
483 * require that the discarded packets time ranges go from one
484 * packet's end time to the next packet's beginning time, and
485 * we're handling a packet beginning message here.
486 */
487 stream->discarded_packets_state.in_range = false;
15fe47e0 488
15fe47e0
PP
489 ret = fs_sink_stream_open_packet(stream, cs, ir_packet);
490 if (ret) {
491 status = BT_SELF_COMPONENT_STATUS_ERROR;
492 goto end;
493 }
494
495end:
496 return status;
497}
498
499static inline
500bt_self_component_status handle_packet_end_msg(
501 struct fs_sink_comp *fs_sink, const bt_message *msg)
502{
503 int ret;
504 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
505 const bt_packet *ir_packet =
506 bt_message_packet_end_borrow_packet_const(msg);
507 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
508 struct fs_sink_stream *stream;
509 const bt_clock_snapshot *cs = NULL;
510
511 stream = borrow_stream(fs_sink, ir_stream);
91d81473 512 if (G_UNLIKELY(!stream)) {
15fe47e0
PP
513 status = BT_SELF_COMPONENT_STATUS_ERROR;
514 goto end;
515 }
516
ffb5c13c 517 if (stream->sc->packets_have_ts_end) {
0cbc2c33
PP
518 cs = bt_message_packet_end_borrow_default_clock_snapshot_const(
519 msg);
15fe47e0
PP
520 BT_ASSERT(cs);
521 }
522
491c35cc
PP
523 /*
524 * If we previously received a discarded events message with
525 * a time range, make sure that its end time matches what's
526 * expected for CTF 1.8, that is:
527 *
528 * * Its end time is the current packet's end time.
529 *
530 * Validation of the discarded events message's beginning time
531 * is performed in handle_packet_beginning_msg().
532 */
15fe47e0 533 if (stream->discarded_events_state.in_range) {
ffb5c13c
PP
534 uint64_t expected_cs;
535
491c35cc
PP
536 /*
537 * `stream->discarded_events_state.in_range` is only set
538 * when the stream class's discarded events have a time
539 * range.
540 *
541 * It is required that the packet beginning and end
542 * messages for this stream class have times when
543 * discarded events have a time range.
544 */
ffb5c13c
PP
545 BT_ASSERT(stream->sc->discarded_events_has_ts);
546 BT_ASSERT(stream->sc->packets_have_ts_begin);
547 BT_ASSERT(stream->sc->packets_have_ts_end);
548
ffb5c13c
PP
549 expected_cs = bt_clock_snapshot_get_value(cs);
550
551 if (stream->discarded_events_state.end_cs != expected_cs) {
552 BT_LOGE("Incompatible discarded events message: "
553 "unexpected end time: "
554 "end-cs-val=%" PRIu64 ", "
555 "expected-end-cs-val=%" PRIu64 ", "
556 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
557 "trace-name=\"%s\", path=\"%s/%s\"",
558 stream->discarded_events_state.end_cs,
559 expected_cs,
560 bt_stream_get_id(ir_stream),
561 bt_stream_get_name(ir_stream),
562 bt_trace_get_name(
563 bt_stream_borrow_trace_const(ir_stream)),
564 stream->trace->path->str, stream->file_name->str);
565 status = BT_SELF_COMPONENT_STATUS_ERROR;
566 goto end;
15fe47e0
PP
567 }
568 }
569
570 ret = fs_sink_stream_close_packet(stream, cs);
571 if (ret) {
572 status = BT_SELF_COMPONENT_STATUS_ERROR;
573 goto end;
574 }
575
491c35cc
PP
576 /*
577 * We're not in a discarded events time range anymore since we
578 * require that the discarded events time ranges go from one
579 * packet's end time to the next packet's end time, and we're
580 * handling a packet end message here.
581 */
582 stream->discarded_events_state.in_range = false;
15fe47e0
PP
583
584end:
585 return status;
586}
587
588static inline
589bt_self_component_status handle_stream_beginning_msg(
590 struct fs_sink_comp *fs_sink, const bt_message *msg)
591{
592 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
593 const bt_stream *ir_stream =
594 bt_message_stream_beginning_borrow_stream_const(msg);
649934d2
PP
595 const bt_stream_class *ir_sc =
596 bt_stream_borrow_class_const(ir_stream);
15fe47e0 597 struct fs_sink_stream *stream;
ffb5c13c 598 bool packets_have_beginning_end_cs =
9b24b6aa
PP
599 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc) &&
600 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc);
15fe47e0 601
649934d2 602 /*
ffb5c13c
PP
603 * Not supported: discarded events with default clock snapshots,
604 * but packet beginning/end without default clock snapshot.
649934d2 605 */
ffb5c13c
PP
606 if (!fs_sink->ignore_discarded_events &&
607 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc) &&
608 !packets_have_beginning_end_cs) {
609 BT_LOGE("Unsupported stream: discarded events have "
610 "default clock snapshots, but packets have no "
611 "beginning and/or end default clock snapshots: "
612 "stream-addr=%p, "
613 "stream-id=%" PRIu64 ", "
614 "stream-name=\"%s\"",
615 ir_stream, bt_stream_get_id(ir_stream),
616 bt_stream_get_name(ir_stream));
617 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
618 goto end;
619 }
2e90378a 620
ffb5c13c
PP
621 /*
622 * Not supported: discarded packets with default clock
623 * snapshots, but packet beginning/end without default clock
624 * snapshot.
625 */
626 if (!fs_sink->ignore_discarded_packets &&
627 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc) &&
628 !packets_have_beginning_end_cs) {
629 BT_LOGE("Unsupported stream: discarded packets have "
630 "default clock snapshots, but packets have no "
631 "beginning and/or end default clock snapshots: "
632 "stream-addr=%p, "
633 "stream-id=%" PRIu64 ", "
634 "stream-name=\"%s\"",
635 ir_stream, bt_stream_get_id(ir_stream),
636 bt_stream_get_name(ir_stream));
637 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
638 goto end;
649934d2
PP
639 }
640
15fe47e0
PP
641 stream = borrow_stream(fs_sink, ir_stream);
642 if (!stream) {
643 status = BT_SELF_COMPONENT_STATUS_ERROR;
644 goto end;
645 }
646
647 BT_LOGI("Created new, empty stream file: "
648 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
649 "trace-name=\"%s\", path=\"%s/%s\"",
650 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
651 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
652 stream->trace->path->str, stream->file_name->str);
653
654end:
655 return status;
656}
657
658static inline
659bt_self_component_status handle_stream_end_msg(struct fs_sink_comp *fs_sink,
660 const bt_message *msg)
661{
662 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
663 const bt_stream *ir_stream =
664 bt_message_stream_end_borrow_stream_const(msg);
665 struct fs_sink_stream *stream;
666
667 stream = borrow_stream(fs_sink, ir_stream);
668 if (!stream) {
669 status = BT_SELF_COMPONENT_STATUS_ERROR;
670 goto end;
671 }
672
673 BT_LOGI("Closing stream file: "
674 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
675 "trace-name=\"%s\", path=\"%s/%s\"",
676 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
677 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
678 stream->trace->path->str, stream->file_name->str);
679
680 /*
681 * This destroys the stream object and frees all its resources,
682 * closing the stream file.
683 */
684 g_hash_table_remove(stream->trace->streams, ir_stream);
685
686end:
687 return status;
688}
689
690static inline
691bt_self_component_status handle_discarded_events_msg(
692 struct fs_sink_comp *fs_sink, const bt_message *msg)
693{
694 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
695 const bt_stream *ir_stream =
696 bt_message_discarded_events_borrow_stream_const(msg);
697 struct fs_sink_stream *stream;
698 const bt_clock_snapshot *cs = NULL;
699 bt_property_availability avail;
700 uint64_t count;
701
702 stream = borrow_stream(fs_sink, ir_stream);
703 if (!stream) {
704 status = BT_SELF_COMPONENT_STATUS_ERROR;
705 goto end;
706 }
707
708 if (fs_sink->ignore_discarded_events) {
709 BT_LOGI("Ignoring discarded events message: "
710 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
711 "trace-name=\"%s\", path=\"%s/%s\"",
712 bt_stream_get_id(ir_stream),
713 bt_stream_get_name(ir_stream),
714 bt_trace_get_name(
715 bt_stream_borrow_trace_const(ir_stream)),
716 stream->trace->path->str, stream->file_name->str);
717 goto end;
718 }
719
720 if (stream->discarded_events_state.in_range) {
721 BT_LOGE("Unsupported contiguous discarded events message: "
722 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
723 "trace-name=\"%s\", path=\"%s/%s\"",
724 bt_stream_get_id(ir_stream),
725 bt_stream_get_name(ir_stream),
726 bt_trace_get_name(
727 bt_stream_borrow_trace_const(ir_stream)),
728 stream->trace->path->str, stream->file_name->str);
729 status = BT_SELF_COMPONENT_STATUS_ERROR;
730 goto end;
731 }
732
491c35cc
PP
733 /*
734 * If we're currently in an opened packet (got a packet
735 * beginning message, but no packet end message yet), we do not
736 * support having a discarded events message with a time range
737 * because we require that the discarded events message's time
738 * range go from a packet's end time to the next packet's end
739 * time.
740 */
ffb5c13c
PP
741 if (stream->packet_state.is_open &&
742 stream->sc->discarded_events_has_ts) {
743 BT_LOGE("Unsupported discarded events message with "
744 "default clock snapshots occuring within a packet: "
15fe47e0
PP
745 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
746 "trace-name=\"%s\", path=\"%s/%s\"",
747 bt_stream_get_id(ir_stream),
748 bt_stream_get_name(ir_stream),
749 bt_trace_get_name(
750 bt_stream_borrow_trace_const(ir_stream)),
751 stream->trace->path->str, stream->file_name->str);
752 status = BT_SELF_COMPONENT_STATUS_ERROR;
753 goto end;
754 }
755
ffb5c13c 756 if (stream->sc->discarded_events_has_ts) {
491c35cc
PP
757 /*
758 * Make the stream's state be in the time range of a
759 * discarded events message since we have the message's
760 * time range (`stream->sc->discarded_events_has_ts`).
761 */
ffb5c13c 762 stream->discarded_events_state.in_range = true;
15fe47e0 763
15fe47e0
PP
764 /*
765 * The clock snapshot values will be validated when
491c35cc
PP
766 * handling the next packet beginning and end messages
767 * (next calls to handle_packet_beginning_msg() and
768 * handle_packet_end_msg()).
15fe47e0 769 */
9b24b6aa 770 cs = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
0cbc2c33 771 msg);
15fe47e0
PP
772 BT_ASSERT(cs);
773 stream->discarded_events_state.beginning_cs =
774 bt_clock_snapshot_get_value(cs);
9b24b6aa 775 cs = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(
0cbc2c33 776 msg);
15fe47e0 777 BT_ASSERT(cs);
ffb5c13c 778 stream->discarded_events_state.end_cs = bt_clock_snapshot_get_value(cs);
15fe47e0
PP
779 }
780
781 avail = bt_message_discarded_events_get_count(msg, &count);
782 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
491c35cc
PP
783 /*
784 * There's no specific count of discarded events: set it
785 * to 1 so that we know that we at least discarded
786 * something.
787 */
15fe47e0
PP
788 count = 1;
789 }
790
791 stream->packet_state.discarded_events_counter += count;
792
793end:
794 return status;
795}
796
797static inline
798bt_self_component_status handle_discarded_packets_msg(
799 struct fs_sink_comp *fs_sink, const bt_message *msg)
800{
801 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
802 const bt_stream *ir_stream =
803 bt_message_discarded_packets_borrow_stream_const(msg);
804 struct fs_sink_stream *stream;
805 const bt_clock_snapshot *cs = NULL;
806 bt_property_availability avail;
807 uint64_t count;
808
809 stream = borrow_stream(fs_sink, ir_stream);
810 if (!stream) {
811 status = BT_SELF_COMPONENT_STATUS_ERROR;
812 goto end;
813 }
814
815 if (fs_sink->ignore_discarded_packets) {
816 BT_LOGI("Ignoring discarded packets message: "
817 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
818 "trace-name=\"%s\", path=\"%s/%s\"",
819 bt_stream_get_id(ir_stream),
820 bt_stream_get_name(ir_stream),
821 bt_trace_get_name(
822 bt_stream_borrow_trace_const(ir_stream)),
823 stream->trace->path->str, stream->file_name->str);
824 goto end;
825 }
826
827 if (stream->discarded_packets_state.in_range) {
828 BT_LOGE("Unsupported contiguous discarded packets message: "
829 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
830 "trace-name=\"%s\", path=\"%s/%s\"",
831 bt_stream_get_id(ir_stream),
832 bt_stream_get_name(ir_stream),
833 bt_trace_get_name(
834 bt_stream_borrow_trace_const(ir_stream)),
835 stream->trace->path->str, stream->file_name->str);
836 status = BT_SELF_COMPONENT_STATUS_ERROR;
837 goto end;
838 }
839
491c35cc
PP
840 /*
841 * Discarded packets messages are guaranteed to occur between
842 * packets.
843 */
ffb5c13c 844 BT_ASSERT(!stream->packet_state.is_open);
15fe47e0 845
ffb5c13c 846 if (stream->sc->discarded_packets_has_ts) {
491c35cc
PP
847 /*
848 * Make the stream's state be in the time range of a
849 * discarded packets message since we have the message's
850 * time range (`stream->sc->discarded_packets_has_ts`).
851 */
ffb5c13c 852 stream->discarded_packets_state.in_range = true;
15fe47e0 853
15fe47e0
PP
854 /*
855 * The clock snapshot values will be validated when
491c35cc
PP
856 * handling the next packet beginning message (next call
857 * to handle_packet_beginning_msg()).
15fe47e0 858 */
9b24b6aa 859 cs = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
0cbc2c33 860 msg);
15fe47e0
PP
861 BT_ASSERT(cs);
862 stream->discarded_packets_state.beginning_cs =
863 bt_clock_snapshot_get_value(cs);
9b24b6aa 864 cs = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(
0cbc2c33 865 msg);
15fe47e0
PP
866 BT_ASSERT(cs);
867 stream->discarded_packets_state.end_cs =
868 bt_clock_snapshot_get_value(cs);
15fe47e0
PP
869 }
870
871 avail = bt_message_discarded_packets_get_count(msg, &count);
872 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
491c35cc
PP
873 /*
874 * There's no specific count of discarded packets: set
875 * it to 1 so that we know that we at least discarded
876 * something.
877 */
15fe47e0
PP
878 count = 1;
879 }
880
881 stream->packet_state.seq_num += count;
882
883end:
884 return status;
885}
886
887static inline
888void put_messages(bt_message_array_const msgs, uint64_t count)
889{
890 uint64_t i;
891
892 for (i = 0; i < count; i++) {
893 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
894 }
895}
896
897BT_HIDDEN
898bt_self_component_status ctf_fs_sink_consume(bt_self_component_sink *self_comp)
899{
900 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
901 struct fs_sink_comp *fs_sink;
902 bt_message_iterator_status it_status;
903 uint64_t msg_count = 0;
904 bt_message_array_const msgs;
905
906 fs_sink = bt_self_component_get_data(
907 bt_self_component_sink_as_self_component(self_comp));
908 BT_ASSERT(fs_sink);
909 BT_ASSERT(fs_sink->upstream_iter);
910
911 /* Consume messages */
912 it_status = bt_self_component_port_input_message_iterator_next(
913 fs_sink->upstream_iter, &msgs, &msg_count);
914 if (it_status < 0) {
915 status = BT_SELF_COMPONENT_STATUS_ERROR;
916 goto end;
917 }
918
919 switch (it_status) {
920 case BT_MESSAGE_ITERATOR_STATUS_OK:
921 {
922 uint64_t i;
923
924 for (i = 0; i < msg_count; i++) {
925 const bt_message *msg = msgs[i];
926
927 BT_ASSERT(msg);
928
929 switch (bt_message_get_type(msg)) {
930 case BT_MESSAGE_TYPE_EVENT:
931 status = handle_event_msg(fs_sink, msg);
932 break;
933 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
934 status = handle_packet_beginning_msg(
935 fs_sink, msg);
936 break;
937 case BT_MESSAGE_TYPE_PACKET_END:
938 status = handle_packet_end_msg(
939 fs_sink, msg);
940 break;
941 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
942 /* Ignore */
943 BT_LOGD_STR("Ignoring message iterator inactivity message.");
944 break;
945 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
946 status = handle_stream_beginning_msg(
947 fs_sink, msg);
948 break;
949 case BT_MESSAGE_TYPE_STREAM_END:
950 status = handle_stream_end_msg(
951 fs_sink, msg);
952 break;
953 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
954 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
955 /* Not supported by CTF 1.8 */
956 BT_LOGD_STR("Ignoring stream activity message.");
957 break;
958 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
959 status = handle_discarded_events_msg(
960 fs_sink, msg);
961 break;
962 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
963 status = handle_discarded_packets_msg(
964 fs_sink, msg);
965 break;
966 default:
967 abort();
968 }
969
970 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
971
972 if (status != BT_SELF_COMPONENT_STATUS_OK) {
973 BT_LOGE("Failed to handle message: "
974 "generated CTF traces could be incomplete: "
975 "output-dir-path=\"%s\"",
976 fs_sink->output_dir_path->str);
977 goto error;
978 }
979 }
980
981 break;
982 }
983 case BT_MESSAGE_ITERATOR_STATUS_AGAIN:
984 status = BT_SELF_COMPONENT_STATUS_AGAIN;
985 break;
986 case BT_MESSAGE_ITERATOR_STATUS_END:
987 /* TODO: Finalize all traces (should already be done?) */
988 status = BT_SELF_COMPONENT_STATUS_END;
989 break;
990 case BT_MESSAGE_ITERATOR_STATUS_NOMEM:
991 status = BT_SELF_COMPONENT_STATUS_NOMEM;
992 break;
993 case BT_MESSAGE_ITERATOR_STATUS_ERROR:
994 status = BT_SELF_COMPONENT_STATUS_NOMEM;
995 break;
996 default:
997 break;
998 }
999
1000 goto end;
1001
1002error:
1003 BT_ASSERT(status != BT_SELF_COMPONENT_STATUS_OK);
1004 put_messages(msgs, msg_count);
1005
1006end:
1007 return status;
1008}
1009
1010BT_HIDDEN
1011bt_self_component_status ctf_fs_sink_graph_is_configured(
1012 bt_self_component_sink *self_comp)
1013{
1014 bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
1015 struct fs_sink_comp *fs_sink = bt_self_component_get_data(
1016 bt_self_component_sink_as_self_component(self_comp));
1017
1018 fs_sink->upstream_iter =
1019 bt_self_component_port_input_message_iterator_create(
1020 bt_self_component_sink_borrow_input_port_by_name(
1021 self_comp, in_port_name));
1022 if (!fs_sink->upstream_iter) {
1023 status = BT_SELF_COMPONENT_STATUS_NOMEM;
1024 goto end;
1025 }
1026
1027end:
1028 return status;
1029}
1030
1031BT_HIDDEN
1032void ctf_fs_sink_finalize(bt_self_component_sink *self_comp)
1033{
1034 struct fs_sink_comp *fs_sink = bt_self_component_get_data(
1035 bt_self_component_sink_as_self_component(self_comp));
1036
1037 destroy_fs_sink_comp(fs_sink);
1038}
This page took 0.087021 seconds and 4 git commands to generate.