Move to kernel style SPDX license identifiers
[babeltrace.git] / src / plugins / ctf / fs-sink / fs-sink.c
index 7ecc7875296844e70b0e2a0d97adfde150138d1e..235ff8de65bd6d7b520bade6fc2df63884002813 100644 (file)
@@ -1,27 +1,13 @@
 /*
- * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
+ * SPDX-License-Identifier: MIT
  *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
+ * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
  */
 
-#define BT_LOG_TAG "PLUGIN-CTF-FS-SINK"
-#include "logging.h"
+#define BT_COMP_LOG_SELF_COMP (fs_sink->self_comp)
+#define BT_LOG_OUTPUT_LEVEL (fs_sink->log_level)
+#define BT_LOG_TAG "PLUGIN/SINK.CTF.FS"
+#include "logging/comp-logging.h"
 
 #include <babeltrace2/babeltrace.h>
 #include <stdio.h>
@@ -29,6 +15,7 @@
 #include <glib.h>
 #include "common/assert.h"
 #include "ctfser/ctfser.h"
+#include "plugins/common/param-validation/param-validation.h"
 
 #include "fs-sink.h"
 #include "fs-sink-trace.h"
@@ -41,18 +28,20 @@ static
 const char * const in_port_name = "in";
 
 static
-bt_self_component_status ensure_output_dir_exists(
+bt_component_class_initialize_method_status ensure_output_dir_exists(
                struct fs_sink_comp *fs_sink)
 {
-       bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
+       bt_component_class_initialize_method_status status =
+               BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
        int ret;
 
        ret = g_mkdir_with_parents(fs_sink->output_dir_path->str, 0755);
        if (ret) {
-               BT_LOGE_ERRNO("Cannot create directories for output directory",
+               BT_COMP_LOGE_ERRNO(
+                       "Cannot create directories for output directory",
                        ": output-dir-path=\"%s\"",
                        fs_sink->output_dir_path->str);
-               status = BT_SELF_COMPONENT_STATUS_ERROR;
+               status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
                goto end;
        }
 
@@ -60,49 +49,47 @@ end:
        return status;
 }
 
+static struct bt_param_validation_map_value_entry_descr fs_sink_params_descr[] = {
+       { "path", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY, { .type = BT_VALUE_TYPE_STRING } },
+       { "assume-single-trace", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { .type = BT_VALUE_TYPE_BOOL } },
+       { "ignore-discarded-events", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { .type = BT_VALUE_TYPE_BOOL } },
+       { "ignore-discarded-packets", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { .type = BT_VALUE_TYPE_BOOL } },
+       { "quiet", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { .type = BT_VALUE_TYPE_BOOL } },
+       BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END
+};
+
 static
-bt_self_component_status configure_component(struct fs_sink_comp *fs_sink,
-               const bt_value *params)
+bt_component_class_initialize_method_status
+configure_component(struct fs_sink_comp *fs_sink, const bt_value *params)
 {
-       bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
+       bt_component_class_initialize_method_status status;
        const bt_value *value;
+       enum bt_param_validation_status validation_status;
+       gchar *validation_error;
 
-       value = bt_value_map_borrow_entry_value_const(params, "path");
-       if (!value) {
-               BT_LOGE_STR("Missing mandatory `path` parameter.");
-               status = BT_SELF_COMPONENT_STATUS_ERROR;
+       validation_status = bt_param_validation_validate(params,
+               fs_sink_params_descr, &validation_error);
+       if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
+               status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
                goto end;
-       }
-
-       if (!bt_value_is_string(value)) {
-               BT_LOGE_STR("`path` parameter: expecting a string.");
-               status = BT_SELF_COMPONENT_STATUS_ERROR;
+       } else if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
+               status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
                goto end;
        }
 
+       value = bt_value_map_borrow_entry_value_const(params, "path");
        g_string_assign(fs_sink->output_dir_path,
                bt_value_string_get(value));
+
        value = bt_value_map_borrow_entry_value_const(params,
                "assume-single-trace");
        if (value) {
-               if (!bt_value_is_bool(value)) {
-                       BT_LOGE_STR("`assume-single-trace` parameter: expecting a boolean.");
-                       status = BT_SELF_COMPONENT_STATUS_ERROR;
-                       goto end;
-               }
-
                fs_sink->assume_single_trace = (bool) bt_value_bool_get(value);
        }
 
        value = bt_value_map_borrow_entry_value_const(params,
                "ignore-discarded-events");
        if (value) {
-               if (!bt_value_is_bool(value)) {
-                       BT_LOGE_STR("`ignore-discarded-events` parameter: expecting a boolean.");
-                       status = BT_SELF_COMPONENT_STATUS_ERROR;
-                       goto end;
-               }
-
                fs_sink->ignore_discarded_events =
                        (bool) bt_value_bool_get(value);
        }
@@ -110,12 +97,6 @@ bt_self_component_status configure_component(struct fs_sink_comp *fs_sink,
        value = bt_value_map_borrow_entry_value_const(params,
                "ignore-discarded-packets");
        if (value) {
-               if (!bt_value_is_bool(value)) {
-                       BT_LOGE_STR("`ignore-discarded-packets` parameter: expecting a boolean.");
-                       status = BT_SELF_COMPONENT_STATUS_ERROR;
-                       goto end;
-               }
-
                fs_sink->ignore_discarded_packets =
                        (bool) bt_value_bool_get(value);
        }
@@ -123,16 +104,13 @@ bt_self_component_status configure_component(struct fs_sink_comp *fs_sink,
        value = bt_value_map_borrow_entry_value_const(params,
                "quiet");
        if (value) {
-               if (!bt_value_is_bool(value)) {
-                       BT_LOGE_STR("`quiet` parameter: expecting a boolean.");
-                       status = BT_SELF_COMPONENT_STATUS_ERROR;
-                       goto end;
-               }
-
                fs_sink->quiet = (bool) bt_value_bool_get(value);
        }
 
+       status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
+
 end:
+       g_free(validation_error);
        return status;
 }
 
@@ -153,7 +131,7 @@ void destroy_fs_sink_comp(struct fs_sink_comp *fs_sink)
                fs_sink->traces = NULL;
        }
 
-       BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(
+       BT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(
                fs_sink->upstream_iter);
        g_free(fs_sink);
 
@@ -162,24 +140,34 @@ end:
 }
 
 BT_HIDDEN
-bt_self_component_status ctf_fs_sink_init(
-               bt_self_component_sink *self_comp, const bt_value *params,
+bt_component_class_initialize_method_status ctf_fs_sink_init(
+               bt_self_component_sink *self_comp_sink,
+               bt_self_component_sink_configuration *config,
+               const bt_value *params,
                void *init_method_data)
 {
-       bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
+       bt_component_class_initialize_method_status status =
+               BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
+       bt_self_component_add_port_status add_port_status;
        struct fs_sink_comp *fs_sink = NULL;
+       bt_self_component *self_comp =
+               bt_self_component_sink_as_self_component(self_comp_sink);
+       bt_logging_level log_level = bt_component_get_logging_level(
+               bt_self_component_as_component(self_comp));
 
        fs_sink = g_new0(struct fs_sink_comp, 1);
        if (!fs_sink) {
-               BT_LOGE_STR("Failed to allocate one CTF FS sink structure.");
-               status = BT_SELF_COMPONENT_STATUS_NOMEM;
+               BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR, log_level, self_comp,
+                       "Failed to allocate one CTF FS sink structure.");
+               status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
                goto end;
        }
 
-       fs_sink->output_dir_path = g_string_new(NULL);
+       fs_sink->log_level = log_level;
        fs_sink->self_comp = self_comp;
+       fs_sink->output_dir_path = g_string_new(NULL);
        status = configure_component(fs_sink, params);
-       if (status != BT_SELF_COMPONENT_STATUS_OK) {
+       if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
                /* configure_component() logs errors */
                goto end;
        }
@@ -187,14 +175,14 @@ bt_self_component_status ctf_fs_sink_init(
        if (fs_sink->assume_single_trace &&
                        g_file_test(fs_sink->output_dir_path->str,
                                G_FILE_TEST_EXISTS)) {
-               BT_LOGE("Single trace mode, but output path exists: "
+               BT_COMP_LOGE("Single trace mode, but output path exists: "
                        "output-path=\"%s\"", fs_sink->output_dir_path->str);
-               status = BT_SELF_COMPONENT_STATUS_ERROR;
+               status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
                goto end;
        }
 
        status = ensure_output_dir_exists(fs_sink);
-       if (status != BT_SELF_COMPONENT_STATUS_OK) {
+       if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
                /* ensure_output_dir_exists() logs errors */
                goto end;
        }
@@ -202,22 +190,28 @@ bt_self_component_status ctf_fs_sink_init(
        fs_sink->traces = g_hash_table_new_full(g_direct_hash, g_direct_equal,
                NULL, (GDestroyNotify) fs_sink_trace_destroy);
        if (!fs_sink->traces) {
-               BT_LOGE_STR("Failed to allocate one GHashTable.");
-               status = BT_SELF_COMPONENT_STATUS_NOMEM;
+               BT_COMP_LOGE_STR("Failed to allocate one GHashTable.");
+               status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
                goto end;
        }
 
-       status = bt_self_component_sink_add_input_port(self_comp, in_port_name,
-               NULL, NULL);
-       if (status != BT_SELF_COMPONENT_STATUS_OK) {
+       add_port_status = bt_self_component_sink_add_input_port(
+               self_comp_sink, in_port_name, NULL, NULL);
+       switch (add_port_status) {
+       case BT_SELF_COMPONENT_ADD_PORT_STATUS_ERROR:
+               status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
                goto end;
+       case BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR:
+               status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
+               goto end;
+       default:
+               break;
        }
 
-       bt_self_component_set_data(
-               bt_self_component_sink_as_self_component(self_comp), fs_sink);
+       bt_self_component_set_data(self_comp, fs_sink);
 
 end:
-       if (status != BT_SELF_COMPONENT_STATUS_OK) {
+       if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
                destroy_fs_sink_comp(fs_sink);
        }
 
@@ -236,7 +230,7 @@ struct fs_sink_stream *borrow_stream(struct fs_sink_comp *fs_sink,
        if (G_UNLIKELY(!trace)) {
                if (fs_sink->assume_single_trace &&
                                g_hash_table_size(fs_sink->traces) > 0) {
-                       BT_LOGE("Single trace mode, but getting more than one trace: "
+                       BT_COMP_LOGE("Single trace mode, but getting more than one trace: "
                                "stream-name=\"%s\"",
                                bt_stream_get_name(ir_stream));
                        goto end;
@@ -261,11 +255,12 @@ end:
 }
 
 static inline
-bt_self_component_status handle_event_msg(struct fs_sink_comp *fs_sink,
-               const bt_message *msg)
+bt_component_class_sink_consume_method_status handle_event_msg(
+               struct fs_sink_comp *fs_sink, const bt_message *msg)
 {
        int ret;
-       bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
+       bt_component_class_sink_consume_method_status status =
+               BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
        const bt_event *ir_event = bt_message_event_borrow_event_const(msg);
        const bt_stream *ir_stream = bt_event_borrow_stream_const(ir_event);
        struct fs_sink_stream *stream;
@@ -274,27 +269,63 @@ bt_self_component_status handle_event_msg(struct fs_sink_comp *fs_sink,
 
        stream = borrow_stream(fs_sink, ir_stream);
        if (G_UNLIKELY(!stream)) {
-               status = BT_SELF_COMPONENT_STATUS_ERROR;
+               status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
                goto end;
        }
 
-       ret = try_translate_event_class_trace_ir_to_ctf_ir(stream->sc,
-               bt_event_borrow_class_const(ir_event), &ec);
+       ret = try_translate_event_class_trace_ir_to_ctf_ir(fs_sink,
+               stream->sc, bt_event_borrow_class_const(ir_event), &ec);
        if (ret) {
-               status = BT_SELF_COMPONENT_STATUS_ERROR;
+               status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
                goto end;
        }
 
-       BT_ASSERT(ec);
+       BT_ASSERT_DBG(ec);
 
        if (stream->sc->default_clock_class) {
                cs = bt_message_event_borrow_default_clock_snapshot_const(
                        msg);
        }
 
+       /*
+        * If this event's stream does not support packets, then we
+        * lazily create artificial packets.
+        *
+        * The size of an artificial packet is arbitrarily at least
+        * 4 MiB (it usually is greater because we close it when
+        * comes the time to write a new event and the packet's content
+        * size is >= 4 MiB), except the last one which can be smaller.
+        */
+       if (G_UNLIKELY(!stream->sc->has_packets)) {
+               if (stream->packet_state.is_open &&
+                               bt_ctfser_get_offset_in_current_packet_bits(&stream->ctfser) / 8 >=
+                               4 * 1024 * 1024) {
+                       /*
+                        * Stream's current packet is larger than 4 MiB:
+                        * close it. A new packet will be opened just
+                        * below.
+                        */
+                       ret = fs_sink_stream_close_packet(stream, NULL);
+                       if (ret) {
+                               status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
+                               goto end;
+                       }
+               }
+
+               if (!stream->packet_state.is_open) {
+                       /* Stream's packet is not currently opened: open it */
+                       ret = fs_sink_stream_open_packet(stream, NULL, NULL);
+                       if (ret) {
+                               status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
+                               goto end;
+                       }
+               }
+       }
+
+       BT_ASSERT_DBG(stream->packet_state.is_open);
        ret = fs_sink_stream_write_event(stream, cs, ir_event, ec);
        if (G_UNLIKELY(ret)) {
-               status = BT_SELF_COMPONENT_STATUS_ERROR;
+               status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
                goto end;
        }
 
@@ -303,11 +334,12 @@ end:
 }
 
 static inline
-bt_self_component_status handle_packet_beginning_msg(
+bt_component_class_sink_consume_method_status handle_packet_beginning_msg(
                struct fs_sink_comp *fs_sink, const bt_message *msg)
 {
        int ret;
-       bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
+       bt_component_class_sink_consume_method_status status =
+               BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
        const bt_packet *ir_packet =
                bt_message_packet_beginning_borrow_packet_const(msg);
        const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
@@ -316,7 +348,7 @@ bt_self_component_status handle_packet_beginning_msg(
 
        stream = borrow_stream(fs_sink, ir_stream);
        if (G_UNLIKELY(!stream)) {
-               status = BT_SELF_COMPONENT_STATUS_ERROR;
+               status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
                goto end;
        }
 
@@ -367,7 +399,7 @@ bt_self_component_status handle_packet_beginning_msg(
 
                if (stream->discarded_events_state.beginning_cs !=
                                expected_cs) {
-                       BT_LOGE("Incompatible discarded events message: "
+                       BT_COMP_LOGE("Incompatible discarded events message: "
                                "unexpected beginning time: "
                                "beginning-cs-val=%" PRIu64 ", "
                                "expected-beginning-cs-val=%" PRIu64 ", "
@@ -380,7 +412,7 @@ bt_self_component_status handle_packet_beginning_msg(
                                bt_trace_get_name(
                                        bt_stream_borrow_trace_const(ir_stream)),
                                stream->trace->path->str, stream->file_name->str);
-                       status = BT_SELF_COMPONENT_STATUS_ERROR;
+                       status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
                        goto end;
                }
        }
@@ -417,8 +449,8 @@ bt_self_component_status handle_packet_beginning_msg(
                 * this case.
                 */
                if (stream->prev_packet_state.end_cs == UINT64_C(-1)) {
-                       BT_LOGE("Incompatible discarded packets message "
-                               "occuring before the stream's first packet: "
+                       BT_COMP_LOGE("Incompatible discarded packets message "
+                               "occurring before the stream's first packet: "
                                "stream-id=%" PRIu64 ", stream-name=\"%s\", "
                                "trace-name=\"%s\", path=\"%s/%s\"",
                                bt_stream_get_id(ir_stream),
@@ -426,13 +458,13 @@ bt_self_component_status handle_packet_beginning_msg(
                                bt_trace_get_name(
                                        bt_stream_borrow_trace_const(ir_stream)),
                                stream->trace->path->str, stream->file_name->str);
-                       status = BT_SELF_COMPONENT_STATUS_ERROR;
+                       status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
                        goto end;
                }
 
                if (stream->discarded_packets_state.beginning_cs !=
                                stream->prev_packet_state.end_cs) {
-                       BT_LOGE("Incompatible discarded packets message: "
+                       BT_COMP_LOGE("Incompatible discarded packets message: "
                                "unexpected beginning time: "
                                "beginning-cs-val=%" PRIu64 ", "
                                "expected-beginning-cs-val=%" PRIu64 ", "
@@ -445,7 +477,7 @@ bt_self_component_status handle_packet_beginning_msg(
                                bt_trace_get_name(
                                        bt_stream_borrow_trace_const(ir_stream)),
                                stream->trace->path->str, stream->file_name->str);
-                       status = BT_SELF_COMPONENT_STATUS_ERROR;
+                       status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
                        goto end;
                }
 
@@ -453,7 +485,7 @@ bt_self_component_status handle_packet_beginning_msg(
 
                if (stream->discarded_packets_state.end_cs !=
                                expected_end_cs) {
-                       BT_LOGE("Incompatible discarded packets message: "
+                       BT_COMP_LOGE("Incompatible discarded packets message: "
                                "unexpected end time: "
                                "end-cs-val=%" PRIu64 ", "
                                "expected-end-cs-val=%" PRIu64 ", "
@@ -466,7 +498,7 @@ bt_self_component_status handle_packet_beginning_msg(
                                bt_trace_get_name(
                                        bt_stream_borrow_trace_const(ir_stream)),
                                stream->trace->path->str, stream->file_name->str);
-                       status = BT_SELF_COMPONENT_STATUS_ERROR;
+                       status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
                        goto end;
                }
        }
@@ -481,7 +513,7 @@ bt_self_component_status handle_packet_beginning_msg(
 
        ret = fs_sink_stream_open_packet(stream, cs, ir_packet);
        if (ret) {
-               status = BT_SELF_COMPONENT_STATUS_ERROR;
+               status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
                goto end;
        }
 
@@ -490,11 +522,12 @@ end:
 }
 
 static inline
-bt_self_component_status handle_packet_end_msg(
+bt_component_class_sink_consume_method_status handle_packet_end_msg(
                struct fs_sink_comp *fs_sink, const bt_message *msg)
 {
        int ret;
-       bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
+       bt_component_class_sink_consume_method_status status =
+               BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
        const bt_packet *ir_packet =
                bt_message_packet_end_borrow_packet_const(msg);
        const bt_stream *ir_stream = bt_packet_borrow_stream_const(ir_packet);
@@ -503,7 +536,7 @@ bt_self_component_status handle_packet_end_msg(
 
        stream = borrow_stream(fs_sink, ir_stream);
        if (G_UNLIKELY(!stream)) {
-               status = BT_SELF_COMPONENT_STATUS_ERROR;
+               status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
                goto end;
        }
 
@@ -542,7 +575,7 @@ bt_self_component_status handle_packet_end_msg(
                expected_cs = bt_clock_snapshot_get_value(cs);
 
                if (stream->discarded_events_state.end_cs != expected_cs) {
-                       BT_LOGE("Incompatible discarded events message: "
+                       BT_COMP_LOGE("Incompatible discarded events message: "
                                "unexpected end time: "
                                "end-cs-val=%" PRIu64 ", "
                                "expected-end-cs-val=%" PRIu64 ", "
@@ -555,14 +588,14 @@ bt_self_component_status handle_packet_end_msg(
                                bt_trace_get_name(
                                        bt_stream_borrow_trace_const(ir_stream)),
                                stream->trace->path->str, stream->file_name->str);
-                       status = BT_SELF_COMPONENT_STATUS_ERROR;
+                       status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
                        goto end;
                }
        }
 
        ret = fs_sink_stream_close_packet(stream, cs);
        if (ret) {
-               status = BT_SELF_COMPONENT_STATUS_ERROR;
+               status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
                goto end;
        }
 
@@ -579,10 +612,11 @@ end:
 }
 
 static inline
-bt_self_component_status handle_stream_beginning_msg(
+bt_component_class_sink_consume_method_status handle_stream_beginning_msg(
                struct fs_sink_comp *fs_sink, const bt_message *msg)
 {
-       bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
+       bt_component_class_sink_consume_method_status status =
+               BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
        const bt_stream *ir_stream =
                bt_message_stream_beginning_borrow_stream_const(msg);
        const bt_stream_class *ir_sc =
@@ -592,6 +626,29 @@ bt_self_component_status handle_stream_beginning_msg(
                bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc) &&
                bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc);
 
+       /*
+        * Not supported: discarded events or discarded packets support
+        * without packets support. Packets are the way to know where
+        * discarded events/packets occurred in CTF 1.8.
+        */
+       if (!bt_stream_class_supports_packets(ir_sc)) {
+               BT_ASSERT(!bt_stream_class_supports_discarded_packets(ir_sc));
+
+               if (!fs_sink->ignore_discarded_events &&
+                               bt_stream_class_supports_discarded_events(ir_sc)) {
+                       BT_COMP_LOGE("Unsupported stream: "
+                               "stream does not support packets, "
+                               "but supports discarded events: "
+                               "stream-addr=%p, "
+                               "stream-id=%" PRIu64 ", "
+                               "stream-name=\"%s\"",
+                               ir_stream, bt_stream_get_id(ir_stream),
+                               bt_stream_get_name(ir_stream));
+                       status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
+                       goto end;
+               }
+       }
+
        /*
         * Not supported: discarded events with default clock snapshots,
         * but packet beginning/end without default clock snapshot.
@@ -599,7 +656,7 @@ bt_self_component_status handle_stream_beginning_msg(
        if (!fs_sink->ignore_discarded_events &&
                        bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc) &&
                        !packets_have_beginning_end_cs) {
-               BT_LOGE("Unsupported stream: discarded events have "
+               BT_COMP_LOGE("Unsupported stream: discarded events have "
                        "default clock snapshots, but packets have no "
                        "beginning and/or end default clock snapshots: "
                        "stream-addr=%p, "
@@ -607,7 +664,7 @@ bt_self_component_status handle_stream_beginning_msg(
                        "stream-name=\"%s\"",
                        ir_stream, bt_stream_get_id(ir_stream),
                        bt_stream_get_name(ir_stream));
-               status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+               status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
                goto end;
        }
 
@@ -619,7 +676,7 @@ bt_self_component_status handle_stream_beginning_msg(
        if (!fs_sink->ignore_discarded_packets &&
                        bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc) &&
                        !packets_have_beginning_end_cs) {
-               BT_LOGE("Unsupported stream: discarded packets have "
+               BT_COMP_LOGE("Unsupported stream: discarded packets have "
                        "default clock snapshots, but packets have no "
                        "beginning and/or end default clock snapshots: "
                        "stream-addr=%p, "
@@ -627,17 +684,17 @@ bt_self_component_status handle_stream_beginning_msg(
                        "stream-name=\"%s\"",
                        ir_stream, bt_stream_get_id(ir_stream),
                        bt_stream_get_name(ir_stream));
-               status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+               status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
                goto end;
        }
 
        stream = borrow_stream(fs_sink, ir_stream);
        if (!stream) {
-               status = BT_SELF_COMPONENT_STATUS_ERROR;
+               status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
                goto end;
        }
 
-       BT_LOGI("Created new, empty stream file: "
+       BT_COMP_LOGI("Created new, empty stream file: "
                "stream-id=%" PRIu64 ", stream-name=\"%s\", "
                "trace-name=\"%s\", path=\"%s/%s\"",
                bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
@@ -649,21 +706,33 @@ end:
 }
 
 static inline
-bt_self_component_status handle_stream_end_msg(struct fs_sink_comp *fs_sink,
-               const bt_message *msg)
+bt_component_class_sink_consume_method_status handle_stream_end_msg(
+               struct fs_sink_comp *fs_sink, const bt_message *msg)
 {
-       bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
+       bt_component_class_sink_consume_method_status status =
+               BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
        const bt_stream *ir_stream =
                bt_message_stream_end_borrow_stream_const(msg);
        struct fs_sink_stream *stream;
 
        stream = borrow_stream(fs_sink, ir_stream);
        if (!stream) {
-               status = BT_SELF_COMPONENT_STATUS_ERROR;
+               status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
                goto end;
        }
 
-       BT_LOGI("Closing stream file: "
+       if (G_UNLIKELY(!stream->sc->has_packets &&
+                       stream->packet_state.is_open)) {
+               /* Close stream's current artificial packet */
+               int ret = fs_sink_stream_close_packet(stream, NULL);
+
+               if (ret) {
+                       status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
+                       goto end;
+               }
+       }
+
+       BT_COMP_LOGI("Closing stream file: "
                "stream-id=%" PRIu64 ", stream-name=\"%s\", "
                "trace-name=\"%s\", path=\"%s/%s\"",
                bt_stream_get_id(ir_stream), bt_stream_get_name(ir_stream),
@@ -681,10 +750,11 @@ end:
 }
 
 static inline
-bt_self_component_status handle_discarded_events_msg(
+bt_component_class_sink_consume_method_status handle_discarded_events_msg(
                struct fs_sink_comp *fs_sink, const bt_message *msg)
 {
-       bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
+       bt_component_class_sink_consume_method_status status =
+               BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
        const bt_stream *ir_stream =
                bt_message_discarded_events_borrow_stream_const(msg);
        struct fs_sink_stream *stream;
@@ -694,12 +764,12 @@ bt_self_component_status handle_discarded_events_msg(
 
        stream = borrow_stream(fs_sink, ir_stream);
        if (!stream) {
-               status = BT_SELF_COMPONENT_STATUS_ERROR;
+               status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
                goto end;
        }
 
        if (fs_sink->ignore_discarded_events) {
-               BT_LOGI("Ignoring discarded events message: "
+               BT_COMP_LOGI("Ignoring discarded events message: "
                        "stream-id=%" PRIu64 ", stream-name=\"%s\", "
                        "trace-name=\"%s\", path=\"%s/%s\"",
                        bt_stream_get_id(ir_stream),
@@ -711,7 +781,7 @@ bt_self_component_status handle_discarded_events_msg(
        }
 
        if (stream->discarded_events_state.in_range) {
-               BT_LOGE("Unsupported contiguous discarded events message: "
+               BT_COMP_LOGE("Unsupported contiguous discarded events message: "
                        "stream-id=%" PRIu64 ", stream-name=\"%s\", "
                        "trace-name=\"%s\", path=\"%s/%s\"",
                        bt_stream_get_id(ir_stream),
@@ -719,7 +789,7 @@ bt_self_component_status handle_discarded_events_msg(
                        bt_trace_get_name(
                                bt_stream_borrow_trace_const(ir_stream)),
                        stream->trace->path->str, stream->file_name->str);
-               status = BT_SELF_COMPONENT_STATUS_ERROR;
+               status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
                goto end;
        }
 
@@ -733,8 +803,8 @@ bt_self_component_status handle_discarded_events_msg(
         */
        if (stream->packet_state.is_open &&
                        stream->sc->discarded_events_has_ts) {
-               BT_LOGE("Unsupported discarded events message with "
-                       "default clock snapshots occuring within a packet: "
+               BT_COMP_LOGE("Unsupported discarded events message with "
+                       "default clock snapshots occurring within a packet: "
                        "stream-id=%" PRIu64 ", stream-name=\"%s\", "
                        "trace-name=\"%s\", path=\"%s/%s\"",
                        bt_stream_get_id(ir_stream),
@@ -742,7 +812,7 @@ bt_self_component_status handle_discarded_events_msg(
                        bt_trace_get_name(
                                bt_stream_borrow_trace_const(ir_stream)),
                        stream->trace->path->str, stream->file_name->str);
-               status = BT_SELF_COMPONENT_STATUS_ERROR;
+               status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
                goto end;
        }
 
@@ -788,10 +858,11 @@ end:
 }
 
 static inline
-bt_self_component_status handle_discarded_packets_msg(
+bt_component_class_sink_consume_method_status handle_discarded_packets_msg(
                struct fs_sink_comp *fs_sink, const bt_message *msg)
 {
-       bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
+       bt_component_class_sink_consume_method_status status =
+               BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
        const bt_stream *ir_stream =
                bt_message_discarded_packets_borrow_stream_const(msg);
        struct fs_sink_stream *stream;
@@ -801,12 +872,12 @@ bt_self_component_status handle_discarded_packets_msg(
 
        stream = borrow_stream(fs_sink, ir_stream);
        if (!stream) {
-               status = BT_SELF_COMPONENT_STATUS_ERROR;
+               status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
                goto end;
        }
 
        if (fs_sink->ignore_discarded_packets) {
-               BT_LOGI("Ignoring discarded packets message: "
+               BT_COMP_LOGI("Ignoring discarded packets message: "
                        "stream-id=%" PRIu64 ", stream-name=\"%s\", "
                        "trace-name=\"%s\", path=\"%s/%s\"",
                        bt_stream_get_id(ir_stream),
@@ -818,7 +889,7 @@ bt_self_component_status handle_discarded_packets_msg(
        }
 
        if (stream->discarded_packets_state.in_range) {
-               BT_LOGE("Unsupported contiguous discarded packets message: "
+               BT_COMP_LOGE("Unsupported contiguous discarded packets message: "
                        "stream-id=%" PRIu64 ", stream-name=\"%s\", "
                        "trace-name=\"%s\", path=\"%s/%s\"",
                        bt_stream_get_id(ir_stream),
@@ -826,7 +897,7 @@ bt_self_component_status handle_discarded_packets_msg(
                        bt_trace_get_name(
                                bt_stream_borrow_trace_const(ir_stream)),
                        stream->trace->path->str, stream->file_name->str);
-               status = BT_SELF_COMPONENT_STATUS_ERROR;
+               status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR;
                goto end;
        }
 
@@ -888,36 +959,38 @@ void put_messages(bt_message_array_const msgs, uint64_t count)
 }
 
 BT_HIDDEN
-bt_self_component_status ctf_fs_sink_consume(bt_self_component_sink *self_comp)
+bt_component_class_sink_consume_method_status ctf_fs_sink_consume(
+               bt_self_component_sink *self_comp)
 {
-       bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
+       bt_component_class_sink_consume_method_status status =
+               BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK;
        struct fs_sink_comp *fs_sink;
-       bt_message_iterator_status it_status;
+       bt_message_iterator_next_status next_status;
        uint64_t msg_count = 0;
        bt_message_array_const msgs;
 
        fs_sink = bt_self_component_get_data(
                        bt_self_component_sink_as_self_component(self_comp));
-       BT_ASSERT(fs_sink);
-       BT_ASSERT(fs_sink->upstream_iter);
+       BT_ASSERT_DBG(fs_sink);
+       BT_ASSERT_DBG(fs_sink->upstream_iter);
 
        /* Consume messages */
-       it_status = bt_self_component_port_input_message_iterator_next(
+       next_status = bt_message_iterator_next(
                fs_sink->upstream_iter, &msgs, &msg_count);
-       if (it_status < 0) {
-               status = BT_SELF_COMPONENT_STATUS_ERROR;
+       if (next_status < 0) {
+               status = (int) next_status;
                goto end;
        }
 
-       switch (it_status) {
-       case BT_MESSAGE_ITERATOR_STATUS_OK:
+       switch (next_status) {
+       case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK:
        {
                uint64_t i;
 
                for (i = 0; i < msg_count; i++) {
                        const bt_message *msg = msgs[i];
 
-                       BT_ASSERT(msg);
+                       BT_ASSERT_DBG(msg);
 
                        switch (bt_message_get_type(msg)) {
                        case BT_MESSAGE_TYPE_EVENT:
@@ -933,7 +1006,7 @@ bt_self_component_status ctf_fs_sink_consume(bt_self_component_sink *self_comp)
                                break;
                        case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
                                /* Ignore */
-                               BT_LOGD_STR("Ignoring message iterator inactivity message.");
+                               BT_COMP_LOGD_STR("Ignoring message iterator inactivity message.");
                                break;
                        case BT_MESSAGE_TYPE_STREAM_BEGINNING:
                                status = handle_stream_beginning_msg(
@@ -943,11 +1016,6 @@ bt_self_component_status ctf_fs_sink_consume(bt_self_component_sink *self_comp)
                                status = handle_stream_end_msg(
                                        fs_sink, msg);
                                break;
-                       case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
-                       case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
-                               /* Not supported by CTF 1.8 */
-                               BT_LOGD_STR("Ignoring stream activity message.");
-                               break;
                        case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
                                status = handle_discarded_events_msg(
                                        fs_sink, msg);
@@ -957,13 +1025,13 @@ bt_self_component_status ctf_fs_sink_consume(bt_self_component_sink *self_comp)
                                        fs_sink, msg);
                                break;
                        default:
-                               abort();
+                               bt_common_abort();
                        }
 
                        BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
 
-                       if (status != BT_SELF_COMPONENT_STATUS_OK) {
-                               BT_LOGE("Failed to handle message: "
+                       if (status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK) {
+                               BT_COMP_LOGE("Failed to handle message: "
                                        "generated CTF traces could be incomplete: "
                                        "output-dir-path=\"%s\"",
                                        fs_sink->output_dir_path->str);
@@ -973,18 +1041,18 @@ bt_self_component_status ctf_fs_sink_consume(bt_self_component_sink *self_comp)
 
                break;
        }
-       case BT_MESSAGE_ITERATOR_STATUS_AGAIN:
-               status = BT_SELF_COMPONENT_STATUS_AGAIN;
+       case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN:
+               status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN;
                break;
-       case BT_MESSAGE_ITERATOR_STATUS_END:
+       case BT_MESSAGE_ITERATOR_NEXT_STATUS_END:
                /* TODO: Finalize all traces (should already be done?) */
-               status = BT_SELF_COMPONENT_STATUS_END;
+               status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END;
                break;
-       case BT_MESSAGE_ITERATOR_STATUS_NOMEM:
-               status = BT_SELF_COMPONENT_STATUS_NOMEM;
+       case BT_MESSAGE_ITERATOR_NEXT_STATUS_MEMORY_ERROR:
+               status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_MEMORY_ERROR;
                break;
-       case BT_MESSAGE_ITERATOR_STATUS_ERROR:
-               status = BT_SELF_COMPONENT_STATUS_NOMEM;
+       case BT_MESSAGE_ITERATOR_NEXT_STATUS_ERROR:
+               status = BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_MEMORY_ERROR;
                break;
        default:
                break;
@@ -993,7 +1061,7 @@ bt_self_component_status ctf_fs_sink_consume(bt_self_component_sink *self_comp)
        goto end;
 
 error:
-       BT_ASSERT(status != BT_SELF_COMPONENT_STATUS_OK);
+       BT_ASSERT(status != BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK);
        put_messages(msgs, msg_count);
 
 end:
@@ -1001,22 +1069,27 @@ end:
 }
 
 BT_HIDDEN
-bt_self_component_status ctf_fs_sink_graph_is_configured(
+bt_component_class_sink_graph_is_configured_method_status
+ctf_fs_sink_graph_is_configured(
                bt_self_component_sink *self_comp)
 {
-       bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
+       bt_component_class_sink_graph_is_configured_method_status status;
+       bt_message_iterator_create_from_sink_component_status
+               msg_iter_status;
        struct fs_sink_comp *fs_sink = bt_self_component_get_data(
                        bt_self_component_sink_as_self_component(self_comp));
 
-       fs_sink->upstream_iter =
-               bt_self_component_port_input_message_iterator_create(
+       msg_iter_status =
+               bt_message_iterator_create_from_sink_component(
+                       self_comp,
                        bt_self_component_sink_borrow_input_port_by_name(
-                               self_comp, in_port_name));
-       if (!fs_sink->upstream_iter) {
-               status = BT_SELF_COMPONENT_STATUS_NOMEM;
+                               self_comp, in_port_name), &fs_sink->upstream_iter);
+       if (msg_iter_status != BT_MESSAGE_ITERATOR_CREATE_FROM_SINK_COMPONENT_STATUS_OK) {
+               status = (int) msg_iter_status;
                goto end;
        }
 
+       status = BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK;
 end:
        return status;
 }
This page took 0.038675 seconds and 4 git commands to generate.