ctf: compile plugin as C++
[babeltrace.git] / src / plugins / ctf / fs-sink / fs-sink.cpp
CommitLineData
15fe47e0 1/*
0235b0db 2 * SPDX-License-Identifier: MIT
15fe47e0 3 *
0235b0db 4 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
15fe47e0
PP
5 */
6
aa1a7452 7#define BT_COMP_LOG_SELF_COMP (fs_sink->self_comp)
ffa3b2b3 8#define BT_LOG_OUTPUT_LEVEL (fs_sink->log_level)
350ad6c1 9#define BT_LOG_TAG "PLUGIN/SINK.CTF.FS"
d9c39b0a 10#include "logging/comp-logging.h"
15fe47e0 11
3fadfbc0 12#include <babeltrace2/babeltrace.h>
15fe47e0
PP
13#include <stdio.h>
14#include <stdbool.h>
15#include <glib.h>
578e048b
MJ
16#include "common/assert.h"
17#include "ctfser/ctfser.h"
ce67f561 18#include "plugins/common/param-validation/param-validation.h"
15fe47e0 19
087cd0f5
SM
20#include "fs-sink.hpp"
21#include "fs-sink-trace.hpp"
22#include "fs-sink-stream.hpp"
23#include "fs-sink-ctf-meta.hpp"
24#include "translate-trace-ir-to-ctf-ir.hpp"
25#include "translate-ctf-ir-to-tsdl.hpp"
15fe47e0
PP
26
/* Name of the input port which ctf_fs_sink_init() adds to the component. */
static const char * const in_port_name = "in";
29
30static
21a9f056 31bt_component_class_initialize_method_status ensure_output_dir_exists(
15fe47e0
PP
32 struct fs_sink_comp *fs_sink)
33{
21a9f056
FD
34 bt_component_class_initialize_method_status status =
35 BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
15fe47e0
PP
36 int ret;
37
38 ret = g_mkdir_with_parents(fs_sink->output_dir_path->str, 0755);
39 if (ret) {
05ad2afe 40 BT_COMP_LOGE_APPEND_CAUSE_ERRNO(fs_sink->self_comp,
aa1a7452 41 "Cannot create directories for output directory",
15fe47e0
PP
42 ": output-dir-path=\"%s\"",
43 fs_sink->output_dir_path->str);
21a9f056 44 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
15fe47e0
PP
45 goto end;
46 }
47
48end:
49 return status;
50}
51
087cd0f5
SM
52static bt_param_validation_map_value_entry_descr fs_sink_params_descr[] = {
53 { "path", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
54 { bt_param_validation_value_descr::string_t } },
55 { "assume-single-trace", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
56 { bt_param_validation_value_descr::bool_t } },
57 { "ignore-discarded-events", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
58 { bt_param_validation_value_descr::bool_t } },
59 { "ignore-discarded-packets", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
60 { bt_param_validation_value_descr::bool_t } },
61 { "quiet", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
62 { bt_param_validation_value_descr::bool_t } },
ce67f561
SM
63 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END
64};
65
15fe47e0 66static
21a9f056 67bt_component_class_initialize_method_status
ce67f561 68configure_component(struct fs_sink_comp *fs_sink, const bt_value *params)
15fe47e0 69{
ce67f561 70 bt_component_class_initialize_method_status status;
15fe47e0 71 const bt_value *value;
ce67f561
SM
72 enum bt_param_validation_status validation_status;
73 gchar *validation_error;
15fe47e0 74
ce67f561
SM
75 validation_status = bt_param_validation_validate(params,
76 fs_sink_params_descr, &validation_error);
77 if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
21a9f056 78 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
05ad2afe
SM
79 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
80 "%s", validation_error);
15fe47e0 81 goto end;
ce67f561
SM
82 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
83 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
15fe47e0
PP
84 goto end;
85 }
86
ce67f561 87 value = bt_value_map_borrow_entry_value_const(params, "path");
15fe47e0
PP
88 g_string_assign(fs_sink->output_dir_path,
89 bt_value_string_get(value));
ce67f561 90
15fe47e0
PP
91 value = bt_value_map_borrow_entry_value_const(params,
92 "assume-single-trace");
93 if (value) {
15fe47e0
PP
94 fs_sink->assume_single_trace = (bool) bt_value_bool_get(value);
95 }
96
97 value = bt_value_map_borrow_entry_value_const(params,
98 "ignore-discarded-events");
99 if (value) {
15fe47e0
PP
100 fs_sink->ignore_discarded_events =
101 (bool) bt_value_bool_get(value);
102 }
103
104 value = bt_value_map_borrow_entry_value_const(params,
105 "ignore-discarded-packets");
106 if (value) {
15fe47e0
PP
107 fs_sink->ignore_discarded_packets =
108 (bool) bt_value_bool_get(value);
109 }
110
111 value = bt_value_map_borrow_entry_value_const(params,
112 "quiet");
113 if (value) {
15fe47e0
PP
114 fs_sink->quiet = (bool) bt_value_bool_get(value);
115 }
116
ce67f561
SM
117 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
118
15fe47e0 119end:
ce67f561 120 g_free(validation_error);
15fe47e0
PP
121 return status;
122}
123
124static
125void destroy_fs_sink_comp(struct fs_sink_comp *fs_sink)
126{
127 if (!fs_sink) {
128 goto end;
129 }
130
131 if (fs_sink->output_dir_path) {
132 g_string_free(fs_sink->output_dir_path, TRUE);
133 fs_sink->output_dir_path = NULL;
134 }
135
136 if (fs_sink->traces) {
137 g_hash_table_destroy(fs_sink->traces);
138 fs_sink->traces = NULL;
139 }
140
9a2c8b8e 141 BT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(
15fe47e0
PP
142 fs_sink->upstream_iter);
143 g_free(fs_sink);
144
145end:
146 return;
147}
148
149BT_HIDDEN
21a9f056 150bt_component_class_initialize_method_status ctf_fs_sink_init(
59225a3e
SM
151 bt_self_component_sink *self_comp_sink,
152 bt_self_component_sink_configuration *config,
153 const bt_value *params,
15fe47e0
PP
154 void *init_method_data)
155{
05ad2afe 156 bt_component_class_initialize_method_status status;
d24d5663 157 bt_self_component_add_port_status add_port_status;
15fe47e0 158 struct fs_sink_comp *fs_sink = NULL;
ffa3b2b3
PP
159 bt_self_component *self_comp =
160 bt_self_component_sink_as_self_component(self_comp_sink);
161 bt_logging_level log_level = bt_component_get_logging_level(
162 bt_self_component_as_component(self_comp));
15fe47e0
PP
163
164 fs_sink = g_new0(struct fs_sink_comp, 1);
165 if (!fs_sink) {
aa1a7452 166 BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR, log_level, self_comp,
ffa3b2b3 167 "Failed to allocate one CTF FS sink structure.");
05ad2afe
SM
168 BT_CURRENT_THREAD_ERROR_APPEND_CAUSE_FROM_COMPONENT(
169 self_comp, "Failed to allocate one CTF FS sink structure.");
21a9f056 170 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
15fe47e0
PP
171 goto end;
172 }
173
ffa3b2b3 174 fs_sink->log_level = log_level;
aa1a7452 175 fs_sink->self_comp = self_comp;
15fe47e0 176 fs_sink->output_dir_path = g_string_new(NULL);
15fe47e0 177 status = configure_component(fs_sink, params);
21a9f056 178 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
15fe47e0
PP
179 /* configure_component() logs errors */
180 goto end;
181 }
182
183 if (fs_sink->assume_single_trace &&
184 g_file_test(fs_sink->output_dir_path->str,
185 G_FILE_TEST_EXISTS)) {
05ad2afe
SM
186 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
187 "Single trace mode, but output path exists: output-path=\"%s\"",
188 fs_sink->output_dir_path->str);
21a9f056 189 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
15fe47e0
PP
190 goto end;
191 }
192
193 status = ensure_output_dir_exists(fs_sink);
21a9f056 194 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
15fe47e0
PP
195 /* ensure_output_dir_exists() logs errors */
196 goto end;
197 }
198
199 fs_sink->traces = g_hash_table_new_full(g_direct_hash, g_direct_equal,
200 NULL, (GDestroyNotify) fs_sink_trace_destroy);
201 if (!fs_sink->traces) {
05ad2afe 202 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate one GHashTable.");
21a9f056 203 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
15fe47e0
PP
204 goto end;
205 }
206
d24d5663
PP
207 add_port_status = bt_self_component_sink_add_input_port(
208 self_comp_sink, in_port_name, NULL, NULL);
05ad2afe 209 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
087cd0f5 210 status = (bt_component_class_initialize_method_status) add_port_status;
05ad2afe 211 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to add input port.");
15fe47e0
PP
212 goto end;
213 }
214
ffa3b2b3 215 bt_self_component_set_data(self_comp, fs_sink);
15fe47e0
PP
216
217end:
21a9f056 218 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
15fe47e0
PP
219 destroy_fs_sink_comp(fs_sink);
220 }
221
222 return status;
223}
224
225static inline
226struct fs_sink_stream *borrow_stream(struct fs_sink_comp *fs_sink,
227 const bt_stream *ir_stream)
228{
229 const bt_trace *ir_trace = bt_stream_borrow_trace_const(ir_stream);
230 struct fs_sink_trace *trace;
231 struct fs_sink_stream *stream = NULL;
232
087cd0f5 233 trace = (fs_sink_trace *) g_hash_table_lookup(fs_sink->traces, ir_trace);
91d81473 234 if (G_UNLIKELY(!trace)) {
15fe47e0
PP
235 if (fs_sink->assume_single_trace &&
236 g_hash_table_size(fs_sink->traces) > 0) {
05ad2afe
SM
237 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
238 "Single trace mode, but getting more than one trace: "
15fe47e0
PP
239 "stream-name=\"%s\"",
240 bt_stream_get_name(ir_stream));
241 goto end;
242 }
243
244 trace = fs_sink_trace_create(fs_sink, ir_trace);
245 if (!trace) {
246 goto end;
247 }
248 }
249
087cd0f5 250 stream = (fs_sink_stream *) g_hash_table_lookup(trace->streams, ir_stream);
91d81473 251 if (G_UNLIKELY(!stream)) {
15fe47e0
PP
252 stream = fs_sink_stream_create(trace, ir_stream);
253 if (!stream) {
254 goto end;
255 }
256 }
257
258end:
259 return stream;
260}
261
262static inline
d24d5663
PP
263bt_component_class_sink_consume_method_status handle_event_msg(
264 struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0
PP
265{
266 int ret;
d24d5663
PP
267 bt_component_class_sink_consume_method_status status =
268 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
15fe47e0
PP
269 const bt_event *ir_event = bt_message_event_borrow_event_const(msg);
270 const bt_stream *ir_stream = bt_event_borrow_stream_const(ir_event);
271 struct fs_sink_stream *stream;
272 struct fs_sink_ctf_event_class *ec = NULL;
273 const bt_clock_snapshot *cs = NULL;
274
275 stream = borrow_stream(fs_sink, ir_stream);
91d81473 276 if (G_UNLIKELY(!stream)) {
05ad2afe
SM
277 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
278 "Failed to borrow stream.");
d24d5663 279 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
15fe47e0
PP
280 goto end;
281 }
282
aa1a7452
PP
283 ret = try_translate_event_class_trace_ir_to_ctf_ir(fs_sink,
284 stream->sc, bt_event_borrow_class_const(ir_event), &ec);
15fe47e0 285 if (ret) {
05ad2afe
SM
286 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
287 "Failed to translate event class to CTF IR.");
d24d5663 288 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
15fe47e0
PP
289 goto end;
290 }
291
98b15851 292 BT_ASSERT_DBG(ec);
15fe47e0
PP
293
294 if (stream->sc->default_clock_class) {
0cbc2c33
PP
295 cs = bt_message_event_borrow_default_clock_snapshot_const(
296 msg);
15fe47e0
PP
297 }
298
26fc5aed
PP
299 /*
300 * If this event's stream does not support packets, then we
301 * lazily create artificial packets.
302 *
303 * The size of an artificial packet is arbitrarily at least
304 * 4 MiB (it usually is greater because we close it when
305 * comes the time to write a new event and the packet's content
306 * size is >= 4 MiB), except the last one which can be smaller.
307 */
308 if (G_UNLIKELY(!stream->sc->has_packets)) {
309 if (stream->packet_state.is_open &&
310 bt_ctfser_get_offset_in_current_packet_bits(&stream->ctfser) / 8 >=
311 4 * 1024 * 1024) {
312 /*
313 * Stream's current packet is larger than 4 MiB:
314 * close it. A new packet will be opened just
315 * below.
316 */
317 ret = fs_sink_stream_close_packet(stream, NULL);
318 if (ret) {
05ad2afe
SM
319 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
320 "Failed to close packet.");
26fc5aed
PP
321 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
322 goto end;
323 }
324 }
325
326 if (!stream->packet_state.is_open) {
327 /* Stream's packet is not currently opened: open it */
328 ret = fs_sink_stream_open_packet(stream, NULL, NULL);
329 if (ret) {
05ad2afe
SM
330 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
331 "Failed to open packet.");
26fc5aed
PP
332 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
333 goto end;
334 }
335 }
336 }
337
98b15851 338 BT_ASSERT_DBG(stream->packet_state.is_open);
15fe47e0 339 ret = fs_sink_stream_write_event(stream, cs, ir_event, ec);
91d81473 340 if (G_UNLIKELY(ret)) {
05ad2afe
SM
341 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
342 "Failed to write event.");
d24d5663 343 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
15fe47e0
PP
344 goto end;
345 }
346
347end:
348 return status;
349}
350
351static inline
d24d5663 352bt_component_class_sink_consume_method_status handle_packet_beginning_msg(
15fe47e0
PP
353 struct fs_sink_comp *fs_sink, const bt_message *msg)
354{
355 int ret;
d24d5663
PP
356 bt_component_class_sink_consume_method_status status =
357 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
15fe47e0
PP
358 const bt_packet *ir_packet =
359 bt_message_packet_beginning_borrow_packet_const(msg);
360 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
361 struct fs_sink_stream *stream;
362 const bt_clock_snapshot *cs = NULL;
363
364 stream = borrow_stream(fs_sink, ir_stream);
91d81473 365 if (G_UNLIKELY(!stream)) {
05ad2afe
SM
366 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
367 "Failed to borrow stream.");
d24d5663 368 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
15fe47e0
PP
369 goto end;
370 }
371
ffb5c13c 372 if (stream->sc->packets_have_ts_begin) {
0cbc2c33
PP
373 cs = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
374 msg);
15fe47e0
PP
375 BT_ASSERT(cs);
376 }
377
491c35cc
PP
378 /*
379 * If we previously received a discarded events message with
380 * a time range, make sure that its beginning time matches what's
381 * expected for CTF 1.8, that is:
382 *
383 * * Its beginning time is the previous packet's end
384 * time (or the current packet's beginning time if
385 * this is the first packet).
386 *
387 * We check this here instead of in handle_packet_end_msg()
388 * because we want to catch any incompatible message as early as
389 * possible to report the error.
390 *
391 * Validation of the discarded events message's end time is
392 * performed in handle_packet_end_msg().
393 */
15fe47e0 394 if (stream->discarded_events_state.in_range) {
ffb5c13c
PP
395 uint64_t expected_cs;
396
491c35cc
PP
397 /*
398 * `stream->discarded_events_state.in_range` is only set
399 * when the stream class's discarded events have a time
400 * range.
401 *
402 * It is required that the packet beginning and end
403 * messages for this stream class have times when
404 * discarded events have a time range.
405 */
ffb5c13c
PP
406 BT_ASSERT(stream->sc->discarded_events_has_ts);
407 BT_ASSERT(stream->sc->packets_have_ts_begin);
408 BT_ASSERT(stream->sc->packets_have_ts_end);
409
ffb5c13c
PP
410 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
411 /* We're opening the first packet */
412 expected_cs = bt_clock_snapshot_get_value(cs);
413 } else {
414 expected_cs = stream->prev_packet_state.end_cs;
15fe47e0 415 }
15fe47e0 416
ffb5c13c
PP
417 if (stream->discarded_events_state.beginning_cs !=
418 expected_cs) {
05ad2afe
SM
419 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
420 "Incompatible discarded events message: "
ffb5c13c
PP
421 "unexpected beginning time: "
422 "beginning-cs-val=%" PRIu64 ", "
423 "expected-beginning-cs-val=%" PRIu64 ", "
15fe47e0
PP
424 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
425 "trace-name=\"%s\", path=\"%s/%s\"",
ffb5c13c
PP
426 stream->discarded_events_state.beginning_cs,
427 expected_cs,
15fe47e0
PP
428 bt_stream_get_id(ir_stream),
429 bt_stream_get_name(ir_stream),
430 bt_trace_get_name(
431 bt_stream_borrow_trace_const(ir_stream)),
432 stream->trace->path->str, stream->file_name->str);
d24d5663 433 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
15fe47e0
PP
434 goto end;
435 }
ffb5c13c
PP
436 }
437
491c35cc
PP
438 /*
439 * If we previously received a discarded packets message with a
440 * time range, make sure that its beginning and end times match
441 * what's expected for CTF 1.8, that is:
442 *
443 * * Its beginning time is the previous packet's end time.
444 *
445 * * Its end time is the current packet's beginning time.
446 */
ffb5c13c
PP
447 if (stream->discarded_packets_state.in_range) {
448 uint64_t expected_end_cs;
449
491c35cc
PP
450 /*
451 * `stream->discarded_packets_state.in_range` is only
452 * set when the stream class's discarded packets have a
453 * time range.
454 *
455 * It is required that the packet beginning and end
456 * messages for this stream class have times when
457 * discarded packets have a time range.
458 */
ffb5c13c
PP
459 BT_ASSERT(stream->sc->discarded_packets_has_ts);
460 BT_ASSERT(stream->sc->packets_have_ts_begin);
461 BT_ASSERT(stream->sc->packets_have_ts_end);
15fe47e0
PP
462
463 /*
491c35cc
PP
464 * It is not supported to have a discarded packets
465 * message _before_ the first packet: we cannot validate
466 * that its beginning time is compatible with CTF 1.8 in
467 * this case.
15fe47e0 468 */
ffb5c13c 469 if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
05ad2afe
SM
470 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
471 "Incompatible discarded packets message "
118ae153 472 "occurring before the stream's first packet: "
ffb5c13c
PP
473 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
474 "trace-name=\"%s\", path=\"%s/%s\"",
475 bt_stream_get_id(ir_stream),
476 bt_stream_get_name(ir_stream),
477 bt_trace_get_name(
478 bt_stream_borrow_trace_const(ir_stream)),
479 stream->trace->path->str, stream->file_name->str);
d24d5663 480 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
ffb5c13c
PP
481 goto end;
482 }
483
484 if (stream->discarded_packets_state.beginning_cs !=
485 stream->prev_packet_state.end_cs) {
05ad2afe
SM
486 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
487 "Incompatible discarded packets message: "
ffb5c13c
PP
488 "unexpected beginning time: "
489 "beginning-cs-val=%" PRIu64 ", "
490 "expected-beginning-cs-val=%" PRIu64 ", "
491 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
492 "trace-name=\"%s\", path=\"%s/%s\"",
493 stream->discarded_packets_state.beginning_cs,
494 stream->prev_packet_state.end_cs,
495 bt_stream_get_id(ir_stream),
496 bt_stream_get_name(ir_stream),
497 bt_trace_get_name(
498 bt_stream_borrow_trace_const(ir_stream)),
499 stream->trace->path->str, stream->file_name->str);
d24d5663 500 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
ffb5c13c 501 goto end;
15fe47e0 502 }
ffb5c13c
PP
503
504 expected_end_cs = bt_clock_snapshot_get_value(cs);
505
506 if (stream->discarded_packets_state.end_cs !=
507 expected_end_cs) {
05ad2afe
SM
508 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
509 "Incompatible discarded packets message: "
ffb5c13c
PP
510 "unexpected end time: "
511 "end-cs-val=%" PRIu64 ", "
512 "expected-end-cs-val=%" PRIu64 ", "
513 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
514 "trace-name=\"%s\", path=\"%s/%s\"",
515 stream->discarded_packets_state.end_cs,
516 expected_end_cs,
517 bt_stream_get_id(ir_stream),
518 bt_stream_get_name(ir_stream),
519 bt_trace_get_name(
520 bt_stream_borrow_trace_const(ir_stream)),
521 stream->trace->path->str, stream->file_name->str);
d24d5663 522 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
ffb5c13c
PP
523 goto end;
524 }
525 }
526
491c35cc
PP
527 /*
528 * We're not in a discarded packets time range anymore since we
529 * require that the discarded packets time ranges go from one
530 * packet's end time to the next packet's beginning time, and
531 * we're handling a packet beginning message here.
532 */
533 stream->discarded_packets_state.in_range = false;
15fe47e0 534
15fe47e0
PP
535 ret = fs_sink_stream_open_packet(stream, cs, ir_packet);
536 if (ret) {
05ad2afe
SM
537 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
538 "Failed to open packet.");
d24d5663 539 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
15fe47e0
PP
540 goto end;
541 }
542
543end:
544 return status;
545}
546
547static inline
d24d5663 548bt_component_class_sink_consume_method_status handle_packet_end_msg(
15fe47e0
PP
549 struct fs_sink_comp *fs_sink, const bt_message *msg)
550{
551 int ret;
d24d5663
PP
552 bt_component_class_sink_consume_method_status status =
553 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
15fe47e0
PP
554 const bt_packet *ir_packet =
555 bt_message_packet_end_borrow_packet_const(msg);
556 const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
557 struct fs_sink_stream *stream;
558 const bt_clock_snapshot *cs = NULL;
559
560 stream = borrow_stream(fs_sink, ir_stream);
91d81473 561 if (G_UNLIKELY(!stream)) {
05ad2afe
SM
562 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
563 "Failed to borrow stream.");
d24d5663 564 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
15fe47e0
PP
565 goto end;
566 }
567
ffb5c13c 568 if (stream->sc->packets_have_ts_end) {
0cbc2c33
PP
569 cs = bt_message_packet_end_borrow_default_clock_snapshot_const(
570 msg);
15fe47e0
PP
571 BT_ASSERT(cs);
572 }
573
491c35cc
PP
574 /*
575 * If we previously received a discarded events message with
576 * a time range, make sure that its end time matches what's
577 * expected for CTF 1.8, that is:
578 *
579 * * Its end time is the current packet's end time.
580 *
581 * Validation of the discarded events message's beginning time
582 * is performed in handle_packet_beginning_msg().
583 */
15fe47e0 584 if (stream->discarded_events_state.in_range) {
ffb5c13c
PP
585 uint64_t expected_cs;
586
491c35cc
PP
587 /*
588 * `stream->discarded_events_state.in_range` is only set
589 * when the stream class's discarded events have a time
590 * range.
591 *
592 * It is required that the packet beginning and end
593 * messages for this stream class have times when
594 * discarded events have a time range.
595 */
ffb5c13c
PP
596 BT_ASSERT(stream->sc->discarded_events_has_ts);
597 BT_ASSERT(stream->sc->packets_have_ts_begin);
598 BT_ASSERT(stream->sc->packets_have_ts_end);
599
ffb5c13c
PP
600 expected_cs = bt_clock_snapshot_get_value(cs);
601
602 if (stream->discarded_events_state.end_cs != expected_cs) {
05ad2afe
SM
603 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
604 "Incompatible discarded events message: "
ffb5c13c
PP
605 "unexpected end time: "
606 "end-cs-val=%" PRIu64 ", "
607 "expected-end-cs-val=%" PRIu64 ", "
608 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
609 "trace-name=\"%s\", path=\"%s/%s\"",
610 stream->discarded_events_state.end_cs,
611 expected_cs,
612 bt_stream_get_id(ir_stream),
613 bt_stream_get_name(ir_stream),
614 bt_trace_get_name(
615 bt_stream_borrow_trace_const(ir_stream)),
616 stream->trace->path->str, stream->file_name->str);
d24d5663 617 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
ffb5c13c 618 goto end;
15fe47e0
PP
619 }
620 }
621
622 ret = fs_sink_stream_close_packet(stream, cs);
623 if (ret) {
05ad2afe
SM
624 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
625 "Failed to close packet.");
d24d5663 626 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
15fe47e0
PP
627 goto end;
628 }
629
491c35cc
PP
630 /*
631 * We're not in a discarded events time range anymore since we
632 * require that the discarded events time ranges go from one
633 * packet's end time to the next packet's end time, and we're
634 * handling a packet end message here.
635 */
636 stream->discarded_events_state.in_range = false;
15fe47e0
PP
637
638end:
639 return status;
640}
641
642static inline
d24d5663 643bt_component_class_sink_consume_method_status handle_stream_beginning_msg(
15fe47e0
PP
644 struct fs_sink_comp *fs_sink, const bt_message *msg)
645{
d24d5663
PP
646 bt_component_class_sink_consume_method_status status =
647 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
15fe47e0
PP
648 const bt_stream *ir_stream =
649 bt_message_stream_beginning_borrow_stream_const(msg);
649934d2
PP
650 const bt_stream_class *ir_sc =
651 bt_stream_borrow_class_const(ir_stream);
15fe47e0 652 struct fs_sink_stream *stream;
ffb5c13c 653 bool packets_have_beginning_end_cs =
9b24b6aa
PP
654 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc) &&
655 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc);
15fe47e0 656
26fc5aed
PP
657 /*
658 * Not supported: discarded events or discarded packets support
659 * without packets support. Packets are the way to know where
118ae153 660 * discarded events/packets occurred in CTF 1.8.
26fc5aed
PP
661 */
662 if (!bt_stream_class_supports_packets(ir_sc)) {
663 BT_ASSERT(!bt_stream_class_supports_discarded_packets(ir_sc));
664
665 if (!fs_sink->ignore_discarded_events &&
666 bt_stream_class_supports_discarded_events(ir_sc)) {
05ad2afe
SM
667 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
668 "Unsupported stream: "
26fc5aed
PP
669 "stream does not support packets, "
670 "but supports discarded events: "
671 "stream-addr=%p, "
672 "stream-id=%" PRIu64 ", "
673 "stream-name=\"%s\"",
674 ir_stream, bt_stream_get_id(ir_stream),
675 bt_stream_get_name(ir_stream));
676 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
677 goto end;
678 }
679 }
680
649934d2 681 /*
ffb5c13c
PP
682 * Not supported: discarded events with default clock snapshots,
683 * but packet beginning/end without default clock snapshot.
649934d2 684 */
ffb5c13c
PP
685 if (!fs_sink->ignore_discarded_events &&
686 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc) &&
687 !packets_have_beginning_end_cs) {
05ad2afe
SM
688 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
689 "Unsupported stream: discarded events have "
ffb5c13c
PP
690 "default clock snapshots, but packets have no "
691 "beginning and/or end default clock snapshots: "
692 "stream-addr=%p, "
693 "stream-id=%" PRIu64 ", "
694 "stream-name=\"%s\"",
695 ir_stream, bt_stream_get_id(ir_stream),
696 bt_stream_get_name(ir_stream));
d24d5663 697 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
ffb5c13c
PP
698 goto end;
699 }
2e90378a 700
ffb5c13c
PP
701 /*
702 * Not supported: discarded packets with default clock
703 * snapshots, but packet beginning/end without default clock
704 * snapshot.
705 */
706 if (!fs_sink->ignore_discarded_packets &&
707 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc) &&
708 !packets_have_beginning_end_cs) {
05ad2afe
SM
709 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
710 "Unsupported stream: discarded packets have "
ffb5c13c
PP
711 "default clock snapshots, but packets have no "
712 "beginning and/or end default clock snapshots: "
713 "stream-addr=%p, "
714 "stream-id=%" PRIu64 ", "
715 "stream-name=\"%s\"",
716 ir_stream, bt_stream_get_id(ir_stream),
717 bt_stream_get_name(ir_stream));
d24d5663 718 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
ffb5c13c 719 goto end;
649934d2
PP
720 }
721
15fe47e0
PP
722 stream = borrow_stream(fs_sink, ir_stream);
723 if (!stream) {
05ad2afe
SM
724 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
725 "Failed to borrow stream.");
d24d5663 726 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
15fe47e0
PP
727 goto end;
728 }
729
aa1a7452 730 BT_COMP_LOGI("Created new, empty stream file: "
15fe47e0
PP
731 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
732 "trace-name=\"%s\", path=\"%s/%s\"",
733 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
734 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
735 stream->trace->path->str, stream->file_name->str);
736
737end:
738 return status;
739}
740
741static inline
d24d5663
PP
742bt_component_class_sink_consume_method_status handle_stream_end_msg(
743 struct fs_sink_comp *fs_sink, const bt_message *msg)
15fe47e0 744{
d24d5663
PP
745 bt_component_class_sink_consume_method_status status =
746 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
15fe47e0
PP
747 const bt_stream *ir_stream =
748 bt_message_stream_end_borrow_stream_const(msg);
749 struct fs_sink_stream *stream;
750
751 stream = borrow_stream(fs_sink, ir_stream);
752 if (!stream) {
05ad2afe
SM
753 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
754 "Failed to borrow stream.");
d24d5663 755 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
15fe47e0
PP
756 goto end;
757 }
758
26fc5aed
PP
759 if (G_UNLIKELY(!stream->sc->has_packets &&
760 stream->packet_state.is_open)) {
761 /* Close stream's current artificial packet */
762 int ret = fs_sink_stream_close_packet(stream, NULL);
763
764 if (ret) {
05ad2afe
SM
765 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
766 "Failed to close packet.");
26fc5aed
PP
767 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
768 goto end;
769 }
770 }
771
aa1a7452 772 BT_COMP_LOGI("Closing stream file: "
15fe47e0
PP
773 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
774 "trace-name=\"%s\", path=\"%s/%s\"",
775 bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
776 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)),
777 stream->trace->path->str, stream->file_name->str);
778
779 /*
780 * This destroys the stream object and frees all its resources,
781 * closing the stream file.
782 */
783 g_hash_table_remove(stream->trace->streams, ir_stream);
784
785end:
786 return status;
787}
788
789static inline
d24d5663 790bt_component_class_sink_consume_method_status handle_discarded_events_msg(
15fe47e0
PP
791 struct fs_sink_comp *fs_sink, const bt_message *msg)
792{
d24d5663
PP
793 bt_component_class_sink_consume_method_status status =
794 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
15fe47e0
PP
795 const bt_stream *ir_stream =
796 bt_message_discarded_events_borrow_stream_const(msg);
797 struct fs_sink_stream *stream;
798 const bt_clock_snapshot *cs = NULL;
799 bt_property_availability avail;
800 uint64_t count;
801
802 stream = borrow_stream(fs_sink, ir_stream);
803 if (!stream) {
05ad2afe
SM
804 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
805 "Failed to borrow stream.");
d24d5663 806 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
15fe47e0
PP
807 goto end;
808 }
809
810 if (fs_sink->ignore_discarded_events) {
aa1a7452 811 BT_COMP_LOGI("Ignoring discarded events message: "
15fe47e0
PP
812 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
813 "trace-name=\"%s\", path=\"%s/%s\"",
814 bt_stream_get_id(ir_stream),
815 bt_stream_get_name(ir_stream),
816 bt_trace_get_name(
817 bt_stream_borrow_trace_const(ir_stream)),
818 stream->trace->path->str, stream->file_name->str);
819 goto end;
820 }
821
822 if (stream->discarded_events_state.in_range) {
05ad2afe
SM
823 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
824 "Unsupported contiguous discarded events message: "
15fe47e0
PP
825 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
826 "trace-name=\"%s\", path=\"%s/%s\"",
827 bt_stream_get_id(ir_stream),
828 bt_stream_get_name(ir_stream),
829 bt_trace_get_name(
830 bt_stream_borrow_trace_const(ir_stream)),
831 stream->trace->path->str, stream->file_name->str);
d24d5663 832 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
15fe47e0
PP
833 goto end;
834 }
835
491c35cc
PP
836 /*
837 * If we're currently in an opened packet (got a packet
838 * beginning message, but no packet end message yet), we do not
839 * support having a discarded events message with a time range
840 * because we require that the discarded events message's time
841 * range go from a packet's end time to the next packet's end
842 * time.
843 */
ffb5c13c
PP
844 if (stream->packet_state.is_open &&
845 stream->sc->discarded_events_has_ts) {
05ad2afe
SM
846 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
847 "Unsupported discarded events message with "
118ae153 848 "default clock snapshots occurring within a packet: "
15fe47e0
PP
849 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
850 "trace-name=\"%s\", path=\"%s/%s\"",
851 bt_stream_get_id(ir_stream),
852 bt_stream_get_name(ir_stream),
853 bt_trace_get_name(
854 bt_stream_borrow_trace_const(ir_stream)),
855 stream->trace->path->str, stream->file_name->str);
d24d5663 856 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
15fe47e0
PP
857 goto end;
858 }
859
ffb5c13c 860 if (stream->sc->discarded_events_has_ts) {
491c35cc
PP
861 /*
862 * Make the stream's state be in the time range of a
863 * discarded events message since we have the message's
864 * time range (`stream->sc->discarded_events_has_ts`).
865 */
ffb5c13c 866 stream->discarded_events_state.in_range = true;
15fe47e0 867
15fe47e0
PP
868 /*
869 * The clock snapshot values will be validated when
491c35cc
PP
870 * handling the next packet beginning and end messages
871 * (next calls to handle_packet_beginning_msg() and
872 * handle_packet_end_msg()).
15fe47e0 873 */
9b24b6aa 874 cs = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
0cbc2c33 875 msg);
15fe47e0
PP
876 BT_ASSERT(cs);
877 stream->discarded_events_state.beginning_cs =
878 bt_clock_snapshot_get_value(cs);
9b24b6aa 879 cs = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(
0cbc2c33 880 msg);
15fe47e0 881 BT_ASSERT(cs);
ffb5c13c 882 stream->discarded_events_state.end_cs = bt_clock_snapshot_get_value(cs);
15fe47e0
PP
883 }
884
885 avail = bt_message_discarded_events_get_count(msg, &count);
886 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
491c35cc
PP
887 /*
888 * There's no specific count of discarded events: set it
889 * to 1 so that we know that we at least discarded
890 * something.
891 */
15fe47e0
PP
892 count = 1;
893 }
894
895 stream->packet_state.discarded_events_counter += count;
896
897end:
898 return status;
899}
900
901static inline
d24d5663 902bt_component_class_sink_consume_method_status handle_discarded_packets_msg(
15fe47e0
PP
903 struct fs_sink_comp *fs_sink, const bt_message *msg)
904{
d24d5663
PP
905 bt_component_class_sink_consume_method_status status =
906 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
15fe47e0
PP
907 const bt_stream *ir_stream =
908 bt_message_discarded_packets_borrow_stream_const(msg);
909 struct fs_sink_stream *stream;
910 const bt_clock_snapshot *cs = NULL;
911 bt_property_availability avail;
912 uint64_t count;
913
914 stream = borrow_stream(fs_sink, ir_stream);
915 if (!stream) {
05ad2afe
SM
916 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
917 "Failed to borrow stream.");
d24d5663 918 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
15fe47e0
PP
919 goto end;
920 }
921
922 if (fs_sink->ignore_discarded_packets) {
aa1a7452 923 BT_COMP_LOGI("Ignoring discarded packets message: "
15fe47e0
PP
924 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
925 "trace-name=\"%s\", path=\"%s/%s\"",
926 bt_stream_get_id(ir_stream),
927 bt_stream_get_name(ir_stream),
928 bt_trace_get_name(
929 bt_stream_borrow_trace_const(ir_stream)),
930 stream->trace->path->str, stream->file_name->str);
931 goto end;
932 }
933
934 if (stream->discarded_packets_state.in_range) {
05ad2afe
SM
935 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
936 "Unsupported contiguous discarded packets message: "
15fe47e0
PP
937 "stream-id=%" PRIu64 ", stream-name=\"%s\", "
938 "trace-name=\"%s\", path=\"%s/%s\"",
939 bt_stream_get_id(ir_stream),
940 bt_stream_get_name(ir_stream),
941 bt_trace_get_name(
942 bt_stream_borrow_trace_const(ir_stream)),
943 stream->trace->path->str, stream->file_name->str);
d24d5663 944 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
15fe47e0
PP
945 goto end;
946 }
947
491c35cc
PP
948 /*
949 * Discarded packets messages are guaranteed to occur between
950 * packets.
951 */
ffb5c13c 952 BT_ASSERT(!stream->packet_state.is_open);
15fe47e0 953
ffb5c13c 954 if (stream->sc->discarded_packets_has_ts) {
491c35cc
PP
955 /*
956 * Make the stream's state be in the time range of a
957 * discarded packets message since we have the message's
958 * time range (`stream->sc->discarded_packets_has_ts`).
959 */
ffb5c13c 960 stream->discarded_packets_state.in_range = true;
15fe47e0 961
15fe47e0
PP
962 /*
963 * The clock snapshot values will be validated when
491c35cc
PP
964 * handling the next packet beginning message (next call
965 * to handle_packet_beginning_msg()).
15fe47e0 966 */
9b24b6aa 967 cs = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
0cbc2c33 968 msg);
15fe47e0
PP
969 BT_ASSERT(cs);
970 stream->discarded_packets_state.beginning_cs =
971 bt_clock_snapshot_get_value(cs);
9b24b6aa 972 cs = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(
0cbc2c33 973 msg);
15fe47e0
PP
974 BT_ASSERT(cs);
975 stream->discarded_packets_state.end_cs =
976 bt_clock_snapshot_get_value(cs);
15fe47e0
PP
977 }
978
979 avail = bt_message_discarded_packets_get_count(msg, &count);
980 if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) {
491c35cc
PP
981 /*
982 * There's no specific count of discarded packets: set
983 * it to 1 so that we know that we at least discarded
984 * something.
985 */
15fe47e0
PP
986 count = 1;
987 }
988
989 stream->packet_state.seq_num += count;
990
991end:
992 return status;
993}
994
995static inline
996void put_messages(bt_message_array_const msgs, uint64_t count)
997{
998 uint64_t i;
999
1000 for (i = 0; i < count; i++) {
1001 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
1002 }
1003}
1004
1005BT_HIDDEN
d24d5663
PP
1006bt_component_class_sink_consume_method_status ctf_fs_sink_consume(
1007 bt_self_component_sink *self_comp)
15fe47e0 1008{
d24d5663
PP
1009 bt_component_class_sink_consume_method_status status =
1010 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
15fe47e0 1011 struct fs_sink_comp *fs_sink;
d24d5663 1012 bt_message_iterator_next_status next_status;
15fe47e0
PP
1013 uint64_t msg_count = 0;
1014 bt_message_array_const msgs;
1015
087cd0f5 1016 fs_sink = (fs_sink_comp *) bt_self_component_get_data(
15fe47e0 1017 bt_self_component_sink_as_self_component(self_comp));
98b15851
PP
1018 BT_ASSERT_DBG(fs_sink);
1019 BT_ASSERT_DBG(fs_sink->upstream_iter);
15fe47e0
PP
1020
1021 /* Consume messages */
9a2c8b8e 1022 next_status = bt_message_iterator_next(
15fe47e0 1023 fs_sink->upstream_iter, &msgs, &msg_count);
d24d5663 1024 if (next_status < 0) {
087cd0f5 1025 status = (bt_component_class_sink_consume_method_status) next_status;
05ad2afe
SM
1026 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
1027 "Failed to get next message from upstream iterator.");
15fe47e0
PP
1028 goto end;
1029 }
1030
d24d5663
PP
1031 switch (next_status) {
1032 case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK:
15fe47e0
PP
1033 {
1034 uint64_t i;
1035
1036 for (i = 0; i < msg_count; i++) {
1037 const bt_message *msg = msgs[i];
1038
98b15851 1039 BT_ASSERT_DBG(msg);
15fe47e0
PP
1040
1041 switch (bt_message_get_type(msg)) {
1042 case BT_MESSAGE_TYPE_EVENT:
1043 status = handle_event_msg(fs_sink, msg);
1044 break;
1045 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
1046 status = handle_packet_beginning_msg(
1047 fs_sink, msg);
1048 break;
1049 case BT_MESSAGE_TYPE_PACKET_END:
1050 status = handle_packet_end_msg(
1051 fs_sink, msg);
1052 break;
1053 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
1054 /* Ignore */
aa1a7452 1055 BT_COMP_LOGD_STR("Ignoring message iterator inactivity message.");
15fe47e0
PP
1056 break;
1057 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
1058 status = handle_stream_beginning_msg(
1059 fs_sink, msg);
1060 break;
1061 case BT_MESSAGE_TYPE_STREAM_END:
1062 status = handle_stream_end_msg(
1063 fs_sink, msg);
1064 break;
15fe47e0
PP
1065 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1066 status = handle_discarded_events_msg(
1067 fs_sink, msg);
1068 break;
1069 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1070 status = handle_discarded_packets_msg(
1071 fs_sink, msg);
1072 break;
1073 default:
498e7994 1074 bt_common_abort();
15fe47e0
PP
1075 }
1076
1077 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
1078
d24d5663 1079 if (status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK) {
05ad2afe
SM
1080 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
1081 "Failed to handle message: "
15fe47e0
PP
1082 "generated CTF traces could be incomplete: "
1083 "output-dir-path=\"%s\"",
1084 fs_sink->output_dir_path->str);
1085 goto error;
1086 }
1087 }
1088
1089 break;
1090 }
d24d5663
PP
1091 case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN:
1092 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN;
15fe47e0 1093 break;
d24d5663 1094 case BT_MESSAGE_ITERATOR_NEXT_STATUS_END:
15fe47e0 1095 /* TODO: Finalize all traces (should already be done?) */
d24d5663 1096 status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END;
15fe47e0 1097 break;
15fe47e0
PP
1098 default:
1099 break;
1100 }
1101
1102 goto end;
1103
1104error:
d24d5663 1105 BT_ASSERT(status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK);
15fe47e0
PP
1106 put_messages(msgs, msg_count);
1107
1108end:
1109 return status;
1110}
1111
1112BT_HIDDEN
e803df70
SM
1113bt_component_class_sink_graph_is_configured_method_status
1114ctf_fs_sink_graph_is_configured(
15fe47e0
PP
1115 bt_self_component_sink *self_comp)
1116{
e803df70 1117 bt_component_class_sink_graph_is_configured_method_status status;
9a2c8b8e 1118 bt_message_iterator_create_from_sink_component_status
e803df70 1119 msg_iter_status;
087cd0f5 1120 fs_sink_comp *fs_sink = (fs_sink_comp *) bt_self_component_get_data(
15fe47e0
PP
1121 bt_self_component_sink_as_self_component(self_comp));
1122
e803df70 1123 msg_iter_status =
9a2c8b8e 1124 bt_message_iterator_create_from_sink_component(
ca02df0a 1125 self_comp,
15fe47e0 1126 bt_self_component_sink_borrow_input_port_by_name(
e803df70 1127 self_comp, in_port_name), &fs_sink->upstream_iter);
9a2c8b8e 1128 if (msg_iter_status != BT_MESSAGE_ITERATOR_CREATE_FROM_SINK_COMPONENT_STATUS_OK) {
087cd0f5 1129 status = (bt_component_class_sink_graph_is_configured_method_status) msg_iter_status;
05ad2afe
SM
1130 BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp,
1131 "Failed to create upstream iterator.");
15fe47e0
PP
1132 goto end;
1133 }
1134
e803df70 1135 status = BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK;
15fe47e0
PP
1136end:
1137 return status;
1138}
1139
1140BT_HIDDEN
1141void ctf_fs_sink_finalize(bt_self_component_sink *self_comp)
1142{
087cd0f5 1143 fs_sink_comp *fs_sink = (fs_sink_comp *) bt_self_component_get_data(
15fe47e0
PP
1144 bt_self_component_sink_as_self_component(self_comp));
1145
1146 destroy_fs_sink_comp(fs_sink);
1147}
This page took 0.129412 seconds and 4 git commands to generate.