compat: fix compilation with !BABELTRACE_HAVE_OPEN_MEMSTREAM
[babeltrace.git] / src / plugins / ctf / fs-sink / fs-sink.c
CommitLineData
15fe47e0 1/*
0235b0db 2 * SPDX-License-Identifier: MIT
15fe47e0 3 *
0235b0db 4 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
15fe47e0
PP
5 */
6
aa1a7452 7#define BT_COMP_LOG_SELF_COMP (fs_sink->self_comp)
ffa3b2b3 8#define BT_LOG_OUTPUT_LEVEL (fs_sink->log_level)
350ad6c1 9#define BT_LOG_TAG "PLUGIN/SINK.CTF.FS"
d9c39b0a 10#include "logging/comp-logging.h"
15fe47e0 11
3fadfbc0 12#include <babeltrace2/babeltrace.h>
15fe47e0
PP
13#include <stdio.h>
14#include <stdbool.h>
15#include <glib.h>
578e048b
MJ
16#include "common/assert.h"
17#include "ctfser/ctfser.h"
ce67f561 18#include "plugins/common/param-validation/param-validation.h"
15fe47e0
PP
19
20#include "fs-sink.h"
21#include "fs-sink-trace.h"
22#include "fs-sink-stream.h"
23#include "fs-sink-ctf-meta.h"
24#include "translate-trace-ir-to-ctf-ir.h"
25#include "translate-ctf-ir-to-tsdl.h"
26
27static
28const char * const in_port_name = "in";
29
30static
21a9f056 31bt_component_class_initialize_method_status ensure_output_dir_exists(
15fe47e0
PP
32 struct fs_sink_comp *fs_sink)
33{
21a9f056
FD
34 bt_component_class_initialize_method_status status =
35 BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
15fe47e0
PP
36 int ret;
37
38 ret = g_mkdir_with_parents(fs_sink->output_dir_path->str, 0755);
39 if (ret) {
05ad2afe 40 BT_COMP_LOGE_APPEND_CAUSE_ERRNO(fs_sink->self_comp,
aa1a7452 41 "Cannot create directories for output directory",
15fe47e0
PP
42 ": output-dir-path=\"%s\"",
43 fs_sink->output_dir_path->str);
21a9f056 44 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
15fe47e0
PP
45 goto end;
46 }
47
48end:
49 return status;
50}
51
ce67f561
SM
52static struct bt_param_validation_map_value_entry_descr fs_sink_params_descr[] = {
53 { "path", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY, { .type = BT_VALUE_TYPE_STRING } },
54 { "assume-single-trace", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { .type = BT_VALUE_TYPE_BOOL } },
55 { "ignore-discarded-events", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { .type = BT_VALUE_TYPE_BOOL } },
56 { "ignore-discarded-packets", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { .type = BT_VALUE_TYPE_BOOL } },
57 { "quiet", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { .type = BT_VALUE_TYPE_BOOL } },
58 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END
59};
60
15fe47e0 61static
21a9f056 62bt_component_class_initialize_method_status
ce67f561 63configure_component(struct fs_sink_comp *fs_sink, const bt_value *params)
15fe47e0 64{
ce67f561 65 bt_component_class_initialize_method_status status;
15fe47e0 66 const bt_value *value;
ce67f561
SM
67 enum bt_param_validation_status validation_status;
68 gchar *validation_error;
15fe47e0 69
ce67f561
SM
70 validation_status = bt_param_validation_validate(params,
71 fs_sink_params_descr, &validation_error);
72 if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
21a9f056 73 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
05ad2afe
SM
74 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
75 "%s", validation_error);
15fe47e0 76 goto end;
ce67f561
SM
77 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
78 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
15fe47e0
PP
79 goto end;
80 }
81
ce67f561 82 value = bt_value_map_borrow_entry_value_const(params, "path");
15fe47e0
PP
83 g_string_assign(fs_sink->output_dir_path,
84 bt_value_string_get(value));
ce67f561 85
15fe47e0
PP
86 value = bt_value_map_borrow_entry_value_const(params,
87 "assume-single-trace");
88 if (value) {
15fe47e0
PP
89 fs_sink->assume_single_trace = (bool) bt_value_bool_get(value);
90 }
91
92 value = bt_value_map_borrow_entry_value_const(params,
93 "ignore-discarded-events");
94 if (value) {
15fe47e0
PP
95 fs_sink->ignore_discarded_events =
96 (bool) bt_value_bool_get(value);
97 }
98
99 value = bt_value_map_borrow_entry_value_const(params,
100 "ignore-discarded-packets");
101 if (value) {
15fe47e0
PP
102 fs_sink->ignore_discarded_packets =
103 (bool) bt_value_bool_get(value);
104 }
105
106 value = bt_value_map_borrow_entry_value_const(params,
107 "quiet");
108 if (value) {
15fe47e0
PP
109 fs_sink->quiet = (bool) bt_value_bool_get(value);
110 }
111
ce67f561
SM
112 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
113
15fe47e0 114end:
ce67f561 115 g_free(validation_error);
15fe47e0
PP
116 return status;
117}
118
119static
120void destroy_fs_sink_comp(struct fs_sink_comp *fs_sink)
121{
122 if (!fs_sink) {
123 goto end;
124 }
125
126 if (fs_sink->output_dir_path) {
127 g_string_free(fs_sink->output_dir_path, TRUE);
128 fs_sink->output_dir_path = NULL;
129 }
130
131 if (fs_sink->traces) {
132 g_hash_table_destroy(fs_sink->traces);
133 fs_sink->traces = NULL;
134 }
135
9a2c8b8e 136 BT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(
15fe47e0
PP
137 fs_sink->upstream_iter);
138 g_free(fs_sink);
139
140end:
141 return;
142}
143
144BT_HIDDEN
21a9f056 145bt_component_class_initialize_method_status ctf_fs_sink_init(
59225a3e
SM
146 bt_self_component_sink *self_comp_sink,
147 bt_self_component_sink_configuration *config,
148 const bt_value *params,
15fe47e0
PP
149 void *init_method_data)
150{
05ad2afe 151 bt_component_class_initialize_method_status status;
d24d5663 152 bt_self_component_add_port_status add_port_status;
15fe47e0 153 struct fs_sink_comp *fs_sink = NULL;
ffa3b2b3
PP
154 bt_self_component *self_comp =
155 bt_self_component_sink_as_self_component(self_comp_sink);
156 bt_logging_level log_level = bt_component_get_logging_level(
157 bt_self_component_as_component(self_comp));
15fe47e0
PP
158
159 fs_sink = g_new0(struct fs_sink_comp, 1);
160 if (!fs_sink) {
aa1a7452 161 BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR, log_level, self_comp,
ffa3b2b3 162 "Failed to allocate one CTF FS sink structure.");
05ad2afe
SM
163 BT_CURRENT_THREAD_ERROR_APPEND_CAUSE_FROM_COMPONENT(
164 self_comp, "Failed to allocate one CTF FS sink structure.");
21a9f056 165 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
15fe47e0
PP
166 goto end;
167 }
168
ffa3b2b3 169 fs_sink->log_level = log_level;
aa1a7452 170 fs_sink->self_comp = self_comp;
15fe47e0 171 fs_sink->output_dir_path = g_string_new(NULL);
15fe47e0 172 status = configure_component(fs_sink, params);
21a9f056 173 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
15fe47e0
PP
174 /* configure_component() logs errors */
175 goto end;
176 }
177
178 if (fs_sink->assume_single_trace &&
179 g_file_test(fs_sink->output_dir_path->str,
180 G_FILE_TEST_EXISTS)) {
05ad2afe
SM
181 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
182 "Single trace mode, but output path exists: output-path=\"%s\"",
183 fs_sink->output_dir_path->str);
21a9f056 184 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
15fe47e0
PP
185 goto end;
186 }
187
188 status = ensure_output_dir_exists(fs_sink);
21a9f056 189 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
15fe47e0
PP
190 /* ensure_output_dir_exists() logs errors */
191 goto end;
192 }
193
194 fs_sink->traces = g_hash_table_new_full(g_direct_hash, g_direct_equal,
195 NULL, (GDestroyNotify) fs_sink_trace_destroy);
196 if (!fs_sink->traces) {
05ad2afe 197 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate one GHashTable.");
21a9f056 198 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
15fe47e0
PP
199 goto end;
200 }
201
d24d5663
PP
202 add_port_status = bt_self_component_sink_add_input_port(
203 self_comp_sink, in_port_name, NULL, NULL);
05ad2afe
SM
204 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
205 status = (int) add_port_status;
206 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to add input port.");
15fe47e0
PP
207 goto end;
208 }
209
ffa3b2b3 210 bt_self_component_set_data(self_comp, fs_sink);
15fe47e0
PP
211
212end:
21a9f056 213 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
15fe47e0
PP
214 destroy_fs_sink_comp(fs_sink);
215 }
216
217 return status;
218}
219
220static inline
221struct fs_sink_stream *borrow_stream(struct fs_sink_comp *fs_sink,
222 const bt_stream *ir_stream)
223{
224 const bt_trace *ir_trace = bt_stream_borrow_trace_const(ir_stream);
225 struct fs_sink_trace *trace;
226 struct fs_sink_stream *stream = NULL;
227
228 trace = g_hash_table_lookup(fs_sink->traces, ir_trace);
91d81473 229 if (G_UNLIKELY(!trace)) {
15fe47e0
PP
230 if (fs_sink->assume_single_trace &&
231 g_hash_table_size(fs_sink->traces) > 0) {
05ad2afe
SM
232 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
233 "Single trace mode, but getting more than one trace: "
15fe47e0
PP
234 "stream-name=\"%s\"",
235 bt_stream_get_name(ir_stream));
236 goto end;
237 }
238
239 trace = fs_sink_trace_create(fs_sink, ir_trace);
240 if (!trace) {
241 goto end;
242 }
243 }
244
245 stream = g_hash_table_lookup(trace->streams, ir_stream);
91d81473 246 if (G_UNLIKELY(!stream)) {
15fe47e0
PP
247 stream = fs_sink_stream_create(trace, ir_stream);
248 if (!stream) {
249 goto end;
250 }
251 }
252
253end:
254 return stream;
255}
256
257static inline
d24d5663
PP
258bt_component_class_sink_consume_method_status handle_event_msg(
259 struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0
PP
260{
261 int ret;
d24d5663
PP
262 bt_component_class_sink_consume_method_status status =
263 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
15fe47e0
PP
264 const bt_event *ir_event = bt_message_event_borrow_event_const(msg);
265 const bt_stream *ir_stream = bt_event_borrow_stream_const(ir_event);
266 struct fs_sink_stream *stream;
267 struct fs_sink_ctf_event_class *ec = NULL;
268 const bt_clock_snapshot *cs = NULL;
269
270 stream = borrow_stream(fs_sink, ir_stream);
91d81473 271 if (G_UNLIKELY(!stream)) {
05ad2afe
SM
272 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
273 "Failed to borrow stream.");
d24d5663 274 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
15fe47e0
PP
275 goto end;
276 }
277
aa1a7452
PP
278 ret = try_translate_event_class_trace_ir_to_ctf_ir(fs_sink,
279 stream->sc, bt_event_borrow_class_const(ir_event), &ec);
15fe47e0 280 if (ret) {
05ad2afe
SM
281 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
282 "Failed to translate event class to CTF IR.");
d24d5663 283 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
15fe47e0
PP
284 goto end;
285 }
286
98b15851 287 BT_ASSERT_DBG(ec);
15fe47e0
PP
288
289 if (stream->sc->default_clock_class) {
0cbc2c33
PP
290 cs = bt_message_event_borrow_default_clock_snapshot_const(
291 msg);
15fe47e0
PP
292 }
293
26fc5aed
PP
294 /*
295 * If this event's stream does not support packets, then we
296 * lazily create artificial packets.
297 *
298 * The size of an artificial packet is arbitrarily at least
299 * 4 MiB (it usually is greater because we close it when
300 * comes the time to write a new event and the packet's content
301 * size is >= 4 MiB), except the last one which can be smaller.
302 */
303 if (G_UNLIKELY(!stream->sc->has_packets)) {
304 if (stream->packet_state.is_open &&
305 bt_ctfser_get_offset_in_current_packet_bits(&stream->ctfser) / 8 >=
306 4 * 1024 * 1024) {
307 /*
308 * Stream's current packet is larger than 4 MiB:
309 * close it. A new packet will be opened just
310 * below.
311 */
312 ret = fs_sink_stream_close_packet(stream, NULL);
313 if (ret) {
05ad2afe
SM
314 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
315 "Failed to close packet.");
26fc5aed
PP
316 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
317 goto end;
318 }
319 }
320
321 if (!stream->packet_state.is_open) {
322 /* Stream's packet is not currently opened: open it */
323 ret = fs_sink_stream_open_packet(stream, NULL, NULL);
324 if (ret) {
05ad2afe
SM
325 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
326 "Failed to open packet.");
26fc5aed
PP
327 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
328 goto end;
329 }
330 }
331 }
332
98b15851 333 BT_ASSERT_DBG(stream->packet_state.is_open);
15fe47e0 334 ret = fs_sink_stream_write_event(stream, cs, ir_event, ec);
91d81473 335 if (G_UNLIKELY(ret)) {
05ad2afe
SM
336 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
337 "Failed to write event.");
d24d5663 338 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
15fe47e0
PP
339 goto end;
340 }
341
342end:
343 return status;
344}
345
346static inline
d24d5663 347bt_component_class_sink_consume_method_status handle_packet_beginning_msg(
15fe47e0
PP
348 struct fs_sink_comp *fs_sink, const bt_message *msg)
349{
350 int ret;
d24d5663
PP
351 bt_component_class_sink_consume_method_status status =
352 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
15fe47e0
PP
353 const bt_packet *ir_packet =
354 bt_message_packet_beginning_borrow_packet_const(msg);
355 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
356 struct fs_sink_stream *stream;
357 const bt_clock_snapshot *cs = NULL;
358
359 stream = borrow_stream(fs_sink, ir_stream);
91d81473 360 if (G_UNLIKELY(!stream)) {
05ad2afe
SM
361 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
362 "Failed to borrow stream.");
d24d5663 363 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
15fe47e0
PP
364 goto end;
365 }
366
ffb5c13c 367 if (stream->sc->packets_have_ts_begin) {
0cbc2c33
PP
368 cs = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
369 msg);
15fe47e0
PP
370 BT_ASSERT(cs);
371 }
372
491c35cc
PP
373 /*
374 * If we previously received a discarded events message with
375 * a time range, make sure that its beginning time matches what's
376 * expected for CTF 1.8, that is:
377 *
378 * * Its beginning time is the previous packet's end
379 * time (or the current packet's beginning time if
380 * this is the first packet).
381 *
382 * We check this here instead of in handle_packet_end_msg()
383 * because we want to catch any incompatible message as early as
384 * possible to report the error.
385 *
386 * Validation of the discarded events message's end time is
387 * performed in handle_packet_end_msg().
388 */
15fe47e0 389 if (stream->discarded_events_state.in_range) {
ffb5c13c
PP
390 uint64_t expected_cs;
391
491c35cc
PP
392 /*
393 * `stream->discarded_events_state.in_range` is only set
394 * when the stream class's discarded events have a time
395 * range.
396 *
397 * It is required that the packet beginning and end
398 * messages for this stream class have times when
399 * discarded events have a time range.
400 */
ffb5c13c
PP
401 BT_ASSERT(stream->sc->discarded_events_has_ts);
402 BT_ASSERT(stream->sc->packets_have_ts_begin);
403 BT_ASSERT(stream->sc->packets_have_ts_end);
404
ffb5c13c
PP
405 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
406 /* We're opening the first packet */
407 expected_cs = bt_clock_snapshot_get_value(cs);
408 } else {
409 expected_cs = stream->prev_packet_state.end_cs;
15fe47e0 410 }
15fe47e0 411
ffb5c13c
PP
412 if (stream->discarded_events_state.beginning_cs !=
413 expected_cs) {
05ad2afe
SM
414 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
415 "Incompatible discarded events message: "
ffb5c13c
PP
416 "unexpected beginning time: "
417 "beginning-cs-val=%" PRIu64 ", "
418 "expected-beginning-cs-val=%" PRIu64 ", "
15fe47e0
PP
419 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
420 "trace-name=\"%s\", path=\"%s/%s\"",
ffb5c13c
PP
421 stream->discarded_events_state.beginning_cs,
422 expected_cs,
15fe47e0
PP
423 bt_stream_get_id(ir_stream),
424 bt_stream_get_name(ir_stream),
425 bt_trace_get_name(
426 bt_stream_borrow_trace_const(ir_stream)),
427 stream->trace->path->str, stream->file_name->str);
d24d5663 428 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
15fe47e0
PP
429 goto end;
430 }
ffb5c13c
PP
431 }
432
491c35cc
PP
433 /*
434 * If we previously received a discarded packets message with a
435 * time range, make sure that its beginning and end times match
436 * what's expected for CTF 1.8, that is:
437 *
438 * * Its beginning time is the previous packet's end time.
439 *
440 * * Its end time is the current packet's beginning time.
441 */
ffb5c13c
PP
442 if (stream->discarded_packets_state.in_range) {
443 uint64_t expected_end_cs;
444
491c35cc
PP
445 /*
446 * `stream->discarded_packets_state.in_range` is only
447 * set when the stream class's discarded packets have a
448 * time range.
449 *
450 * It is required that the packet beginning and end
451 * messages for this stream class have times when
452 * discarded packets have a time range.
453 */
ffb5c13c
PP
454 BT_ASSERT(stream->sc->discarded_packets_has_ts);
455 BT_ASSERT(stream->sc->packets_have_ts_begin);
456 BT_ASSERT(stream->sc->packets_have_ts_end);
15fe47e0
PP
457
458 /*
491c35cc
PP
459 * It is not supported to have a discarded packets
460 * message _before_ the first packet: we cannot validate
461 * that its beginning time is compatible with CTF 1.8 in
462 * this case.
15fe47e0 463 */
ffb5c13c 464 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
05ad2afe
SM
465 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
466 "Incompatible discarded packets message "
118ae153 467 "occurring before the stream's first packet: "
ffb5c13c
PP
468 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
469 "trace-name=\"%s\", path=\"%s/%s\"",
470 bt_stream_get_id(ir_stream),
471 bt_stream_get_name(ir_stream),
472 bt_trace_get_name(
473 bt_stream_borrow_trace_const(ir_stream)),
474 stream->trace->path->str, stream->file_name->str);
d24d5663 475 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
ffb5c13c
PP
476 goto end;
477 }
478
479 if (stream->discarded_packets_state.beginning_cs !=
480 stream->prev_packet_state.end_cs) {
05ad2afe
SM
481 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
482 "Incompatible discarded packets message: "
ffb5c13c
PP
483 "unexpected beginning time: "
484 "beginning-cs-val=%" PRIu64 ", "
485 "expected-beginning-cs-val=%" PRIu64 ", "
486 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
487 "trace-name=\"%s\", path=\"%s/%s\"",
488 stream->discarded_packets_state.beginning_cs,
489 stream->prev_packet_state.end_cs,
490 bt_stream_get_id(ir_stream),
491 bt_stream_get_name(ir_stream),
492 bt_trace_get_name(
493 bt_stream_borrow_trace_const(ir_stream)),
494 stream->trace->path->str, stream->file_name->str);
d24d5663 495 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
ffb5c13c 496 goto end;
15fe47e0 497 }
ffb5c13c
PP
498
499 expected_end_cs = bt_clock_snapshot_get_value(cs);
500
501 if (stream->discarded_packets_state.end_cs !=
502 expected_end_cs) {
05ad2afe
SM
503 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
504 "Incompatible discarded packets message: "
ffb5c13c
PP
505 "unexpected end time: "
506 "end-cs-val=%" PRIu64 ", "
507 "expected-end-cs-val=%" PRIu64 ", "
508 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
509 "trace-name=\"%s\", path=\"%s/%s\"",
510 stream->discarded_packets_state.end_cs,
511 expected_end_cs,
512 bt_stream_get_id(ir_stream),
513 bt_stream_get_name(ir_stream),
514 bt_trace_get_name(
515 bt_stream_borrow_trace_const(ir_stream)),
516 stream->trace->path->str, stream->file_name->str);
d24d5663 517 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
ffb5c13c
PP
518 goto end;
519 }
520 }
521
491c35cc
PP
522 /*
523 * We're not in a discarded packets time range anymore since we
524 * require that the discarded packets time ranges go from one
525 * packet's end time to the next packet's beginning time, and
526 * we're handling a packet beginning message here.
527 */
528 stream->discarded_packets_state.in_range = false;
15fe47e0 529
15fe47e0
PP
530 ret = fs_sink_stream_open_packet(stream, cs, ir_packet);
531 if (ret) {
05ad2afe
SM
532 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
533 "Failed to open packet.");
d24d5663 534 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
15fe47e0
PP
535 goto end;
536 }
537
538end:
539 return status;
540}
541
542static inline
d24d5663 543bt_component_class_sink_consume_method_status handle_packet_end_msg(
15fe47e0
PP
544 struct fs_sink_comp *fs_sink, const bt_message *msg)
545{
546 int ret;
d24d5663
PP
547 bt_component_class_sink_consume_method_status status =
548 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
15fe47e0
PP
549 const bt_packet *ir_packet =
550 bt_message_packet_end_borrow_packet_const(msg);
551 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
552 struct fs_sink_stream *stream;
553 const bt_clock_snapshot *cs = NULL;
554
555 stream = borrow_stream(fs_sink, ir_stream);
91d81473 556 if (G_UNLIKELY(!stream)) {
05ad2afe
SM
557 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
558 "Failed to borrow stream.");
d24d5663 559 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
15fe47e0
PP
560 goto end;
561 }
562
ffb5c13c 563 if (stream->sc->packets_have_ts_end) {
0cbc2c33
PP
564 cs = bt_message_packet_end_borrow_default_clock_snapshot_const(
565 msg);
15fe47e0
PP
566 BT_ASSERT(cs);
567 }
568
491c35cc
PP
569 /*
570 * If we previously received a discarded events message with
571 * a time range, make sure that its end time matches what's
572 * expected for CTF 1.8, that is:
573 *
574 * * Its end time is the current packet's end time.
575 *
576 * Validation of the discarded events message's beginning time
577 * is performed in handle_packet_beginning_msg().
578 */
15fe47e0 579 if (stream->discarded_events_state.in_range) {
ffb5c13c
PP
580 uint64_t expected_cs;
581
491c35cc
PP
582 /*
583 * `stream->discarded_events_state.in_range` is only set
584 * when the stream class's discarded events have a time
585 * range.
586 *
587 * It is required that the packet beginning and end
588 * messages for this stream class have times when
589 * discarded events have a time range.
590 */
ffb5c13c
PP
591 BT_ASSERT(stream->sc->discarded_events_has_ts);
592 BT_ASSERT(stream->sc->packets_have_ts_begin);
593 BT_ASSERT(stream->sc->packets_have_ts_end);
594
ffb5c13c
PP
595 expected_cs = bt_clock_snapshot_get_value(cs);
596
597 if (stream->discarded_events_state.end_cs != expected_cs) {
05ad2afe
SM
598 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
599 "Incompatible discarded events message: "
ffb5c13c
PP
600 "unexpected end time: "
601 "end-cs-val=%" PRIu64 ", "
602 "expected-end-cs-val=%" PRIu64 ", "
603 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
604 "trace-name=\"%s\", path=\"%s/%s\"",
605 stream->discarded_events_state.end_cs,
606 expected_cs,
607 bt_stream_get_id(ir_stream),
608 bt_stream_get_name(ir_stream),
609 bt_trace_get_name(
610 bt_stream_borrow_trace_const(ir_stream)),
611 stream->trace->path->str, stream->file_name->str);
d24d5663 612 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
ffb5c13c 613 goto end;
15fe47e0
PP
614 }
615 }
616
617 ret = fs_sink_stream_close_packet(stream, cs);
618 if (ret) {
05ad2afe
SM
619 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
620 "Failed to close packet.");
d24d5663 621 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
15fe47e0
PP
622 goto end;
623 }
624
491c35cc
PP
625 /*
626 * We're not in a discarded events time range anymore since we
627 * require that the discarded events time ranges go from one
628 * packet's end time to the next packet's end time, and we're
629 * handling a packet end message here.
630 */
631 stream->discarded_events_state.in_range = false;
15fe47e0
PP
632
633end:
634 return status;
635}
636
637static inline
d24d5663 638bt_component_class_sink_consume_method_status handle_stream_beginning_msg(
15fe47e0
PP
639 struct fs_sink_comp *fs_sink, const bt_message *msg)
640{
d24d5663
PP
641 bt_component_class_sink_consume_method_status status =
642 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
15fe47e0
PP
643 const bt_stream *ir_stream =
644 bt_message_stream_beginning_borrow_stream_const(msg);
649934d2
PP
645 const bt_stream_class *ir_sc =
646 bt_stream_borrow_class_const(ir_stream);
15fe47e0 647 struct fs_sink_stream *stream;
ffb5c13c 648 bool packets_have_beginning_end_cs =
9b24b6aa
PP
649 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc) &&
650 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc);
15fe47e0 651
26fc5aed
PP
652 /*
653 * Not supported: discarded events or discarded packets support
654 * without packets support. Packets are the way to know where
118ae153 655 * discarded events/packets occurred in CTF 1.8.
26fc5aed
PP
656 */
657 if (!bt_stream_class_supports_packets(ir_sc)) {
658 BT_ASSERT(!bt_stream_class_supports_discarded_packets(ir_sc));
659
660 if (!fs_sink->ignore_discarded_events &&
661 bt_stream_class_supports_discarded_events(ir_sc)) {
05ad2afe
SM
662 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
663 "Unsupported stream: "
26fc5aed
PP
664 "stream does not support packets, "
665 "but supports discarded events: "
666 "stream-addr=%p, "
667 "stream-id=%" PRIu64 ", "
668 "stream-name=\"%s\"",
669 ir_stream, bt_stream_get_id(ir_stream),
670 bt_stream_get_name(ir_stream));
671 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
672 goto end;
673 }
674 }
675
649934d2 676 /*
ffb5c13c
PP
677 * Not supported: discarded events with default clock snapshots,
678 * but packet beginning/end without default clock snapshot.
649934d2 679 */
ffb5c13c
PP
680 if (!fs_sink->ignore_discarded_events &&
681 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc) &&
682 !packets_have_beginning_end_cs) {
05ad2afe
SM
683 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
684 "Unsupported stream: discarded events have "
ffb5c13c
PP
685 "default clock snapshots, but packets have no "
686 "beginning and/or end default clock snapshots: "
687 "stream-addr=%p, "
688 "stream-id=%" PRIu64 ", "
689 "stream-name=\"%s\"",
690 ir_stream, bt_stream_get_id(ir_stream),
691 bt_stream_get_name(ir_stream));
d24d5663 692 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
ffb5c13c
PP
693 goto end;
694 }
2e90378a 695
ffb5c13c
PP
696 /*
697 * Not supported: discarded packets with default clock
698 * snapshots, but packet beginning/end without default clock
699 * snapshot.
700 */
701 if (!fs_sink->ignore_discarded_packets &&
702 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc) &&
703 !packets_have_beginning_end_cs) {
05ad2afe
SM
704 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
705 "Unsupported stream: discarded packets have "
ffb5c13c
PP
706 "default clock snapshots, but packets have no "
707 "beginning and/or end default clock snapshots: "
708 "stream-addr=%p, "
709 "stream-id=%" PRIu64 ", "
710 "stream-name=\"%s\"",
711 ir_stream, bt_stream_get_id(ir_stream),
712 bt_stream_get_name(ir_stream));
d24d5663 713 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
ffb5c13c 714 goto end;
649934d2
PP
715 }
716
15fe47e0
PP
717 stream = borrow_stream(fs_sink, ir_stream);
718 if (!stream) {
05ad2afe
SM
719 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
720 "Failed to borrow stream.");
d24d5663 721 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
15fe47e0
PP
722 goto end;
723 }
724
aa1a7452 725 BT_COMP_LOGI("Created new, empty stream file: "
15fe47e0
PP
726 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
727 "trace-name=\"%s\", path=\"%s/%s\"",
728 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
729 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
730 stream->trace->path->str, stream->file_name->str);
731
732end:
733 return status;
734}
735
736static inline
d24d5663
PP
737bt_component_class_sink_consume_method_status handle_stream_end_msg(
738 struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 739{
d24d5663
PP
740 bt_component_class_sink_consume_method_status status =
741 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
15fe47e0
PP
742 const bt_stream *ir_stream =
743 bt_message_stream_end_borrow_stream_const(msg);
744 struct fs_sink_stream *stream;
745
746 stream = borrow_stream(fs_sink, ir_stream);
747 if (!stream) {
05ad2afe
SM
748 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
749 "Failed to borrow stream.");
d24d5663 750 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
15fe47e0
PP
751 goto end;
752 }
753
26fc5aed
PP
754 if (G_UNLIKELY(!stream->sc->has_packets &&
755 stream->packet_state.is_open)) {
756 /* Close stream's current artificial packet */
757 int ret = fs_sink_stream_close_packet(stream, NULL);
758
759 if (ret) {
05ad2afe
SM
760 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
761 "Failed to close packet.");
26fc5aed
PP
762 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
763 goto end;
764 }
765 }
766
aa1a7452 767 BT_COMP_LOGI("Closing stream file: "
15fe47e0
PP
768 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
769 "trace-name=\"%s\", path=\"%s/%s\"",
770 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
771 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
772 stream->trace->path->str, stream->file_name->str);
773
774 /*
775 * This destroys the stream object and frees all its resources,
776 * closing the stream file.
777 */
778 g_hash_table_remove(stream->trace->streams, ir_stream);
779
780end:
781 return status;
782}
783
784static inline
d24d5663 785bt_component_class_sink_consume_method_status handle_discarded_events_msg(
15fe47e0
PP
786 struct fs_sink_comp *fs_sink, const bt_message *msg)
787{
d24d5663
PP
788 bt_component_class_sink_consume_method_status status =
789 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
15fe47e0
PP
790 const bt_stream *ir_stream =
791 bt_message_discarded_events_borrow_stream_const(msg);
792 struct fs_sink_stream *stream;
793 const bt_clock_snapshot *cs = NULL;
794 bt_property_availability avail;
795 uint64_t count;
796
797 stream = borrow_stream(fs_sink, ir_stream);
798 if (!stream) {
05ad2afe
SM
799 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
800 "Failed to borrow stream.");
d24d5663 801 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
15fe47e0
PP
802 goto end;
803 }
804
805 if (fs_sink->ignore_discarded_events) {
aa1a7452 806 BT_COMP_LOGI("Ignoring discarded events message: "
15fe47e0
PP
807 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
808 "trace-name=\"%s\", path=\"%s/%s\"",
809 bt_stream_get_id(ir_stream),
810 bt_stream_get_name(ir_stream),
811 bt_trace_get_name(
812 bt_stream_borrow_trace_const(ir_stream)),
813 stream->trace->path->str, stream->file_name->str);
814 goto end;
815 }
816
817 if (stream->discarded_events_state.in_range) {
05ad2afe
SM
818 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
819 "Unsupported contiguous discarded events message: "
15fe47e0
PP
820 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
821 "trace-name=\"%s\", path=\"%s/%s\"",
822 bt_stream_get_id(ir_stream),
823 bt_stream_get_name(ir_stream),
824 bt_trace_get_name(
825 bt_stream_borrow_trace_const(ir_stream)),
826 stream->trace->path->str, stream->file_name->str);
d24d5663 827 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
15fe47e0
PP
828 goto end;
829 }
830
491c35cc
PP
831 /*
832 * If we're currently in an opened packet (got a packet
833 * beginning message, but no packet end message yet), we do not
834 * support having a discarded events message with a time range
835 * because we require that the discarded events message's time
836 * range go from a packet's end time to the next packet's end
837 * time.
838 */
ffb5c13c
PP
839 if (stream->packet_state.is_open &&
840 stream->sc->discarded_events_has_ts) {
05ad2afe
SM
841 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
842 "Unsupported discarded events message with "
118ae153 843 "default clock snapshots occurring within a packet: "
15fe47e0
PP
844 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
845 "trace-name=\"%s\", path=\"%s/%s\"",
846 bt_stream_get_id(ir_stream),
847 bt_stream_get_name(ir_stream),
848 bt_trace_get_name(
849 bt_stream_borrow_trace_const(ir_stream)),
850 stream->trace->path->str, stream->file_name->str);
d24d5663 851 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
15fe47e0
PP
852 goto end;
853 }
854
ffb5c13c 855 if (stream->sc->discarded_events_has_ts) {
491c35cc
PP
856 /*
857 * Make the stream's state be in the time range of a
858 * discarded events message since we have the message's
859 * time range (`stream->sc->discarded_events_has_ts`).
860 */
ffb5c13c 861 stream->discarded_events_state.in_range = true;
15fe47e0 862
15fe47e0
PP
863 /*
864 * The clock snapshot values will be validated when
491c35cc
PP
865 * handling the next packet beginning and end messages
866 * (next calls to handle_packet_beginning_msg() and
867 * handle_packet_end_msg()).
15fe47e0 868 */
9b24b6aa 869 cs = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
0cbc2c33 870 msg);
15fe47e0
PP
871 BT_ASSERT(cs);
872 stream->discarded_events_state.beginning_cs =
873 bt_clock_snapshot_get_value(cs);
9b24b6aa 874 cs = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(
0cbc2c33 875 msg);
15fe47e0 876 BT_ASSERT(cs);
ffb5c13c 877 stream->discarded_events_state.end_cs = bt_clock_snapshot_get_value(cs);
15fe47e0
PP
878 }
879
880 avail = bt_message_discarded_events_get_count(msg, &count);
881 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
491c35cc
PP
882 /*
883 * There's no specific count of discarded events: set it
884 * to 1 so that we know that we at least discarded
885 * something.
886 */
15fe47e0
PP
887 count = 1;
888 }
889
890 stream->packet_state.discarded_events_counter += count;
891
892end:
893 return status;
894}
895
896static inline
d24d5663 897bt_component_class_sink_consume_method_status handle_discarded_packets_msg(
15fe47e0
PP
898 struct fs_sink_comp *fs_sink, const bt_message *msg)
899{
d24d5663
PP
900 bt_component_class_sink_consume_method_status status =
901 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
15fe47e0
PP
902 const bt_stream *ir_stream =
903 bt_message_discarded_packets_borrow_stream_const(msg);
904 struct fs_sink_stream *stream;
905 const bt_clock_snapshot *cs = NULL;
906 bt_property_availability avail;
907 uint64_t count;
908
909 stream = borrow_stream(fs_sink, ir_stream);
910 if (!stream) {
05ad2afe
SM
911 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
912 "Failed to borrow stream.");
d24d5663 913 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
15fe47e0
PP
914 goto end;
915 }
916
917 if (fs_sink->ignore_discarded_packets) {
aa1a7452 918 BT_COMP_LOGI("Ignoring discarded packets message: "
15fe47e0
PP
919 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
920 "trace-name=\"%s\", path=\"%s/%s\"",
921 bt_stream_get_id(ir_stream),
922 bt_stream_get_name(ir_stream),
923 bt_trace_get_name(
924 bt_stream_borrow_trace_const(ir_stream)),
925 stream->trace->path->str, stream->file_name->str);
926 goto end;
927 }
928
929 if (stream->discarded_packets_state.in_range) {
05ad2afe
SM
930 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
931 "Unsupported contiguous discarded packets message: "
15fe47e0
PP
932 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
933 "trace-name=\"%s\", path=\"%s/%s\"",
934 bt_stream_get_id(ir_stream),
935 bt_stream_get_name(ir_stream),
936 bt_trace_get_name(
937 bt_stream_borrow_trace_const(ir_stream)),
938 stream->trace->path->str, stream->file_name->str);
d24d5663 939 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
15fe47e0
PP
940 goto end;
941 }
942
491c35cc
PP
943 /*
944 * Discarded packets messages are guaranteed to occur between
945 * packets.
946 */
ffb5c13c 947 BT_ASSERT(!stream->packet_state.is_open);
15fe47e0 948
ffb5c13c 949 if (stream->sc->discarded_packets_has_ts) {
491c35cc
PP
950 /*
951 * Make the stream's state be in the time range of a
952 * discarded packets message since we have the message's
953 * time range (`stream->sc->discarded_packets_has_ts`).
954 */
ffb5c13c 955 stream->discarded_packets_state.in_range = true;
15fe47e0 956
15fe47e0
PP
957 /*
958 * The clock snapshot values will be validated when
491c35cc
PP
959 * handling the next packet beginning message (next call
960 * to handle_packet_beginning_msg()).
15fe47e0 961 */
9b24b6aa 962 cs = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
0cbc2c33 963 msg);
15fe47e0
PP
964 BT_ASSERT(cs);
965 stream->discarded_packets_state.beginning_cs =
966 bt_clock_snapshot_get_value(cs);
9b24b6aa 967 cs = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(
0cbc2c33 968 msg);
15fe47e0
PP
969 BT_ASSERT(cs);
970 stream->discarded_packets_state.end_cs =
971 bt_clock_snapshot_get_value(cs);
15fe47e0
PP
972 }
973
974 avail = bt_message_discarded_packets_get_count(msg, &count);
975 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
491c35cc
PP
976 /*
977 * There's no specific count of discarded packets: set
978 * it to 1 so that we know that we at least discarded
979 * something.
980 */
15fe47e0
PP
981 count = 1;
982 }
983
984 stream->packet_state.seq_num += count;
985
986end:
987 return status;
988}
989
990static inline
991void put_messages(bt_message_array_const msgs, uint64_t count)
992{
993 uint64_t i;
994
995 for (i = 0; i < count; i++) {
996 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
997 }
998}
999
1000BT_HIDDEN
d24d5663
PP
1001bt_component_class_sink_consume_method_status ctf_fs_sink_consume(
1002 bt_self_component_sink *self_comp)
15fe47e0 1003{
d24d5663
PP
1004 bt_component_class_sink_consume_method_status status =
1005 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
15fe47e0 1006 struct fs_sink_comp *fs_sink;
d24d5663 1007 bt_message_iterator_next_status next_status;
15fe47e0
PP
1008 uint64_t msg_count = 0;
1009 bt_message_array_const msgs;
1010
1011 fs_sink = bt_self_component_get_data(
1012 bt_self_component_sink_as_self_component(self_comp));
98b15851
PP
1013 BT_ASSERT_DBG(fs_sink);
1014 BT_ASSERT_DBG(fs_sink->upstream_iter);
15fe47e0
PP
1015
1016 /* Consume messages */
9a2c8b8e 1017 next_status = bt_message_iterator_next(
15fe47e0 1018 fs_sink->upstream_iter, &msgs, &msg_count);
d24d5663
PP
1019 if (next_status < 0) {
1020 status = (int) next_status;
05ad2afe
SM
1021 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
1022 "Failed to get next message from upstream iterator.");
15fe47e0
PP
1023 goto end;
1024 }
1025
d24d5663
PP
1026 switch (next_status) {
1027 case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK:
15fe47e0
PP
1028 {
1029 uint64_t i;
1030
1031 for (i = 0; i < msg_count; i++) {
1032 const bt_message *msg = msgs[i];
1033
98b15851 1034 BT_ASSERT_DBG(msg);
15fe47e0
PP
1035
1036 switch (bt_message_get_type(msg)) {
1037 case BT_MESSAGE_TYPE_EVENT:
1038 status = handle_event_msg(fs_sink, msg);
1039 break;
1040 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
1041 status = handle_packet_beginning_msg(
1042 fs_sink, msg);
1043 break;
1044 case BT_MESSAGE_TYPE_PACKET_END:
1045 status = handle_packet_end_msg(
1046 fs_sink, msg);
1047 break;
1048 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
1049 /* Ignore */
aa1a7452 1050 BT_COMP_LOGD_STR("Ignoring message iterator inactivity message.");
15fe47e0
PP
1051 break;
1052 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
1053 status = handle_stream_beginning_msg(
1054 fs_sink, msg);
1055 break;
1056 case BT_MESSAGE_TYPE_STREAM_END:
1057 status = handle_stream_end_msg(
1058 fs_sink, msg);
1059 break;
15fe47e0
PP
1060 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1061 status = handle_discarded_events_msg(
1062 fs_sink, msg);
1063 break;
1064 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1065 status = handle_discarded_packets_msg(
1066 fs_sink, msg);
1067 break;
1068 default:
498e7994 1069 bt_common_abort();
15fe47e0
PP
1070 }
1071
1072 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
1073
d24d5663 1074 if (status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK) {
05ad2afe
SM
1075 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
1076 "Failed to handle message: "
15fe47e0
PP
1077 "generated CTF traces could be incomplete: "
1078 "output-dir-path=\"%s\"",
1079 fs_sink->output_dir_path->str);
1080 goto error;
1081 }
1082 }
1083
1084 break;
1085 }
d24d5663
PP
1086 case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN:
1087 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN;
15fe47e0 1088 break;
d24d5663 1089 case BT_MESSAGE_ITERATOR_NEXT_STATUS_END:
15fe47e0 1090 /* TODO: Finalize all traces (should already be done?) */
d24d5663 1091 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END;
15fe47e0 1092 break;
15fe47e0
PP
1093 default:
1094 break;
1095 }
1096
1097 goto end;
1098
1099error:
d24d5663 1100 BT_ASSERT(status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK);
15fe47e0
PP
1101 put_messages(msgs, msg_count);
1102
1103end:
1104 return status;
1105}
1106
1107BT_HIDDEN
e803df70
SM
1108bt_component_class_sink_graph_is_configured_method_status
1109ctf_fs_sink_graph_is_configured(
15fe47e0
PP
1110 bt_self_component_sink *self_comp)
1111{
e803df70 1112 bt_component_class_sink_graph_is_configured_method_status status;
9a2c8b8e 1113 bt_message_iterator_create_from_sink_component_status
e803df70 1114 msg_iter_status;
15fe47e0
PP
1115 struct fs_sink_comp *fs_sink = bt_self_component_get_data(
1116 bt_self_component_sink_as_self_component(self_comp));
1117
e803df70 1118 msg_iter_status =
9a2c8b8e 1119 bt_message_iterator_create_from_sink_component(
ca02df0a 1120 self_comp,
15fe47e0 1121 bt_self_component_sink_borrow_input_port_by_name(
e803df70 1122 self_comp, in_port_name), &fs_sink->upstream_iter);
9a2c8b8e 1123 if (msg_iter_status != BT_MESSAGE_ITERATOR_CREATE_FROM_SINK_COMPONENT_STATUS_OK) {
e803df70 1124 status = (int) msg_iter_status;
05ad2afe
SM
1125 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
1126 "Failed to create upstream iterator.");
15fe47e0
PP
1127 goto end;
1128 }
1129
e803df70 1130 status = BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK;
15fe47e0
PP
1131end:
1132 return status;
1133}
1134
1135BT_HIDDEN
1136void ctf_fs_sink_finalize(bt_self_component_sink *self_comp)
1137{
1138 struct fs_sink_comp *fs_sink = bt_self_component_get_data(
1139 bt_self_component_sink_as_self_component(self_comp));
1140
1141 destroy_fs_sink_comp(fs_sink);
1142}
This page took 0.113114 seconds and 4 git commands to generate.