Commit | Line | Data |
---|---|---|
15fe47e0 | 1 | /* |
0235b0db | 2 | * SPDX-License-Identifier: MIT |
15fe47e0 | 3 | * |
0235b0db | 4 | * Copyright 2019 Philippe Proulx <pproulx@efficios.com> |
15fe47e0 PP |
5 | */ |
6 | ||
aa1a7452 | 7 | #define BT_COMP_LOG_SELF_COMP (fs_sink->self_comp) |
4164020e SM |
8 | #define BT_LOG_OUTPUT_LEVEL (fs_sink->log_level) |
9 | #define BT_LOG_TAG "PLUGIN/SINK.CTF.FS" | |
d9c39b0a | 10 | #include "logging/comp-logging.h" |
15fe47e0 | 11 | |
3fadfbc0 | 12 | #include <babeltrace2/babeltrace.h> |
15fe47e0 PP |
13 | #include <stdio.h> |
14 | #include <stdbool.h> | |
15 | #include <glib.h> | |
578e048b MJ |
16 | #include "common/assert.h" |
17 | #include "ctfser/ctfser.h" | |
ce67f561 | 18 | #include "plugins/common/param-validation/param-validation.h" |
15fe47e0 | 19 | |
087cd0f5 SM |
20 | #include "fs-sink.hpp" |
21 | #include "fs-sink-trace.hpp" | |
22 | #include "fs-sink-stream.hpp" | |
23 | #include "fs-sink-ctf-meta.hpp" | |
24 | #include "translate-trace-ir-to-ctf-ir.hpp" | |
25 | #include "translate-ctf-ir-to-tsdl.hpp" | |
15fe47e0 | 26 | |
/* Name of the input port which ctf_fs_sink_init() adds to the sink component. */
static const char *const in_port_name = "in";
15fe47e0 | 28 | |
4164020e SM |
29 | static bt_component_class_initialize_method_status |
30 | ensure_output_dir_exists(struct fs_sink_comp *fs_sink) | |
15fe47e0 | 31 | { |
4164020e SM |
32 | bt_component_class_initialize_method_status status = |
33 | BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK; | |
34 | int ret; | |
35 | ||
36 | ret = g_mkdir_with_parents(fs_sink->output_dir_path->str, 0755); | |
37 | if (ret) { | |
38 | BT_COMP_LOGE_APPEND_CAUSE_ERRNO(fs_sink->self_comp, | |
39 | "Cannot create directories for output directory", | |
40 | ": output-dir-path=\"%s\"", fs_sink->output_dir_path->str); | |
41 | status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR; | |
42 | goto end; | |
43 | } | |
15fe47e0 PP |
44 | |
45 | end: | |
4164020e | 46 | return status; |
15fe47e0 PP |
47 | } |
48 | ||
087cd0f5 | 49 | static bt_param_validation_map_value_entry_descr fs_sink_params_descr[] = { |
4164020e SM |
50 | {"path", |
51 | BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY, | |
52 | {bt_param_validation_value_descr::string_t}}, | |
53 | {"assume-single-trace", | |
54 | BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, | |
55 | {bt_param_validation_value_descr::bool_t}}, | |
56 | {"ignore-discarded-events", | |
57 | BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, | |
58 | {bt_param_validation_value_descr::bool_t}}, | |
59 | {"ignore-discarded-packets", | |
60 | BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, | |
61 | {bt_param_validation_value_descr::bool_t}}, | |
62 | {"quiet", | |
63 | BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, | |
64 | {bt_param_validation_value_descr::bool_t}}, | |
65 | BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END}; | |
66 | ||
67 | static bt_component_class_initialize_method_status configure_component(struct fs_sink_comp *fs_sink, | |
68 | const bt_value *params) | |
15fe47e0 | 69 | { |
4164020e SM |
70 | bt_component_class_initialize_method_status status; |
71 | const bt_value *value; | |
72 | enum bt_param_validation_status validation_status; | |
73 | gchar *validation_error; | |
74 | ||
75 | validation_status = | |
76 | bt_param_validation_validate(params, fs_sink_params_descr, &validation_error); | |
77 | if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) { | |
78 | status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR; | |
79 | BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "%s", validation_error); | |
80 | goto end; | |
81 | } else if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) { | |
82 | status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; | |
83 | goto end; | |
84 | } | |
85 | ||
86 | value = bt_value_map_borrow_entry_value_const(params, "path"); | |
87 | g_string_assign(fs_sink->output_dir_path, bt_value_string_get(value)); | |
88 | ||
89 | value = bt_value_map_borrow_entry_value_const(params, "assume-single-trace"); | |
90 | if (value) { | |
91 | fs_sink->assume_single_trace = (bool) bt_value_bool_get(value); | |
92 | } | |
93 | ||
94 | value = bt_value_map_borrow_entry_value_const(params, "ignore-discarded-events"); | |
95 | if (value) { | |
96 | fs_sink->ignore_discarded_events = (bool) bt_value_bool_get(value); | |
97 | } | |
98 | ||
99 | value = bt_value_map_borrow_entry_value_const(params, "ignore-discarded-packets"); | |
100 | if (value) { | |
101 | fs_sink->ignore_discarded_packets = (bool) bt_value_bool_get(value); | |
102 | } | |
103 | ||
104 | value = bt_value_map_borrow_entry_value_const(params, "quiet"); | |
105 | if (value) { | |
106 | fs_sink->quiet = (bool) bt_value_bool_get(value); | |
107 | } | |
108 | ||
109 | status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK; | |
ce67f561 | 110 | |
15fe47e0 | 111 | end: |
4164020e SM |
112 | g_free(validation_error); |
113 | return status; | |
15fe47e0 PP |
114 | } |
115 | ||
4164020e | 116 | static void destroy_fs_sink_comp(struct fs_sink_comp *fs_sink) |
15fe47e0 | 117 | { |
4164020e SM |
118 | if (!fs_sink) { |
119 | goto end; | |
120 | } | |
15fe47e0 | 121 | |
4164020e SM |
122 | if (fs_sink->output_dir_path) { |
123 | g_string_free(fs_sink->output_dir_path, TRUE); | |
124 | fs_sink->output_dir_path = NULL; | |
125 | } | |
15fe47e0 | 126 | |
4164020e SM |
127 | if (fs_sink->traces) { |
128 | g_hash_table_destroy(fs_sink->traces); | |
129 | fs_sink->traces = NULL; | |
130 | } | |
15fe47e0 | 131 | |
4164020e SM |
132 | BT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(fs_sink->upstream_iter); |
133 | g_free(fs_sink); | |
15fe47e0 PP |
134 | |
135 | end: | |
4164020e | 136 | return; |
15fe47e0 PP |
137 | } |
138 | ||
139 | BT_HIDDEN | |
4164020e SM |
140 | bt_component_class_initialize_method_status |
141 | ctf_fs_sink_init(bt_self_component_sink *self_comp_sink, | |
142 | bt_self_component_sink_configuration *config, const bt_value *params, | |
143 | void *init_method_data) | |
15fe47e0 | 144 | { |
4164020e SM |
145 | bt_component_class_initialize_method_status status; |
146 | bt_self_component_add_port_status add_port_status; | |
147 | struct fs_sink_comp *fs_sink = NULL; | |
148 | bt_self_component *self_comp = bt_self_component_sink_as_self_component(self_comp_sink); | |
149 | bt_logging_level log_level = | |
150 | bt_component_get_logging_level(bt_self_component_as_component(self_comp)); | |
151 | ||
152 | fs_sink = g_new0(struct fs_sink_comp, 1); | |
153 | if (!fs_sink) { | |
154 | BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR, log_level, self_comp, | |
155 | "Failed to allocate one CTF FS sink structure."); | |
156 | BT_CURRENT_THREAD_ERROR_APPEND_CAUSE_FROM_COMPONENT( | |
157 | self_comp, "Failed to allocate one CTF FS sink structure."); | |
158 | status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; | |
159 | goto end; | |
160 | } | |
161 | ||
162 | fs_sink->log_level = log_level; | |
163 | fs_sink->self_comp = self_comp; | |
164 | fs_sink->output_dir_path = g_string_new(NULL); | |
165 | status = configure_component(fs_sink, params); | |
166 | if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) { | |
167 | /* configure_component() logs errors */ | |
168 | goto end; | |
169 | } | |
170 | ||
171 | if (fs_sink->assume_single_trace && | |
172 | g_file_test(fs_sink->output_dir_path->str, G_FILE_TEST_EXISTS)) { | |
173 | BT_COMP_LOGE_APPEND_CAUSE(self_comp, | |
174 | "Single trace mode, but output path exists: output-path=\"%s\"", | |
175 | fs_sink->output_dir_path->str); | |
176 | status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR; | |
177 | goto end; | |
178 | } | |
179 | ||
180 | status = ensure_output_dir_exists(fs_sink); | |
181 | if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) { | |
182 | /* ensure_output_dir_exists() logs errors */ | |
183 | goto end; | |
184 | } | |
185 | ||
186 | fs_sink->traces = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, | |
187 | (GDestroyNotify) fs_sink_trace_destroy); | |
188 | if (!fs_sink->traces) { | |
189 | BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate one GHashTable."); | |
190 | status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; | |
191 | goto end; | |
192 | } | |
193 | ||
194 | add_port_status = | |
195 | bt_self_component_sink_add_input_port(self_comp_sink, in_port_name, NULL, NULL); | |
196 | if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) { | |
197 | status = (bt_component_class_initialize_method_status) add_port_status; | |
198 | BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to add input port."); | |
199 | goto end; | |
200 | } | |
201 | ||
202 | bt_self_component_set_data(self_comp, fs_sink); | |
15fe47e0 PP |
203 | |
204 | end: | |
4164020e SM |
205 | if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) { |
206 | destroy_fs_sink_comp(fs_sink); | |
207 | } | |
15fe47e0 | 208 | |
4164020e | 209 | return status; |
15fe47e0 PP |
210 | } |
211 | ||
4164020e SM |
212 | static inline struct fs_sink_stream *borrow_stream(struct fs_sink_comp *fs_sink, |
213 | const bt_stream *ir_stream) | |
15fe47e0 | 214 | { |
4164020e SM |
215 | const bt_trace *ir_trace = bt_stream_borrow_trace_const(ir_stream); |
216 | struct fs_sink_trace *trace; | |
217 | struct fs_sink_stream *stream = NULL; | |
218 | ||
219 | trace = (fs_sink_trace *) g_hash_table_lookup(fs_sink->traces, ir_trace); | |
220 | if (G_UNLIKELY(!trace)) { | |
221 | if (fs_sink->assume_single_trace && g_hash_table_size(fs_sink->traces) > 0) { | |
222 | BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, | |
223 | "Single trace mode, but getting more than one trace: " | |
224 | "stream-name=\"%s\"", | |
225 | bt_stream_get_name(ir_stream)); | |
226 | goto end; | |
227 | } | |
228 | ||
229 | trace = fs_sink_trace_create(fs_sink, ir_trace); | |
230 | if (!trace) { | |
231 | goto end; | |
232 | } | |
233 | } | |
234 | ||
235 | stream = (fs_sink_stream *) g_hash_table_lookup(trace->streams, ir_stream); | |
236 | if (G_UNLIKELY(!stream)) { | |
237 | stream = fs_sink_stream_create(trace, ir_stream); | |
238 | if (!stream) { | |
239 | goto end; | |
240 | } | |
241 | } | |
15fe47e0 PP |
242 | |
243 | end: | |
4164020e | 244 | return stream; |
15fe47e0 PP |
245 | } |
246 | ||
4164020e SM |
247 | static inline bt_component_class_sink_consume_method_status |
248 | handle_event_msg(struct fs_sink_comp *fs_sink, const bt_message *msg) | |
15fe47e0 | 249 | { |
4164020e SM |
250 | int ret; |
251 | bt_component_class_sink_consume_method_status status = | |
252 | BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK; | |
253 | const bt_event *ir_event = bt_message_event_borrow_event_const(msg); | |
254 | const bt_stream *ir_stream = bt_event_borrow_stream_const(ir_event); | |
255 | struct fs_sink_stream *stream; | |
256 | struct fs_sink_ctf_event_class *ec = NULL; | |
257 | const bt_clock_snapshot *cs = NULL; | |
258 | ||
259 | stream = borrow_stream(fs_sink, ir_stream); | |
260 | if (G_UNLIKELY(!stream)) { | |
261 | BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream."); | |
262 | status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; | |
263 | goto end; | |
264 | } | |
265 | ||
266 | ret = try_translate_event_class_trace_ir_to_ctf_ir(fs_sink, stream->sc, | |
267 | bt_event_borrow_class_const(ir_event), &ec); | |
268 | if (ret) { | |
269 | BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to translate event class to CTF IR."); | |
270 | status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; | |
271 | goto end; | |
272 | } | |
273 | ||
274 | BT_ASSERT_DBG(ec); | |
275 | ||
276 | if (stream->sc->default_clock_class) { | |
277 | cs = bt_message_event_borrow_default_clock_snapshot_const(msg); | |
278 | } | |
279 | ||
280 | /* | |
281 | * If this event's stream does not support packets, then we | |
282 | * lazily create artificial packets. | |
283 | * | |
284 | * The size of an artificial packet is arbitrarily at least | |
285 | * 4 MiB (it usually is greater because we close it when | |
286 | * comes the time to write a new event and the packet's content | |
287 | * size is >= 4 MiB), except the last one which can be smaller. | |
288 | */ | |
289 | if (G_UNLIKELY(!stream->sc->has_packets)) { | |
290 | if (stream->packet_state.is_open && | |
291 | bt_ctfser_get_offset_in_current_packet_bits(&stream->ctfser) / 8 >= 4 * 1024 * 1024) { | |
292 | /* | |
293 | * Stream's current packet is larger than 4 MiB: | |
294 | * close it. A new packet will be opened just | |
295 | * below. | |
296 | */ | |
297 | ret = fs_sink_stream_close_packet(stream, NULL); | |
298 | if (ret) { | |
299 | BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to close packet."); | |
300 | status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; | |
301 | goto end; | |
302 | } | |
303 | } | |
304 | ||
305 | if (!stream->packet_state.is_open) { | |
306 | /* Stream's packet is not currently opened: open it */ | |
307 | ret = fs_sink_stream_open_packet(stream, NULL, NULL); | |
308 | if (ret) { | |
309 | BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to open packet."); | |
310 | status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; | |
311 | goto end; | |
312 | } | |
313 | } | |
314 | } | |
315 | ||
316 | BT_ASSERT_DBG(stream->packet_state.is_open); | |
317 | ret = fs_sink_stream_write_event(stream, cs, ir_event, ec); | |
318 | if (G_UNLIKELY(ret)) { | |
319 | BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to write event."); | |
320 | status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; | |
321 | goto end; | |
322 | } | |
15fe47e0 PP |
323 | |
324 | end: | |
4164020e | 325 | return status; |
15fe47e0 PP |
326 | } |
327 | ||
4164020e SM |
328 | static inline bt_component_class_sink_consume_method_status |
329 | handle_packet_beginning_msg(struct fs_sink_comp *fs_sink, const bt_message *msg) | |
15fe47e0 | 330 | { |
4164020e SM |
331 | int ret; |
332 | bt_component_class_sink_consume_method_status status = | |
333 | BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK; | |
334 | const bt_packet *ir_packet = bt_message_packet_beginning_borrow_packet_const(msg); | |
335 | const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet); | |
336 | struct fs_sink_stream *stream; | |
337 | const bt_clock_snapshot *cs = NULL; | |
338 | ||
339 | stream = borrow_stream(fs_sink, ir_stream); | |
340 | if (G_UNLIKELY(!stream)) { | |
341 | BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream."); | |
342 | status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; | |
343 | goto end; | |
344 | } | |
345 | ||
346 | if (stream->sc->packets_have_ts_begin) { | |
347 | cs = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg); | |
348 | BT_ASSERT(cs); | |
349 | } | |
350 | ||
351 | /* | |
352 | * If we previously received a discarded events message with | |
353 | * a time range, make sure that its beginning time matches what's | |
354 | * expected for CTF 1.8, that is: | |
355 | * | |
356 | * * Its beginning time is the previous packet's end | |
357 | * time (or the current packet's beginning time if | |
358 | * this is the first packet). | |
359 | * | |
360 | * We check this here instead of in handle_packet_end_msg() | |
361 | * because we want to catch any incompatible message as early as | |
362 | * possible to report the error. | |
363 | * | |
364 | * Validation of the discarded events message's end time is | |
365 | * performed in handle_packet_end_msg(). | |
366 | */ | |
367 | if (stream->discarded_events_state.in_range) { | |
368 | uint64_t expected_cs; | |
369 | ||
370 | /* | |
371 | * `stream->discarded_events_state.in_range` is only set | |
372 | * when the stream class's discarded events have a time | |
373 | * range. | |
374 | * | |
375 | * It is required that the packet beginning and end | |
376 | * messages for this stream class have times when | |
377 | * discarded events have a time range. | |
378 | */ | |
379 | BT_ASSERT(stream->sc->discarded_events_has_ts); | |
380 | BT_ASSERT(stream->sc->packets_have_ts_begin); | |
381 | BT_ASSERT(stream->sc->packets_have_ts_end); | |
382 | ||
383 | if (stream->prev_packet_state.end_cs == UINT64_C(-1)) { | |
384 | /* We're opening the first packet */ | |
385 | expected_cs = bt_clock_snapshot_get_value(cs); | |
386 | } else { | |
387 | expected_cs = stream->prev_packet_state.end_cs; | |
388 | } | |
389 | ||
390 | if (stream->discarded_events_state.beginning_cs != expected_cs) { | |
391 | BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, | |
392 | "Incompatible discarded events message: " | |
393 | "unexpected beginning time: " | |
394 | "beginning-cs-val=%" PRIu64 ", " | |
395 | "expected-beginning-cs-val=%" PRIu64 ", " | |
396 | "stream-id=%" PRIu64 ", stream-name=\"%s\", " | |
397 | "trace-name=\"%s\", path=\"%s/%s\"", | |
398 | stream->discarded_events_state.beginning_cs, expected_cs, | |
399 | bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream), | |
400 | bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)), | |
401 | stream->trace->path->str, stream->file_name->str); | |
402 | status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; | |
403 | goto end; | |
404 | } | |
405 | } | |
406 | ||
407 | /* | |
408 | * If we previously received a discarded packets message with a | |
409 | * time range, make sure that its beginning and end times match | |
410 | * what's expected for CTF 1.8, that is: | |
411 | * | |
412 | * * Its beginning time is the previous packet's end time. | |
413 | * | |
414 | * * Its end time is the current packet's beginning time. | |
415 | */ | |
416 | if (stream->discarded_packets_state.in_range) { | |
417 | uint64_t expected_end_cs; | |
418 | ||
419 | /* | |
420 | * `stream->discarded_packets_state.in_range` is only | |
421 | * set when the stream class's discarded packets have a | |
422 | * time range. | |
423 | * | |
424 | * It is required that the packet beginning and end | |
425 | * messages for this stream class have times when | |
426 | * discarded packets have a time range. | |
427 | */ | |
428 | BT_ASSERT(stream->sc->discarded_packets_has_ts); | |
429 | BT_ASSERT(stream->sc->packets_have_ts_begin); | |
430 | BT_ASSERT(stream->sc->packets_have_ts_end); | |
431 | ||
432 | /* | |
433 | * It is not supported to have a discarded packets | |
434 | * message _before_ the first packet: we cannot validate | |
435 | * that its beginning time is compatible with CTF 1.8 in | |
436 | * this case. | |
437 | */ | |
438 | if (stream->prev_packet_state.end_cs == UINT64_C(-1)) { | |
439 | BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, | |
440 | "Incompatible discarded packets message " | |
441 | "occurring before the stream's first packet: " | |
442 | "stream-id=%" PRIu64 ", stream-name=\"%s\", " | |
443 | "trace-name=\"%s\", path=\"%s/%s\"", | |
444 | bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream), | |
445 | bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)), | |
446 | stream->trace->path->str, stream->file_name->str); | |
447 | status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; | |
448 | goto end; | |
449 | } | |
450 | ||
451 | if (stream->discarded_packets_state.beginning_cs != stream->prev_packet_state.end_cs) { | |
452 | BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, | |
453 | "Incompatible discarded packets message: " | |
454 | "unexpected beginning time: " | |
455 | "beginning-cs-val=%" PRIu64 ", " | |
456 | "expected-beginning-cs-val=%" PRIu64 ", " | |
457 | "stream-id=%" PRIu64 ", stream-name=\"%s\", " | |
458 | "trace-name=\"%s\", path=\"%s/%s\"", | |
459 | stream->discarded_packets_state.beginning_cs, | |
460 | stream->prev_packet_state.end_cs, bt_stream_get_id(ir_stream), | |
461 | bt_stream_get_name(ir_stream), | |
462 | bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)), | |
463 | stream->trace->path->str, stream->file_name->str); | |
464 | status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; | |
465 | goto end; | |
466 | } | |
467 | ||
468 | expected_end_cs = bt_clock_snapshot_get_value(cs); | |
469 | ||
470 | if (stream->discarded_packets_state.end_cs != expected_end_cs) { | |
471 | BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, | |
472 | "Incompatible discarded packets message: " | |
473 | "unexpected end time: " | |
474 | "end-cs-val=%" PRIu64 ", " | |
475 | "expected-end-cs-val=%" PRIu64 ", " | |
476 | "stream-id=%" PRIu64 ", stream-name=\"%s\", " | |
477 | "trace-name=\"%s\", path=\"%s/%s\"", | |
478 | stream->discarded_packets_state.end_cs, expected_end_cs, | |
479 | bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream), | |
480 | bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)), | |
481 | stream->trace->path->str, stream->file_name->str); | |
482 | status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; | |
483 | goto end; | |
484 | } | |
485 | } | |
486 | ||
487 | /* | |
488 | * We're not in a discarded packets time range anymore since we | |
489 | * require that the discarded packets time ranges go from one | |
490 | * packet's end time to the next packet's beginning time, and | |
491 | * we're handling a packet beginning message here. | |
492 | */ | |
493 | stream->discarded_packets_state.in_range = false; | |
494 | ||
495 | ret = fs_sink_stream_open_packet(stream, cs, ir_packet); | |
496 | if (ret) { | |
497 | BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to open packet."); | |
498 | status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; | |
499 | goto end; | |
500 | } | |
15fe47e0 PP |
501 | |
502 | end: | |
4164020e | 503 | return status; |
15fe47e0 PP |
504 | } |
505 | ||
4164020e SM |
506 | static inline bt_component_class_sink_consume_method_status |
507 | handle_packet_end_msg(struct fs_sink_comp *fs_sink, const bt_message *msg) | |
15fe47e0 | 508 | { |
4164020e SM |
509 | int ret; |
510 | bt_component_class_sink_consume_method_status status = | |
511 | BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK; | |
512 | const bt_packet *ir_packet = bt_message_packet_end_borrow_packet_const(msg); | |
513 | const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet); | |
514 | struct fs_sink_stream *stream; | |
515 | const bt_clock_snapshot *cs = NULL; | |
516 | ||
517 | stream = borrow_stream(fs_sink, ir_stream); | |
518 | if (G_UNLIKELY(!stream)) { | |
519 | BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream."); | |
520 | status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; | |
521 | goto end; | |
522 | } | |
523 | ||
524 | if (stream->sc->packets_have_ts_end) { | |
525 | cs = bt_message_packet_end_borrow_default_clock_snapshot_const(msg); | |
526 | BT_ASSERT(cs); | |
527 | } | |
528 | ||
529 | /* | |
530 | * If we previously received a discarded events message with | |
531 | * a time range, make sure that its end time matches what's | |
532 | * expected for CTF 1.8, that is: | |
533 | * | |
534 | * * Its end time is the current packet's end time. | |
535 | * | |
536 | * Validation of the discarded events message's beginning time | |
537 | * is performed in handle_packet_beginning_msg(). | |
538 | */ | |
539 | if (stream->discarded_events_state.in_range) { | |
540 | uint64_t expected_cs; | |
541 | ||
542 | /* | |
543 | * `stream->discarded_events_state.in_range` is only set | |
544 | * when the stream class's discarded events have a time | |
545 | * range. | |
546 | * | |
547 | * It is required that the packet beginning and end | |
548 | * messages for this stream class have times when | |
549 | * discarded events have a time range. | |
550 | */ | |
551 | BT_ASSERT(stream->sc->discarded_events_has_ts); | |
552 | BT_ASSERT(stream->sc->packets_have_ts_begin); | |
553 | BT_ASSERT(stream->sc->packets_have_ts_end); | |
554 | ||
555 | expected_cs = bt_clock_snapshot_get_value(cs); | |
556 | ||
557 | if (stream->discarded_events_state.end_cs != expected_cs) { | |
558 | BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, | |
559 | "Incompatible discarded events message: " | |
560 | "unexpected end time: " | |
561 | "end-cs-val=%" PRIu64 ", " | |
562 | "expected-end-cs-val=%" PRIu64 ", " | |
563 | "stream-id=%" PRIu64 ", stream-name=\"%s\", " | |
564 | "trace-name=\"%s\", path=\"%s/%s\"", | |
565 | stream->discarded_events_state.end_cs, expected_cs, | |
566 | bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream), | |
567 | bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)), | |
568 | stream->trace->path->str, stream->file_name->str); | |
569 | status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; | |
570 | goto end; | |
571 | } | |
572 | } | |
573 | ||
574 | ret = fs_sink_stream_close_packet(stream, cs); | |
575 | if (ret) { | |
576 | BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to close packet."); | |
577 | status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; | |
578 | goto end; | |
579 | } | |
580 | ||
581 | /* | |
582 | * We're not in a discarded events time range anymore since we | |
583 | * require that the discarded events time ranges go from one | |
584 | * packet's end time to the next packet's end time, and we're | |
585 | * handling a packet end message here. | |
586 | */ | |
587 | stream->discarded_events_state.in_range = false; | |
15fe47e0 PP |
588 | |
589 | end: | |
4164020e | 590 | return status; |
15fe47e0 PP |
591 | } |
592 | ||
4164020e SM |
593 | static inline bt_component_class_sink_consume_method_status |
594 | handle_stream_beginning_msg(struct fs_sink_comp *fs_sink, const bt_message *msg) | |
15fe47e0 | 595 | { |
4164020e SM |
596 | bt_component_class_sink_consume_method_status status = |
597 | BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK; | |
598 | const bt_stream *ir_stream = bt_message_stream_beginning_borrow_stream_const(msg); | |
599 | const bt_stream_class *ir_sc = bt_stream_borrow_class_const(ir_stream); | |
600 | struct fs_sink_stream *stream; | |
601 | bool packets_have_beginning_end_cs = | |
602 | bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc) && | |
603 | bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc); | |
604 | ||
605 | /* | |
606 | * Not supported: discarded events or discarded packets support | |
607 | * without packets support. Packets are the way to know where | |
608 | * discarded events/packets occurred in CTF 1.8. | |
609 | */ | |
610 | if (!bt_stream_class_supports_packets(ir_sc)) { | |
611 | BT_ASSERT(!bt_stream_class_supports_discarded_packets(ir_sc)); | |
612 | ||
613 | if (!fs_sink->ignore_discarded_events && bt_stream_class_supports_discarded_events(ir_sc)) { | |
614 | BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, | |
615 | "Unsupported stream: " | |
616 | "stream does not support packets, " | |
617 | "but supports discarded events: " | |
618 | "stream-addr=%p, " | |
619 | "stream-id=%" PRIu64 ", " | |
620 | "stream-name=\"%s\"", | |
621 | ir_stream, bt_stream_get_id(ir_stream), | |
622 | bt_stream_get_name(ir_stream)); | |
623 | status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; | |
624 | goto end; | |
625 | } | |
626 | } | |
627 | ||
628 | /* | |
629 | * Not supported: discarded events with default clock snapshots, | |
630 | * but packet beginning/end without default clock snapshot. | |
631 | */ | |
632 | if (!fs_sink->ignore_discarded_events && | |
633 | bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc) && | |
634 | !packets_have_beginning_end_cs) { | |
635 | BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, | |
636 | "Unsupported stream: discarded events have " | |
637 | "default clock snapshots, but packets have no " | |
638 | "beginning and/or end default clock snapshots: " | |
639 | "stream-addr=%p, " | |
640 | "stream-id=%" PRIu64 ", " | |
641 | "stream-name=\"%s\"", | |
642 | ir_stream, bt_stream_get_id(ir_stream), | |
643 | bt_stream_get_name(ir_stream)); | |
644 | status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; | |
645 | goto end; | |
646 | } | |
647 | ||
648 | /* | |
649 | * Not supported: discarded packets with default clock | |
650 | * snapshots, but packet beginning/end without default clock | |
651 | * snapshot. | |
652 | */ | |
653 | if (!fs_sink->ignore_discarded_packets && | |
654 | bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc) && | |
655 | !packets_have_beginning_end_cs) { | |
656 | BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, | |
657 | "Unsupported stream: discarded packets have " | |
658 | "default clock snapshots, but packets have no " | |
659 | "beginning and/or end default clock snapshots: " | |
660 | "stream-addr=%p, " | |
661 | "stream-id=%" PRIu64 ", " | |
662 | "stream-name=\"%s\"", | |
663 | ir_stream, bt_stream_get_id(ir_stream), | |
664 | bt_stream_get_name(ir_stream)); | |
665 | status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; | |
666 | goto end; | |
667 | } | |
668 | ||
669 | stream = borrow_stream(fs_sink, ir_stream); | |
670 | if (!stream) { | |
671 | BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream."); | |
672 | status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; | |
673 | goto end; | |
674 | } | |
675 | ||
676 | BT_COMP_LOGI("Created new, empty stream file: " | |
677 | "stream-id=%" PRIu64 ", stream-name=\"%s\", " | |
678 | "trace-name=\"%s\", path=\"%s/%s\"", | |
679 | bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream), | |
680 | bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)), | |
681 | stream->trace->path->str, stream->file_name->str); | |
15fe47e0 PP |
682 | |
683 | end: | |
4164020e | 684 | return status; |
15fe47e0 PP |
685 | } |
686 | ||
4164020e SM |
687 | static inline bt_component_class_sink_consume_method_status |
688 | handle_stream_end_msg(struct fs_sink_comp *fs_sink, const bt_message *msg) | |
15fe47e0 | 689 | { |
4164020e SM |
690 | bt_component_class_sink_consume_method_status status = |
691 | BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK; | |
692 | const bt_stream *ir_stream = bt_message_stream_end_borrow_stream_const(msg); | |
693 | struct fs_sink_stream *stream; | |
694 | ||
695 | stream = borrow_stream(fs_sink, ir_stream); | |
696 | if (!stream) { | |
697 | BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream."); | |
698 | status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; | |
699 | goto end; | |
700 | } | |
701 | ||
702 | if (G_UNLIKELY(!stream->sc->has_packets && stream->packet_state.is_open)) { | |
703 | /* Close stream's current artificial packet */ | |
704 | int ret = fs_sink_stream_close_packet(stream, NULL); | |
705 | ||
706 | if (ret) { | |
707 | BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to close packet."); | |
708 | status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; | |
709 | goto end; | |
710 | } | |
711 | } | |
712 | ||
713 | BT_COMP_LOGI("Closing stream file: " | |
714 | "stream-id=%" PRIu64 ", stream-name=\"%s\", " | |
715 | "trace-name=\"%s\", path=\"%s/%s\"", | |
716 | bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream), | |
717 | bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)), | |
718 | stream->trace->path->str, stream->file_name->str); | |
719 | ||
720 | /* | |
721 | * This destroys the stream object and frees all its resources, | |
722 | * closing the stream file. | |
723 | */ | |
724 | g_hash_table_remove(stream->trace->streams, ir_stream); | |
15fe47e0 PP |
725 | |
726 | end: | |
4164020e | 727 | return status; |
15fe47e0 PP |
728 | } |
729 | ||
4164020e SM |
730 | static inline bt_component_class_sink_consume_method_status |
731 | handle_discarded_events_msg(struct fs_sink_comp *fs_sink, const bt_message *msg) | |
15fe47e0 | 732 | { |
4164020e SM |
733 | bt_component_class_sink_consume_method_status status = |
734 | BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK; | |
735 | const bt_stream *ir_stream = bt_message_discarded_events_borrow_stream_const(msg); | |
736 | struct fs_sink_stream *stream; | |
737 | const bt_clock_snapshot *cs = NULL; | |
738 | bt_property_availability avail; | |
739 | uint64_t count; | |
740 | ||
741 | stream = borrow_stream(fs_sink, ir_stream); | |
742 | if (!stream) { | |
743 | BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream."); | |
744 | status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; | |
745 | goto end; | |
746 | } | |
747 | ||
748 | if (fs_sink->ignore_discarded_events) { | |
749 | BT_COMP_LOGI("Ignoring discarded events message: " | |
750 | "stream-id=%" PRIu64 ", stream-name=\"%s\", " | |
751 | "trace-name=\"%s\", path=\"%s/%s\"", | |
752 | bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream), | |
753 | bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)), | |
754 | stream->trace->path->str, stream->file_name->str); | |
755 | goto end; | |
756 | } | |
757 | ||
758 | if (stream->discarded_events_state.in_range) { | |
759 | BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, | |
760 | "Unsupported contiguous discarded events message: " | |
761 | "stream-id=%" PRIu64 ", stream-name=\"%s\", " | |
762 | "trace-name=\"%s\", path=\"%s/%s\"", | |
763 | bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream), | |
764 | bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)), | |
765 | stream->trace->path->str, stream->file_name->str); | |
766 | status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; | |
767 | goto end; | |
768 | } | |
769 | ||
770 | /* | |
771 | * If we're currently in an opened packet (got a packet | |
772 | * beginning message, but no packet end message yet), we do not | |
773 | * support having a discarded events message with a time range | |
774 | * because we require that the discarded events message's time | |
775 | * range go from a packet's end time to the next packet's end | |
776 | * time. | |
777 | */ | |
778 | if (stream->packet_state.is_open && stream->sc->discarded_events_has_ts) { | |
779 | BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, | |
780 | "Unsupported discarded events message with " | |
781 | "default clock snapshots occurring within a packet: " | |
782 | "stream-id=%" PRIu64 ", stream-name=\"%s\", " | |
783 | "trace-name=\"%s\", path=\"%s/%s\"", | |
784 | bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream), | |
785 | bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)), | |
786 | stream->trace->path->str, stream->file_name->str); | |
787 | status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; | |
788 | goto end; | |
789 | } | |
790 | ||
791 | if (stream->sc->discarded_events_has_ts) { | |
792 | /* | |
793 | * Make the stream's state be in the time range of a | |
794 | * discarded events message since we have the message's | |
795 | * time range (`stream->sc->discarded_events_has_ts`). | |
796 | */ | |
797 | stream->discarded_events_state.in_range = true; | |
798 | ||
799 | /* | |
800 | * The clock snapshot values will be validated when | |
801 | * handling the next packet beginning and end messages | |
802 | * (next calls to handle_packet_beginning_msg() and | |
803 | * handle_packet_end_msg()). | |
804 | */ | |
805 | cs = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg); | |
806 | BT_ASSERT(cs); | |
807 | stream->discarded_events_state.beginning_cs = bt_clock_snapshot_get_value(cs); | |
808 | cs = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg); | |
809 | BT_ASSERT(cs); | |
810 | stream->discarded_events_state.end_cs = bt_clock_snapshot_get_value(cs); | |
811 | } | |
812 | ||
813 | avail = bt_message_discarded_events_get_count(msg, &count); | |
814 | if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) { | |
815 | /* | |
816 | * There's no specific count of discarded events: set it | |
817 | * to 1 so that we know that we at least discarded | |
818 | * something. | |
819 | */ | |
820 | count = 1; | |
821 | } | |
822 | ||
823 | stream->packet_state.discarded_events_counter += count; | |
15fe47e0 PP |
824 | |
825 | end: | |
4164020e | 826 | return status; |
15fe47e0 PP |
827 | } |
828 | ||
4164020e SM |
829 | static inline bt_component_class_sink_consume_method_status |
830 | handle_discarded_packets_msg(struct fs_sink_comp *fs_sink, const bt_message *msg) | |
15fe47e0 | 831 | { |
4164020e SM |
832 | bt_component_class_sink_consume_method_status status = |
833 | BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK; | |
834 | const bt_stream *ir_stream = bt_message_discarded_packets_borrow_stream_const(msg); | |
835 | struct fs_sink_stream *stream; | |
836 | const bt_clock_snapshot *cs = NULL; | |
837 | bt_property_availability avail; | |
838 | uint64_t count; | |
839 | ||
840 | stream = borrow_stream(fs_sink, ir_stream); | |
841 | if (!stream) { | |
842 | BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to borrow stream."); | |
843 | status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; | |
844 | goto end; | |
845 | } | |
846 | ||
847 | if (fs_sink->ignore_discarded_packets) { | |
848 | BT_COMP_LOGI("Ignoring discarded packets message: " | |
849 | "stream-id=%" PRIu64 ", stream-name=\"%s\", " | |
850 | "trace-name=\"%s\", path=\"%s/%s\"", | |
851 | bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream), | |
852 | bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)), | |
853 | stream->trace->path->str, stream->file_name->str); | |
854 | goto end; | |
855 | } | |
856 | ||
857 | if (stream->discarded_packets_state.in_range) { | |
858 | BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, | |
859 | "Unsupported contiguous discarded packets message: " | |
860 | "stream-id=%" PRIu64 ", stream-name=\"%s\", " | |
861 | "trace-name=\"%s\", path=\"%s/%s\"", | |
862 | bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream), | |
863 | bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream)), | |
864 | stream->trace->path->str, stream->file_name->str); | |
865 | status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; | |
866 | goto end; | |
867 | } | |
868 | ||
869 | /* | |
870 | * Discarded packets messages are guaranteed to occur between | |
871 | * packets. | |
872 | */ | |
873 | BT_ASSERT(!stream->packet_state.is_open); | |
874 | ||
875 | if (stream->sc->discarded_packets_has_ts) { | |
876 | /* | |
877 | * Make the stream's state be in the time range of a | |
878 | * discarded packets message since we have the message's | |
879 | * time range (`stream->sc->discarded_packets_has_ts`). | |
880 | */ | |
881 | stream->discarded_packets_state.in_range = true; | |
882 | ||
883 | /* | |
884 | * The clock snapshot values will be validated when | |
885 | * handling the next packet beginning message (next call | |
886 | * to handle_packet_beginning_msg()). | |
887 | */ | |
888 | cs = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg); | |
889 | BT_ASSERT(cs); | |
890 | stream->discarded_packets_state.beginning_cs = bt_clock_snapshot_get_value(cs); | |
891 | cs = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg); | |
892 | BT_ASSERT(cs); | |
893 | stream->discarded_packets_state.end_cs = bt_clock_snapshot_get_value(cs); | |
894 | } | |
895 | ||
896 | avail = bt_message_discarded_packets_get_count(msg, &count); | |
897 | if (avail != BT_PROPERTY_AVAILABILITY_AVAILABLE) { | |
898 | /* | |
899 | * There's no specific count of discarded packets: set | |
900 | * it to 1 so that we know that we at least discarded | |
901 | * something. | |
902 | */ | |
903 | count = 1; | |
904 | } | |
905 | ||
906 | stream->packet_state.seq_num += count; | |
15fe47e0 PP |
907 | |
908 | end: | |
4164020e | 909 | return status; |
15fe47e0 PP |
910 | } |
911 | ||
4164020e | 912 | static inline void put_messages(bt_message_array_const msgs, uint64_t count) |
15fe47e0 | 913 | { |
4164020e | 914 | uint64_t i; |
15fe47e0 | 915 | |
4164020e SM |
916 | for (i = 0; i < count; i++) { |
917 | BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]); | |
918 | } | |
15fe47e0 PP |
919 | } |
920 | ||
921 | BT_HIDDEN | |
4164020e | 922 | bt_component_class_sink_consume_method_status ctf_fs_sink_consume(bt_self_component_sink *self_comp) |
15fe47e0 | 923 | { |
4164020e SM |
924 | bt_component_class_sink_consume_method_status status = |
925 | BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK; | |
926 | struct fs_sink_comp *fs_sink; | |
927 | bt_message_iterator_next_status next_status; | |
928 | uint64_t msg_count = 0; | |
929 | bt_message_array_const msgs; | |
930 | ||
931 | fs_sink = (fs_sink_comp *) bt_self_component_get_data( | |
932 | bt_self_component_sink_as_self_component(self_comp)); | |
933 | BT_ASSERT_DBG(fs_sink); | |
934 | BT_ASSERT_DBG(fs_sink->upstream_iter); | |
935 | ||
936 | /* Consume messages */ | |
937 | next_status = bt_message_iterator_next(fs_sink->upstream_iter, &msgs, &msg_count); | |
938 | if (next_status < 0) { | |
939 | status = (bt_component_class_sink_consume_method_status) next_status; | |
940 | BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, | |
941 | "Failed to get next message from upstream iterator."); | |
942 | goto end; | |
943 | } | |
944 | ||
945 | switch (next_status) { | |
946 | case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK: | |
947 | { | |
948 | uint64_t i; | |
949 | ||
950 | for (i = 0; i < msg_count; i++) { | |
951 | const bt_message *msg = msgs[i]; | |
952 | ||
953 | BT_ASSERT_DBG(msg); | |
954 | ||
955 | switch (bt_message_get_type(msg)) { | |
956 | case BT_MESSAGE_TYPE_EVENT: | |
957 | status = handle_event_msg(fs_sink, msg); | |
958 | break; | |
959 | case BT_MESSAGE_TYPE_PACKET_BEGINNING: | |
960 | status = handle_packet_beginning_msg(fs_sink, msg); | |
961 | break; | |
962 | case BT_MESSAGE_TYPE_PACKET_END: | |
963 | status = handle_packet_end_msg(fs_sink, msg); | |
964 | break; | |
965 | case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY: | |
966 | /* Ignore */ | |
967 | BT_COMP_LOGD_STR("Ignoring message iterator inactivity message."); | |
968 | break; | |
969 | case BT_MESSAGE_TYPE_STREAM_BEGINNING: | |
970 | status = handle_stream_beginning_msg(fs_sink, msg); | |
971 | break; | |
972 | case BT_MESSAGE_TYPE_STREAM_END: | |
973 | status = handle_stream_end_msg(fs_sink, msg); | |
974 | break; | |
975 | case BT_MESSAGE_TYPE_DISCARDED_EVENTS: | |
976 | status = handle_discarded_events_msg(fs_sink, msg); | |
977 | break; | |
978 | case BT_MESSAGE_TYPE_DISCARDED_PACKETS: | |
979 | status = handle_discarded_packets_msg(fs_sink, msg); | |
980 | break; | |
981 | default: | |
982 | bt_common_abort(); | |
983 | } | |
984 | ||
985 | BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]); | |
986 | ||
987 | if (status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK) { | |
988 | BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, | |
989 | "Failed to handle message: " | |
990 | "generated CTF traces could be incomplete: " | |
991 | "output-dir-path=\"%s\"", | |
992 | fs_sink->output_dir_path->str); | |
993 | goto error; | |
994 | } | |
995 | } | |
996 | ||
997 | break; | |
998 | } | |
999 | case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN: | |
1000 | status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN; | |
1001 | break; | |
1002 | case BT_MESSAGE_ITERATOR_NEXT_STATUS_END: | |
1003 | /* TODO: Finalize all traces (should already be done?) */ | |
1004 | status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END; | |
1005 | break; | |
1006 | default: | |
1007 | break; | |
1008 | } | |
1009 | ||
1010 | goto end; | |
15fe47e0 PP |
1011 | |
1012 | error: | |
4164020e SM |
1013 | BT_ASSERT(status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK); |
1014 | put_messages(msgs, msg_count); | |
15fe47e0 PP |
1015 | |
1016 | end: | |
4164020e | 1017 | return status; |
15fe47e0 PP |
1018 | } |
1019 | ||
1020 | BT_HIDDEN | |
e803df70 | 1021 | bt_component_class_sink_graph_is_configured_method_status |
4164020e | 1022 | ctf_fs_sink_graph_is_configured(bt_self_component_sink *self_comp) |
15fe47e0 | 1023 | { |
4164020e SM |
1024 | bt_component_class_sink_graph_is_configured_method_status status; |
1025 | bt_message_iterator_create_from_sink_component_status msg_iter_status; | |
1026 | fs_sink_comp *fs_sink = (fs_sink_comp *) bt_self_component_get_data( | |
1027 | bt_self_component_sink_as_self_component(self_comp)); | |
1028 | ||
1029 | msg_iter_status = bt_message_iterator_create_from_sink_component( | |
1030 | self_comp, bt_self_component_sink_borrow_input_port_by_name(self_comp, in_port_name), | |
1031 | &fs_sink->upstream_iter); | |
1032 | if (msg_iter_status != BT_MESSAGE_ITERATOR_CREATE_FROM_SINK_COMPONENT_STATUS_OK) { | |
1033 | status = (bt_component_class_sink_graph_is_configured_method_status) msg_iter_status; | |
1034 | BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, "Failed to create upstream iterator."); | |
1035 | goto end; | |
1036 | } | |
1037 | ||
1038 | status = BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK; | |
15fe47e0 | 1039 | end: |
4164020e | 1040 | return status; |
15fe47e0 PP |
1041 | } |
1042 | ||
1043 | BT_HIDDEN | |
1044 | void ctf_fs_sink_finalize(bt_self_component_sink *self_comp) | |
1045 | { | |
4164020e SM |
1046 | fs_sink_comp *fs_sink = (fs_sink_comp *) bt_self_component_get_data( |
1047 | bt_self_component_sink_as_self_component(self_comp)); | |
15fe47e0 | 1048 | |
4164020e | 1049 | destroy_fs_sink_comp(fs_sink); |
15fe47e0 | 1050 | } |