X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fplugins%2Fctf%2Ffs-sink%2Ffs-sink.c;h=4b077507b7a4bd9f71a2c99d1f63a163185b9df8;hb=516bf0a77e025cfccce2fa400b757e94dc0bf1d8;hp=f6b5bab41f8d8afb9f59f46f930627aa0e61853d;hpb=ffa3b2b342beb89fff9f31dd81d4f04ac5546fd5;p=babeltrace.git diff --git a/src/plugins/ctf/fs-sink/fs-sink.c b/src/plugins/ctf/fs-sink/fs-sink.c index f6b5bab4..4b077507 100644 --- a/src/plugins/ctf/fs-sink/fs-sink.c +++ b/src/plugins/ctf/fs-sink/fs-sink.c @@ -1,28 +1,13 @@ /* - * Copyright 2019 Philippe Proulx - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: + * SPDX-License-Identifier: MIT * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. + * Copyright 2019 Philippe Proulx */ +#define BT_COMP_LOG_SELF_COMP (fs_sink->self_comp) #define BT_LOG_OUTPUT_LEVEL (fs_sink->log_level) #define BT_LOG_TAG "PLUGIN/SINK.CTF.FS" -#include "logging/log.h" +#include "logging/comp-logging.h" #include #include @@ -30,6 +15,7 @@ #include #include "common/assert.h" #include "ctfser/ctfser.h" +#include "plugins/common/param-validation/param-validation.h" #include "fs-sink.h" #include "fs-sink-trace.h" @@ -42,18 +28,20 @@ static const char * const in_port_name = "in"; static -bt_self_component_status ensure_output_dir_exists( +bt_component_class_initialize_method_status ensure_output_dir_exists( struct fs_sink_comp *fs_sink) { - bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK; + bt_component_class_initialize_method_status status = + BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK; int ret; ret = g_mkdir_with_parents(fs_sink->output_dir_path->str, 0755); if (ret) { - BT_LOGE_ERRNO("Cannot create directories for output directory", + BT_COMP_LOGE_APPEND_CAUSE_ERRNO(fs_sink->self_comp, + "Cannot create directories for output directory", ": output-dir-path=\"%s\"", fs_sink->output_dir_path->str); - status = BT_SELF_COMPONENT_STATUS_ERROR; + status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR; goto end; } @@ -61,49 +49,49 @@ end: return status; } +static struct bt_param_validation_map_value_entry_descr fs_sink_params_descr[] = { + { "path", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY, { .type = BT_VALUE_TYPE_STRING } }, + { "assume-single-trace", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { .type = BT_VALUE_TYPE_BOOL } }, + { "ignore-discarded-events", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { .type = BT_VALUE_TYPE_BOOL } }, + { "ignore-discarded-packets", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { .type = BT_VALUE_TYPE_BOOL } }, + { "quiet", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { .type = BT_VALUE_TYPE_BOOL } }, + BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END +}; + static -bt_self_component_status configure_component(struct fs_sink_comp *fs_sink, - const bt_value *params) +bt_component_class_initialize_method_status +configure_component(struct fs_sink_comp *fs_sink, const bt_value *params) { - bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK; + bt_component_class_initialize_method_status status; const bt_value *value; - - value = bt_value_map_borrow_entry_value_const(params, "path"); - if (!value) { - BT_LOGE_STR("Missing mandatory `path` parameter."); - status = BT_SELF_COMPONENT_STATUS_ERROR; + enum bt_param_validation_status validation_status; + gchar *validation_error; + + validation_status = bt_param_validation_validate(params, + fs_sink_params_descr, &validation_error); + if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) { + status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR; + BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, + "%s", validation_error); goto end; - } - - if (!bt_value_is_string(value)) { - BT_LOGE_STR("`path` parameter: expecting a string."); - status = BT_SELF_COMPONENT_STATUS_ERROR; + } else if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) { + status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; goto end; } + value = bt_value_map_borrow_entry_value_const(params, "path"); g_string_assign(fs_sink->output_dir_path, bt_value_string_get(value)); + value = bt_value_map_borrow_entry_value_const(params, "assume-single-trace"); if (value) { - if (!bt_value_is_bool(value)) { - BT_LOGE_STR("`assume-single-trace` parameter: expecting a boolean."); - status = BT_SELF_COMPONENT_STATUS_ERROR; - goto end; - } - fs_sink->assume_single_trace = (bool) bt_value_bool_get(value); } value = bt_value_map_borrow_entry_value_const(params, "ignore-discarded-events"); if (value) { - if (!bt_value_is_bool(value)) { - BT_LOGE_STR("`ignore-discarded-events` parameter: expecting a boolean."); - status = BT_SELF_COMPONENT_STATUS_ERROR; - goto end; - } - fs_sink->ignore_discarded_events = (bool) bt_value_bool_get(value); } @@ -111,12 +99,6 @@ bt_self_component_status configure_component(struct fs_sink_comp *fs_sink, value = bt_value_map_borrow_entry_value_const(params, "ignore-discarded-packets"); if (value) { - if (!bt_value_is_bool(value)) { - BT_LOGE_STR("`ignore-discarded-packets` parameter: expecting a boolean."); - status = BT_SELF_COMPONENT_STATUS_ERROR; - goto end; - } - fs_sink->ignore_discarded_packets = (bool) bt_value_bool_get(value); } @@ -124,16 +106,13 @@ bt_self_component_status configure_component(struct fs_sink_comp *fs_sink, value = bt_value_map_borrow_entry_value_const(params, "quiet"); if (value) { - if (!bt_value_is_bool(value)) { - BT_LOGE_STR("`quiet` parameter: expecting a boolean."); - status = BT_SELF_COMPONENT_STATUS_ERROR; - goto end; - } - fs_sink->quiet = (bool) bt_value_bool_get(value); } + status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK; + end: + g_free(validation_error); return status; } @@ -154,7 +133,7 @@ void destroy_fs_sink_comp(struct fs_sink_comp *fs_sink) fs_sink->traces = NULL; } - BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET( + BT_MESSAGE_ITERATOR_PUT_REF_AND_RESET( fs_sink->upstream_iter); g_free(fs_sink); @@ -163,11 +142,14 @@ end: } BT_HIDDEN -bt_self_component_status ctf_fs_sink_init( - bt_self_component_sink *self_comp_sink, const bt_value *params, +bt_component_class_initialize_method_status ctf_fs_sink_init( + bt_self_component_sink *self_comp_sink, + bt_self_component_sink_configuration *config, + const bt_value *params, void *init_method_data) { - bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK; + bt_component_class_initialize_method_status status; + bt_self_component_add_port_status add_port_status; struct fs_sink_comp *fs_sink = NULL; bt_self_component *self_comp = bt_self_component_sink_as_self_component(self_comp_sink); @@ -176,17 +158,19 @@ bt_self_component_status ctf_fs_sink_init( fs_sink = g_new0(struct fs_sink_comp, 1); if (!fs_sink) { - BT_LOG_WRITE_CUR_LVL(BT_LOG_ERROR, log_level, BT_LOG_TAG, + BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR, log_level, self_comp, "Failed to allocate one CTF FS sink structure."); - status = BT_SELF_COMPONENT_STATUS_NOMEM; + BT_CURRENT_THREAD_ERROR_APPEND_CAUSE_FROM_COMPONENT( + self_comp, "Failed to allocate one CTF FS sink structure."); + status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; goto end; } fs_sink->log_level = log_level; + fs_sink->self_comp = self_comp; fs_sink->output_dir_path = g_string_new(NULL); - fs_sink->self_comp = self_comp_sink; status = configure_component(fs_sink, params); - if (status != BT_SELF_COMPONENT_STATUS_OK) { + if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) { /* configure_component() logs errors */ goto end; } @@ -194,14 +178,15 @@ bt_self_component_status ctf_fs_sink_init( if (fs_sink->assume_single_trace && g_file_test(fs_sink->output_dir_path->str, G_FILE_TEST_EXISTS)) { - BT_LOGE("Single trace mode, but output path exists: " - "output-path=\"%s\"", fs_sink->output_dir_path->str); - status = BT_SELF_COMPONENT_STATUS_ERROR; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Single trace mode, but output path exists: output-path=\"%s\"", + fs_sink->output_dir_path->str); + status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR; goto end; } status = ensure_output_dir_exists(fs_sink); - if (status != BT_SELF_COMPONENT_STATUS_OK) { + if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) { /* ensure_output_dir_exists() logs errors */ goto end; } @@ -209,21 +194,23 @@ bt_self_component_status ctf_fs_sink_init( fs_sink->traces = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) fs_sink_trace_destroy); if (!fs_sink->traces) { - BT_LOGE_STR("Failed to allocate one GHashTable."); - status = BT_SELF_COMPONENT_STATUS_NOMEM; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate one GHashTable."); + status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; goto end; } - status = bt_self_component_sink_add_input_port(self_comp_sink, - in_port_name, NULL, NULL); - if (status != BT_SELF_COMPONENT_STATUS_OK) { + add_port_status = bt_self_component_sink_add_input_port( + self_comp_sink, in_port_name, NULL, NULL); + if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) { + status = (int) add_port_status; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to add input port."); goto end; } bt_self_component_set_data(self_comp, fs_sink); end: - if (status != BT_SELF_COMPONENT_STATUS_OK) { + if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) { destroy_fs_sink_comp(fs_sink); } @@ -242,7 +229,8 @@ struct fs_sink_stream *borrow_stream(struct fs_sink_comp *fs_sink, if (G_UNLIKELY(!trace)) { if (fs_sink->assume_single_trace && g_hash_table_size(fs_sink->traces) > 0) { - BT_LOGE("Single trace mode, but getting more than one trace: " + BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, + "Single trace mode, but getting more than one trace: " "stream-name=\"%s\"", bt_stream_get_name(ir_stream)); goto end; @@ -267,11 +255,12 @@ end: } static inline -bt_self_component_status handle_event_msg(struct fs_sink_comp *fs_sink, - const bt_message *msg) +bt_component_class_sink_consume_method_status handle_event_msg( + struct fs_sink_comp *fs_sink, const bt_message *msg) { int ret; - bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK; + bt_component_class_sink_consume_method_status status = + BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK; const bt_event *ir_event = bt_message_event_borrow_event_const(msg); const bt_stream *ir_stream = bt_event_borrow_stream_const(ir_event); struct fs_sink_stream *stream; @@ -280,28 +269,73 @@ bt_self_component_status handle_event_msg(struct fs_sink_comp *fs_sink, stream = borrow_stream(fs_sink, ir_stream); if (G_UNLIKELY(!stream)) { - status = BT_SELF_COMPONENT_STATUS_ERROR; + BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, + "Failed to borrow stream."); + status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; goto end; } - ret = try_translate_event_class_trace_ir_to_ctf_ir(stream->sc, - bt_event_borrow_class_const(ir_event), &ec, - fs_sink->log_level); + ret = try_translate_event_class_trace_ir_to_ctf_ir(fs_sink, + stream->sc, bt_event_borrow_class_const(ir_event), &ec); if (ret) { - status = BT_SELF_COMPONENT_STATUS_ERROR; + BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, + "Failed to translate event class to CTF IR."); + status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; goto end; } - BT_ASSERT(ec); + BT_ASSERT_DBG(ec); if (stream->sc->default_clock_class) { cs = bt_message_event_borrow_default_clock_snapshot_const( msg); } + /* + * If this event's stream does not support packets, then we + * lazily create artificial packets. + * + * The size of an artificial packet is arbitrarily at least + * 4 MiB (it usually is greater because we close it when + * comes the time to write a new event and the packet's content + * size is >= 4 MiB), except the last one which can be smaller. + */ + if (G_UNLIKELY(!stream->sc->has_packets)) { + if (stream->packet_state.is_open && + bt_ctfser_get_offset_in_current_packet_bits(&stream->ctfser) / 8 >= + 4 * 1024 * 1024) { + /* + * Stream's current packet is larger than 4 MiB: + * close it. A new packet will be opened just + * below. + */ + ret = fs_sink_stream_close_packet(stream, NULL); + if (ret) { + BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, + "Failed to close packet."); + status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; + goto end; + } + } + + if (!stream->packet_state.is_open) { + /* Stream's packet is not currently opened: open it */ + ret = fs_sink_stream_open_packet(stream, NULL, NULL); + if (ret) { + BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, + "Failed to open packet."); + status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; + goto end; + } + } + } + + BT_ASSERT_DBG(stream->packet_state.is_open); ret = fs_sink_stream_write_event(stream, cs, ir_event, ec); if (G_UNLIKELY(ret)) { - status = BT_SELF_COMPONENT_STATUS_ERROR; + BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, + "Failed to write event."); + status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; goto end; } @@ -310,11 +344,12 @@ end: } static inline -bt_self_component_status handle_packet_beginning_msg( +bt_component_class_sink_consume_method_status handle_packet_beginning_msg( struct fs_sink_comp *fs_sink, const bt_message *msg) { int ret; - bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK; + bt_component_class_sink_consume_method_status status = + BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK; const bt_packet *ir_packet = bt_message_packet_beginning_borrow_packet_const(msg); const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet); @@ -323,7 +358,9 @@ bt_self_component_status handle_packet_beginning_msg( stream = borrow_stream(fs_sink, ir_stream); if (G_UNLIKELY(!stream)) { - status = BT_SELF_COMPONENT_STATUS_ERROR; + BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, + "Failed to borrow stream."); + status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; goto end; } @@ -374,7 +411,8 @@ bt_self_component_status handle_packet_beginning_msg( if (stream->discarded_events_state.beginning_cs != expected_cs) { - BT_LOGE("Incompatible discarded events message: " + BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, + "Incompatible discarded events message: " "unexpected beginning time: " "beginning-cs-val=%" PRIu64 ", " "expected-beginning-cs-val=%" PRIu64 ", " @@ -387,7 +425,7 @@ bt_self_component_status handle_packet_beginning_msg( bt_trace_get_name( bt_stream_borrow_trace_const(ir_stream)), stream->trace->path->str, stream->file_name->str); - status = BT_SELF_COMPONENT_STATUS_ERROR; + status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; goto end; } } @@ -424,8 +462,9 @@ bt_self_component_status handle_packet_beginning_msg( * this case. */ if (stream->prev_packet_state.end_cs == UINT64_C(-1)) { - BT_LOGE("Incompatible discarded packets message " - "occuring before the stream's first packet: " + BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, + "Incompatible discarded packets message " + "occurring before the stream's first packet: " "stream-id=%" PRIu64 ", stream-name=\"%s\", " "trace-name=\"%s\", path=\"%s/%s\"", bt_stream_get_id(ir_stream), @@ -433,13 +472,14 @@ bt_self_component_status handle_packet_beginning_msg( bt_trace_get_name( bt_stream_borrow_trace_const(ir_stream)), stream->trace->path->str, stream->file_name->str); - status = BT_SELF_COMPONENT_STATUS_ERROR; + status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; goto end; } if (stream->discarded_packets_state.beginning_cs != stream->prev_packet_state.end_cs) { - BT_LOGE("Incompatible discarded packets message: " + BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, + "Incompatible discarded packets message: " "unexpected beginning time: " "beginning-cs-val=%" PRIu64 ", " "expected-beginning-cs-val=%" PRIu64 ", " @@ -452,7 +492,7 @@ bt_self_component_status handle_packet_beginning_msg( bt_trace_get_name( bt_stream_borrow_trace_const(ir_stream)), stream->trace->path->str, stream->file_name->str); - status = BT_SELF_COMPONENT_STATUS_ERROR; + status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; goto end; } @@ -460,7 +500,8 @@ bt_self_component_status handle_packet_beginning_msg( if (stream->discarded_packets_state.end_cs != expected_end_cs) { - BT_LOGE("Incompatible discarded packets message: " + BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, + "Incompatible discarded packets message: " "unexpected end time: " "end-cs-val=%" PRIu64 ", " "expected-end-cs-val=%" PRIu64 ", " @@ -473,7 +514,7 @@ bt_self_component_status handle_packet_beginning_msg( bt_trace_get_name( bt_stream_borrow_trace_const(ir_stream)), stream->trace->path->str, stream->file_name->str); - status = BT_SELF_COMPONENT_STATUS_ERROR; + status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; goto end; } } @@ -488,7 +529,9 @@ bt_self_component_status handle_packet_beginning_msg( ret = fs_sink_stream_open_packet(stream, cs, ir_packet); if (ret) { - status = BT_SELF_COMPONENT_STATUS_ERROR; + BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, + "Failed to open packet."); + status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; goto end; } @@ -497,11 +540,12 @@ end: } static inline -bt_self_component_status handle_packet_end_msg( +bt_component_class_sink_consume_method_status handle_packet_end_msg( struct fs_sink_comp *fs_sink, const bt_message *msg) { int ret; - bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK; + bt_component_class_sink_consume_method_status status = + BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK; const bt_packet *ir_packet = bt_message_packet_end_borrow_packet_const(msg); const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet); @@ -510,7 +554,9 @@ bt_self_component_status handle_packet_end_msg( stream = borrow_stream(fs_sink, ir_stream); if (G_UNLIKELY(!stream)) { - status = BT_SELF_COMPONENT_STATUS_ERROR; + BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, + "Failed to borrow stream."); + status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; goto end; } @@ -549,7 +595,8 @@ bt_self_component_status handle_packet_end_msg( expected_cs = bt_clock_snapshot_get_value(cs); if (stream->discarded_events_state.end_cs != expected_cs) { - BT_LOGE("Incompatible discarded events message: " + BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, + "Incompatible discarded events message: " "unexpected end time: " "end-cs-val=%" PRIu64 ", " "expected-end-cs-val=%" PRIu64 ", " @@ -562,14 +609,16 @@ bt_self_component_status handle_packet_end_msg( bt_trace_get_name( bt_stream_borrow_trace_const(ir_stream)), stream->trace->path->str, stream->file_name->str); - status = BT_SELF_COMPONENT_STATUS_ERROR; + status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; goto end; } } ret = fs_sink_stream_close_packet(stream, cs); if (ret) { - status = BT_SELF_COMPONENT_STATUS_ERROR; + BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, + "Failed to close packet."); + status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; goto end; } @@ -586,10 +635,11 @@ end: } static inline -bt_self_component_status handle_stream_beginning_msg( +bt_component_class_sink_consume_method_status handle_stream_beginning_msg( struct fs_sink_comp *fs_sink, const bt_message *msg) { - bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK; + bt_component_class_sink_consume_method_status status = + BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK; const bt_stream *ir_stream = bt_message_stream_beginning_borrow_stream_const(msg); const bt_stream_class *ir_sc = @@ -599,6 +649,30 @@ bt_self_component_status handle_stream_beginning_msg( bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc) && bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc); + /* + * Not supported: discarded events or discarded packets support + * without packets support. Packets are the way to know where + * discarded events/packets occurred in CTF 1.8. + */ + if (!bt_stream_class_supports_packets(ir_sc)) { + BT_ASSERT(!bt_stream_class_supports_discarded_packets(ir_sc)); + + if (!fs_sink->ignore_discarded_events && + bt_stream_class_supports_discarded_events(ir_sc)) { + BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, + "Unsupported stream: " + "stream does not support packets, " + "but supports discarded events: " + "stream-addr=%p, " + "stream-id=%" PRIu64 ", " + "stream-name=\"%s\"", + ir_stream, bt_stream_get_id(ir_stream), + bt_stream_get_name(ir_stream)); + status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; + goto end; + } + } + /* * Not supported: discarded events with default clock snapshots, * but packet beginning/end without default clock snapshot. @@ -606,7 +680,8 @@ bt_self_component_status handle_stream_beginning_msg( if (!fs_sink->ignore_discarded_events && bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc) && !packets_have_beginning_end_cs) { - BT_LOGE("Unsupported stream: discarded events have " + BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, + "Unsupported stream: discarded events have " "default clock snapshots, but packets have no " "beginning and/or end default clock snapshots: " "stream-addr=%p, " @@ -614,7 +689,7 @@ bt_self_component_status handle_stream_beginning_msg( "stream-name=\"%s\"", ir_stream, bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream)); - status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; + status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; goto end; } @@ -626,7 +701,8 @@ bt_self_component_status handle_stream_beginning_msg( if (!fs_sink->ignore_discarded_packets && bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc) && !packets_have_beginning_end_cs) { - BT_LOGE("Unsupported stream: discarded packets have " + BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, + "Unsupported stream: discarded packets have " "default clock snapshots, but packets have no " "beginning and/or end default clock snapshots: " "stream-addr=%p, " @@ -634,17 +710,19 @@ bt_self_component_status handle_stream_beginning_msg( "stream-name=\"%s\"", ir_stream, bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream)); - status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; + status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; goto end; } stream = borrow_stream(fs_sink, ir_stream); if (!stream) { - status = BT_SELF_COMPONENT_STATUS_ERROR; + BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, + "Failed to borrow stream."); + status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; goto end; } - BT_LOGI("Created new, empty stream file: " + BT_COMP_LOGI("Created new, empty stream file: " "stream-id=%" PRIu64 ", stream-name=\"%s\", " "trace-name=\"%s\", path=\"%s/%s\"", bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream), @@ -656,21 +734,37 @@ end: } static inline -bt_self_component_status handle_stream_end_msg(struct fs_sink_comp *fs_sink, - const bt_message *msg) +bt_component_class_sink_consume_method_status handle_stream_end_msg( + struct fs_sink_comp *fs_sink, const bt_message *msg) { - bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK; + bt_component_class_sink_consume_method_status status = + BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK; const bt_stream *ir_stream = bt_message_stream_end_borrow_stream_const(msg); struct fs_sink_stream *stream; stream = borrow_stream(fs_sink, ir_stream); if (!stream) { - status = BT_SELF_COMPONENT_STATUS_ERROR; + BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, + "Failed to borrow stream."); + status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; goto end; } - BT_LOGI("Closing stream file: " + if (G_UNLIKELY(!stream->sc->has_packets && + stream->packet_state.is_open)) { + /* Close stream's current artificial packet */ + int ret = fs_sink_stream_close_packet(stream, NULL); + + if (ret) { + BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, + "Failed to close packet."); + status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; + goto end; + } + } + + BT_COMP_LOGI("Closing stream file: " "stream-id=%" PRIu64 ", stream-name=\"%s\", " "trace-name=\"%s\", path=\"%s/%s\"", bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream), @@ -688,10 +782,11 @@ end: } static inline -bt_self_component_status handle_discarded_events_msg( +bt_component_class_sink_consume_method_status handle_discarded_events_msg( struct fs_sink_comp *fs_sink, const bt_message *msg) { - bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK; + bt_component_class_sink_consume_method_status status = + BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK; const bt_stream *ir_stream = bt_message_discarded_events_borrow_stream_const(msg); struct fs_sink_stream *stream; @@ -701,12 +796,14 @@ bt_self_component_status handle_discarded_events_msg( stream = borrow_stream(fs_sink, ir_stream); if (!stream) { - status = BT_SELF_COMPONENT_STATUS_ERROR; + BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, + "Failed to borrow stream."); + status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; goto end; } if (fs_sink->ignore_discarded_events) { - BT_LOGI("Ignoring discarded events message: " + BT_COMP_LOGI("Ignoring discarded events message: " "stream-id=%" PRIu64 ", stream-name=\"%s\", " "trace-name=\"%s\", path=\"%s/%s\"", bt_stream_get_id(ir_stream), @@ -718,7 +815,8 @@ bt_self_component_status handle_discarded_events_msg( } if (stream->discarded_events_state.in_range) { - BT_LOGE("Unsupported contiguous discarded events message: " + BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, + "Unsupported contiguous discarded events message: " "stream-id=%" PRIu64 ", stream-name=\"%s\", " "trace-name=\"%s\", path=\"%s/%s\"", bt_stream_get_id(ir_stream), @@ -726,7 +824,7 @@ bt_self_component_status handle_discarded_events_msg( bt_trace_get_name( bt_stream_borrow_trace_const(ir_stream)), stream->trace->path->str, stream->file_name->str); - status = BT_SELF_COMPONENT_STATUS_ERROR; + status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; goto end; } @@ -740,8 +838,9 @@ bt_self_component_status handle_discarded_events_msg( */ if (stream->packet_state.is_open && stream->sc->discarded_events_has_ts) { - BT_LOGE("Unsupported discarded events message with " - "default clock snapshots occuring within a packet: " + BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, + "Unsupported discarded events message with " + "default clock snapshots occurring within a packet: " "stream-id=%" PRIu64 ", stream-name=\"%s\", " "trace-name=\"%s\", path=\"%s/%s\"", bt_stream_get_id(ir_stream), @@ -749,7 +848,7 @@ bt_self_component_status handle_discarded_events_msg( bt_trace_get_name( bt_stream_borrow_trace_const(ir_stream)), stream->trace->path->str, stream->file_name->str); - status = BT_SELF_COMPONENT_STATUS_ERROR; + status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; goto end; } @@ -795,10 +894,11 @@ end: } static inline -bt_self_component_status handle_discarded_packets_msg( +bt_component_class_sink_consume_method_status handle_discarded_packets_msg( struct fs_sink_comp *fs_sink, const bt_message *msg) { - bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK; + bt_component_class_sink_consume_method_status status = + BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK; const bt_stream *ir_stream = bt_message_discarded_packets_borrow_stream_const(msg); struct fs_sink_stream *stream; @@ -808,12 +908,14 @@ bt_self_component_status handle_discarded_packets_msg( stream = borrow_stream(fs_sink, ir_stream); if (!stream) { - status = BT_SELF_COMPONENT_STATUS_ERROR; + BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, + "Failed to borrow stream."); + status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; goto end; } if (fs_sink->ignore_discarded_packets) { - BT_LOGI("Ignoring discarded packets message: " + BT_COMP_LOGI("Ignoring discarded packets message: " "stream-id=%" PRIu64 ", stream-name=\"%s\", " "trace-name=\"%s\", path=\"%s/%s\"", bt_stream_get_id(ir_stream), @@ -825,7 +927,8 @@ bt_self_component_status handle_discarded_packets_msg( } if (stream->discarded_packets_state.in_range) { - BT_LOGE("Unsupported contiguous discarded packets message: " + BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, + "Unsupported contiguous discarded packets message: " "stream-id=%" PRIu64 ", stream-name=\"%s\", " "trace-name=\"%s\", path=\"%s/%s\"", bt_stream_get_id(ir_stream), @@ -833,7 +936,7 @@ bt_self_component_status handle_discarded_packets_msg( bt_trace_get_name( bt_stream_borrow_trace_const(ir_stream)), stream->trace->path->str, stream->file_name->str); - status = BT_SELF_COMPONENT_STATUS_ERROR; + status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR; goto end; } @@ -895,36 +998,40 @@ void put_messages(bt_message_array_const msgs, uint64_t count) } BT_HIDDEN -bt_self_component_status ctf_fs_sink_consume(bt_self_component_sink *self_comp) +bt_component_class_sink_consume_method_status ctf_fs_sink_consume( + bt_self_component_sink *self_comp) { - bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK; + bt_component_class_sink_consume_method_status status = + BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK; struct fs_sink_comp *fs_sink; - bt_message_iterator_status it_status; + bt_message_iterator_next_status next_status; uint64_t msg_count = 0; bt_message_array_const msgs; fs_sink = bt_self_component_get_data( bt_self_component_sink_as_self_component(self_comp)); - BT_ASSERT(fs_sink); - BT_ASSERT(fs_sink->upstream_iter); + BT_ASSERT_DBG(fs_sink); + BT_ASSERT_DBG(fs_sink->upstream_iter); /* Consume messages */ - it_status = bt_self_component_port_input_message_iterator_next( + next_status = bt_message_iterator_next( fs_sink->upstream_iter, &msgs, &msg_count); - if (it_status < 0) { - status = BT_SELF_COMPONENT_STATUS_ERROR; + if (next_status < 0) { + status = (int) next_status; + BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, + "Failed to get next message from upstream iterator."); goto end; } - switch (it_status) { - case BT_MESSAGE_ITERATOR_STATUS_OK: + switch (next_status) { + case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK: { uint64_t i; for (i = 0; i < msg_count; i++) { const bt_message *msg = msgs[i]; - BT_ASSERT(msg); + BT_ASSERT_DBG(msg); switch (bt_message_get_type(msg)) { case BT_MESSAGE_TYPE_EVENT: @@ -940,7 +1047,7 @@ bt_self_component_status ctf_fs_sink_consume(bt_self_component_sink *self_comp) break; case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY: /* Ignore */ - BT_LOGD_STR("Ignoring message iterator inactivity message."); + BT_COMP_LOGD_STR("Ignoring message iterator inactivity message."); break; case BT_MESSAGE_TYPE_STREAM_BEGINNING: status = handle_stream_beginning_msg( @@ -950,11 +1057,6 @@ bt_self_component_status ctf_fs_sink_consume(bt_self_component_sink *self_comp) status = handle_stream_end_msg( fs_sink, msg); break; - case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING: - case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END: - /* Not supported by CTF 1.8 */ - BT_LOGD_STR("Ignoring stream activity message."); - break; case BT_MESSAGE_TYPE_DISCARDED_EVENTS: status = handle_discarded_events_msg( fs_sink, msg); @@ -964,13 +1066,14 @@ bt_self_component_status ctf_fs_sink_consume(bt_self_component_sink *self_comp) fs_sink, msg); break; default: - abort(); + bt_common_abort(); } BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]); - if (status != BT_SELF_COMPONENT_STATUS_OK) { - BT_LOGE("Failed to handle message: " + if (status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK) { + BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, + "Failed to handle message: " "generated CTF traces could be incomplete: " "output-dir-path=\"%s\"", fs_sink->output_dir_path->str); @@ -980,18 +1083,12 @@ bt_self_component_status ctf_fs_sink_consume(bt_self_component_sink *self_comp) break; } - case BT_MESSAGE_ITERATOR_STATUS_AGAIN: - status = BT_SELF_COMPONENT_STATUS_AGAIN; + case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN: + status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN; break; - case BT_MESSAGE_ITERATOR_STATUS_END: + case BT_MESSAGE_ITERATOR_NEXT_STATUS_END: /* TODO: Finalize all traces (should already be done?) */ - status = BT_SELF_COMPONENT_STATUS_END; - break; - case BT_MESSAGE_ITERATOR_STATUS_NOMEM: - status = BT_SELF_COMPONENT_STATUS_NOMEM; - break; - case BT_MESSAGE_ITERATOR_STATUS_ERROR: - status = BT_SELF_COMPONENT_STATUS_NOMEM; + status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END; break; default: break; @@ -1000,7 +1097,7 @@ bt_self_component_status ctf_fs_sink_consume(bt_self_component_sink *self_comp) goto end; error: - BT_ASSERT(status != BT_SELF_COMPONENT_STATUS_OK); + BT_ASSERT(status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK); put_messages(msgs, msg_count); end: @@ -1008,22 +1105,29 @@ end: } BT_HIDDEN -bt_self_component_status ctf_fs_sink_graph_is_configured( +bt_component_class_sink_graph_is_configured_method_status +ctf_fs_sink_graph_is_configured( bt_self_component_sink *self_comp) { - bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK; + bt_component_class_sink_graph_is_configured_method_status status; + bt_message_iterator_create_from_sink_component_status + msg_iter_status; struct fs_sink_comp *fs_sink = bt_self_component_get_data( bt_self_component_sink_as_self_component(self_comp)); - fs_sink->upstream_iter = - bt_self_component_port_input_message_iterator_create( + msg_iter_status = + bt_message_iterator_create_from_sink_component( + self_comp, bt_self_component_sink_borrow_input_port_by_name( - self_comp, in_port_name)); - if (!fs_sink->upstream_iter) { - status = BT_SELF_COMPONENT_STATUS_NOMEM; + self_comp, in_port_name), &fs_sink->upstream_iter); + if (msg_iter_status != BT_MESSAGE_ITERATOR_CREATE_FROM_SINK_COMPONENT_STATUS_OK) { + status = (int) msg_iter_status; + BT_COMP_LOGE_APPEND_CAUSE(fs_sink->self_comp, + "Failed to create upstream iterator."); goto end; } + status = BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK; end: return status; }