X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fplugins%2Futils%2Fmuxer%2Fmuxer.c;h=fa049cf0dd939b65272dc1d02dc1ab1f82bfdb42;hb=30799132cd92de929a90ae6e366bfe5032cfd241;hp=9510d07a5e45a865d33f7855fbdc02534f384268;hpb=3625612b8d1f68582f54f4113e375fdf5c66fc09;p=babeltrace.git diff --git a/src/plugins/utils/muxer/muxer.c b/src/plugins/utils/muxer/muxer.c index 9510d07a..fa049cf0 100644 --- a/src/plugins/utils/muxer/muxer.c +++ b/src/plugins/utils/muxer/muxer.c @@ -1,29 +1,13 @@ /* - * Copyright 2017 Philippe Proulx - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. + * SPDX-License-Identifier: MIT * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. + * Copyright 2017 Philippe Proulx */ #define BT_COMP_LOG_SELF_COMP (muxer_comp->self_comp) #define BT_LOG_OUTPUT_LEVEL (muxer_comp->log_level) #define BT_LOG_TAG "PLUGIN/FLT.UTILS.MUXER" -#include "plugins/comp-logging.h" +#include "logging/comp-logging.h" #include "common/macros.h" #include "common/uuid.h" @@ -36,9 +20,10 @@ #include #include -#include "muxer.h" +#include "plugins/common/muxing/muxing.h" +#include "plugins/common/param-validation/param-validation.h" -#define ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME "assume-absolute-clock-classes" +#include "muxer.h" struct muxer_comp { /* Weak refs */ @@ -48,7 +33,6 @@ struct muxer_comp { unsigned int next_port_num; size_t available_input_ports; bool initializing_muxer_msg_iter; - bool assume_absolute_clock_classes; bt_logging_level log_level; }; @@ -56,10 +40,13 @@ struct muxer_upstream_msg_iter { struct muxer_comp *muxer_comp; /* Owned by this, NULL if ended */ - bt_self_component_port_input_message_iterator *msg_iter; + bt_message_iterator *msg_iter; /* Contains `const bt_message *`, owned by this */ - GQueue *msgs; + GPtrArray *msgs; + + /* Index of the next message in `msgs` to return */ + guint next_msg; }; enum muxer_msg_iter_clock_class_expectation { @@ -73,6 +60,9 @@ enum muxer_msg_iter_clock_class_expectation { struct muxer_msg_iter { struct muxer_comp *muxer_comp; + /* Weak */ + bt_self_message_iterator *self_msg_iter; + /* * Array of struct muxer_upstream_msg_iter * (owned by this). * @@ -104,16 +94,20 @@ struct muxer_msg_iter { * MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID. */ bt_uuid_t expected_clock_class_uuid; + + /* + * Saved error. If we hit an error in the _next method, but have some + * messages ready to return, we save the error here and return it on + * the next _next call. + */ + bt_message_iterator_class_next_method_status next_saved_status; + const struct bt_error *next_saved_error; }; static void empty_message_queue(struct muxer_upstream_msg_iter *upstream_msg_iter) { - const bt_message *msg; - - while ((msg = g_queue_pop_head(upstream_msg_iter->msgs))) { - bt_message_put_ref(msg); - } + g_ptr_array_set_size(upstream_msg_iter->msgs, 0); } static @@ -128,16 +122,17 @@ void destroy_muxer_upstream_msg_iter( muxer_comp = muxer_upstream_msg_iter->muxer_comp; BT_COMP_LOGD("Destroying muxer's upstream message iterator wrapper: " - "addr=%p, msg-iter-addr=%p, queue-len=%u", + "addr=%p, msg-iter-addr=%p, queue-len=%u, next-msg=%u", muxer_upstream_msg_iter, muxer_upstream_msg_iter->msg_iter, - muxer_upstream_msg_iter->msgs->length); - bt_self_component_port_input_message_iterator_put_ref( + muxer_upstream_msg_iter->msgs->len, + muxer_upstream_msg_iter->next_msg); + + bt_message_iterator_put_ref( muxer_upstream_msg_iter->msg_iter); if (muxer_upstream_msg_iter->msgs) { - empty_message_queue(muxer_upstream_msg_iter); - g_queue_free(muxer_upstream_msg_iter->msgs); + g_ptr_array_free(muxer_upstream_msg_iter->msgs, TRUE); } g_free(muxer_upstream_msg_iter); @@ -145,7 +140,7 @@ void destroy_muxer_upstream_msg_iter( static int muxer_msg_iter_add_upstream_msg_iter(struct muxer_msg_iter *muxer_msg_iter, - bt_self_component_port_input_message_iterator *self_msg_iter) + bt_message_iterator *self_msg_iter) { int ret = 0; struct muxer_upstream_msg_iter *muxer_upstream_msg_iter = @@ -153,16 +148,19 @@ int muxer_msg_iter_add_upstream_msg_iter(struct muxer_msg_iter *muxer_msg_iter, struct muxer_comp *muxer_comp = muxer_msg_iter->muxer_comp; if (!muxer_upstream_msg_iter) { - BT_COMP_LOGE_STR("Failed to allocate one muxer's upstream message iterator wrapper."); + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Failed to allocate one muxer's upstream message iterator wrapper."); goto error; } muxer_upstream_msg_iter->muxer_comp = muxer_comp; muxer_upstream_msg_iter->msg_iter = self_msg_iter; - bt_self_component_port_input_message_iterator_get_ref(muxer_upstream_msg_iter->msg_iter); - muxer_upstream_msg_iter->msgs = g_queue_new(); + bt_message_iterator_get_ref(muxer_upstream_msg_iter->msg_iter); + muxer_upstream_msg_iter->msgs = + g_ptr_array_new_with_free_func((GDestroyNotify) bt_message_put_ref); if (!muxer_upstream_msg_iter->msgs) { - BT_COMP_LOGE_STR("Failed to allocate a GQueue."); + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Failed to allocate a GPtrArray."); goto error; } @@ -176,7 +174,7 @@ int muxer_msg_iter_add_upstream_msg_iter(struct muxer_msg_iter *muxer_msg_iter, goto end; error: - g_free(muxer_upstream_msg_iter); + destroy_muxer_upstream_msg_iter(muxer_upstream_msg_iter); ret = -1; end: @@ -196,7 +194,7 @@ bt_self_component_add_port_status add_available_input_port( BT_ASSERT(muxer_comp); port_name = g_string_new("in"); if (!port_name) { - BT_COMP_LOGE_STR("Failed to allocate a GString."); + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, "Failed to allocate a GString."); status = BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR; goto end; } @@ -205,7 +203,8 @@ bt_self_component_add_port_status add_available_input_port( status = bt_self_component_filter_add_input_port( self_comp, port_name->str, NULL, NULL); if (status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) { - BT_COMP_LOGE("Cannot add input port to muxer component: " + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Cannot add input port to muxer component: " "port-name=\"%s\", comp-addr=%p, status=%s", port_name->str, self_comp, bt_common_func_status_string(status)); @@ -245,153 +244,76 @@ void destroy_muxer_comp(struct muxer_comp *muxer_comp) } static -bt_value *get_default_params(struct muxer_comp *muxer_comp) -{ - bt_value *params; - int ret; - - params = bt_value_map_create(); - if (!params) { - BT_COMP_LOGE_STR("Cannot create a map value object."); - goto error; - } - - ret = bt_value_map_insert_bool_entry(params, - ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME, false); - if (ret) { - BT_COMP_LOGE_STR("Cannot add boolean value to map value object."); - goto error; - } - - goto end; - -error: - BT_VALUE_PUT_REF_AND_RESET(params); - -end: - return params; -} - -static -int configure_muxer_comp(struct muxer_comp *muxer_comp, - const bt_value *params) -{ - bt_value *default_params = NULL; - bt_value *real_params = NULL; - const bt_value *assume_absolute_clock_classes = NULL; - int ret = 0; - bt_bool bool_val; - - default_params = get_default_params(muxer_comp); - if (!default_params) { - BT_COMP_LOGE("Cannot get default parameters: " - "muxer-comp-addr=%p", muxer_comp); - goto error; - } - - ret = bt_value_map_extend(default_params, params, &real_params); - if (ret) { - BT_COMP_LOGE("Cannot extend default parameters map value: " - "muxer-comp-addr=%p, def-params-addr=%p, " - "params-addr=%p", muxer_comp, default_params, - params); - goto error; - } - - assume_absolute_clock_classes = bt_value_map_borrow_entry_value(real_params, - ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME); - if (assume_absolute_clock_classes && - !bt_value_is_bool(assume_absolute_clock_classes)) { - BT_COMP_LOGE("Expecting a boolean value for the `%s` parameter: " - "muxer-comp-addr=%p, value-type=%s", - ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME, muxer_comp, - bt_common_value_type_string( - bt_value_get_type(assume_absolute_clock_classes))); - goto error; - } - - bool_val = bt_value_bool_get(assume_absolute_clock_classes); - muxer_comp->assume_absolute_clock_classes = (bool) bool_val; - BT_COMP_LOGI("Configured muxer component: muxer-comp-addr=%p, " - "assume-absolute-clock-classes=%d", - muxer_comp, muxer_comp->assume_absolute_clock_classes); - goto end; - -error: - ret = -1; - -end: - bt_value_put_ref(default_params); - bt_value_put_ref(real_params); - return ret; -} +struct bt_param_validation_map_value_entry_descr muxer_params[] = { + BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END +}; BT_HIDDEN -bt_component_class_init_method_status muxer_init( +bt_component_class_initialize_method_status muxer_init( bt_self_component_filter *self_comp_flt, + bt_self_component_filter_configuration *config, const bt_value *params, void *init_data) { - int ret; - bt_component_class_init_method_status status = - BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK; + bt_component_class_initialize_method_status status; bt_self_component_add_port_status add_port_status; bt_self_component *self_comp = bt_self_component_filter_as_self_component(self_comp_flt); struct muxer_comp *muxer_comp = g_new0(struct muxer_comp, 1); bt_logging_level log_level = bt_component_get_logging_level( bt_self_component_as_component(self_comp)); + enum bt_param_validation_status validation_status; + gchar *validate_error = NULL; BT_COMP_LOG_CUR_LVL(BT_LOG_INFO, log_level, self_comp, "Initializing muxer component: " "comp-addr=%p, params-addr=%p", self_comp, params); if (!muxer_comp) { + /* + * Don't use BT_COMP_LOGE_APPEND_CAUSE, as `muxer_comp` is not + * initialized. + */ BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR, log_level, self_comp, "Failed to allocate one muxer component."); + BT_CURRENT_THREAD_ERROR_APPEND_CAUSE_FROM_COMPONENT(self_comp, + "Failed to allocate one muxer component."); + status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; goto error; } muxer_comp->log_level = log_level; muxer_comp->self_comp = self_comp; muxer_comp->self_comp_flt = self_comp_flt; - ret = configure_muxer_comp(muxer_comp, params); - if (ret) { - BT_COMP_LOGE("Cannot configure muxer component: " - "muxer-comp-addr=%p, params-addr=%p", - muxer_comp, params); + + validation_status = bt_param_validation_validate(params, + muxer_params, &validate_error); + if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) { + status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; + goto error; + } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) { + status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, "%s", validate_error); goto error; } bt_self_component_set_data(self_comp, muxer_comp); add_port_status = add_available_input_port(self_comp_flt); if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) { - BT_COMP_LOGE("Cannot ensure that at least one muxer component's input port is available: " + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Cannot ensure that at least one muxer component's input port is available: " "muxer-comp-addr=%p, status=%s", - muxer_comp, - bt_common_func_status_string(add_port_status)); - if (add_port_status == - BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR) { - status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_MEMORY_ERROR; - } else { - status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR; - } - + muxer_comp, bt_common_func_status_string(add_port_status)); + status = (int) add_port_status; goto error; } add_port_status = create_output_port(self_comp_flt); if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) { - BT_COMP_LOGE("Cannot create muxer component's output port: " + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Cannot create muxer component's output port: " "muxer-comp-addr=%p, status=%s", - muxer_comp, - bt_common_func_status_string(add_port_status)); - if (add_port_status == - BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR) { - status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_MEMORY_ERROR; - } else { - status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR; - } - + muxer_comp, bt_common_func_status_string(add_port_status)); + status = (int) add_port_status; goto error; } @@ -399,17 +321,15 @@ bt_component_class_init_method_status muxer_init( "comp-addr=%p, params-addr=%p, muxer-comp-addr=%p", self_comp, params, muxer_comp); + status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK; goto end; error: destroy_muxer_comp(muxer_comp); bt_self_component_set_data(self_comp, NULL); - if (status == BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK) { - status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR; - } - end: + g_free(validate_error); return status; } @@ -425,15 +345,17 @@ void muxer_finalize(bt_self_component_filter *self_comp) } static -bt_self_component_port_input_message_iterator * +bt_message_iterator_create_from_message_iterator_status create_msg_iter_on_input_port(struct muxer_comp *muxer_comp, - bt_self_component_port_input *self_port) + struct muxer_msg_iter *muxer_msg_iter, + bt_self_component_port_input *self_port, + bt_message_iterator **msg_iter) { const bt_port *port = bt_self_component_port_as_port( bt_self_component_port_input_as_self_component_port( self_port)); - bt_self_component_port_input_message_iterator *msg_iter = - NULL; + bt_message_iterator_create_from_message_iterator_status + status; BT_ASSERT(port); BT_ASSERT(bt_port_is_connected(port)); @@ -441,10 +363,11 @@ create_msg_iter_on_input_port(struct muxer_comp *muxer_comp, // TODO: Advance the iterator to >= the time of the latest // returned message by the muxer message // iterator which creates it. - msg_iter = bt_self_component_port_input_message_iterator_create( - self_port); - if (!msg_iter) { - BT_COMP_LOGE("Cannot create upstream message iterator on input port: " + status = bt_message_iterator_create_from_message_iterator( + muxer_msg_iter->self_msg_iter, self_port, msg_iter); + if (status != BT_MESSAGE_ITERATOR_CREATE_FROM_MESSAGE_ITERATOR_STATUS_OK) { + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Cannot create upstream message iterator on input port: " "port-addr=%p, port-name=\"%s\"", port, bt_port_get_name(port)); goto end; @@ -455,17 +378,16 @@ create_msg_iter_on_input_port(struct muxer_comp *muxer_comp, port, bt_port_get_name(port), msg_iter); end: - return msg_iter; + return status; } static -bt_component_class_message_iterator_next_method_status muxer_upstream_msg_iter_next( +bt_message_iterator_class_next_method_status muxer_upstream_msg_iter_next( struct muxer_upstream_msg_iter *muxer_upstream_msg_iter, bool *is_ended) { - struct muxer_comp *muxer_comp = - muxer_upstream_msg_iter->muxer_comp; - bt_component_class_message_iterator_next_method_status status; + struct muxer_comp *muxer_comp = muxer_upstream_msg_iter->muxer_comp; + bt_message_iterator_class_next_method_status status; bt_message_iterator_next_status input_port_iter_status; bt_message_array_const msgs; uint64_t i; @@ -475,7 +397,7 @@ bt_component_class_message_iterator_next_method_status muxer_upstream_msg_iter_n "muxer-upstream-msg-iter-wrap-addr=%p, msg-iter-addr=%p", muxer_upstream_msg_iter, muxer_upstream_msg_iter->msg_iter); - input_port_iter_status = bt_self_component_port_input_message_iterator_next( + input_port_iter_status = bt_message_iterator_next( muxer_upstream_msg_iter->msg_iter, &msgs, &count); BT_COMP_LOGD("Upstream message iterator's \"next\" method returned: " "status=%s", @@ -488,7 +410,10 @@ bt_component_class_message_iterator_next_method_status muxer_upstream_msg_iter_n * valid: it must be considered for muxing operations. */ BT_COMP_LOGD_STR("Validated upstream message iterator wrapper."); - BT_ASSERT(count > 0); + BT_ASSERT_DBG(count > 0); + + g_ptr_array_set_size(muxer_upstream_msg_iter->msgs, count); + muxer_upstream_msg_iter->next_msg = 0; /* Move messages to our queue */ for (i = 0; i < count; i++) { @@ -497,10 +422,10 @@ bt_component_class_message_iterator_next_method_status muxer_upstream_msg_iter_n * (muxer_msg_iter_do_next_one()) consumes * from the head first. */ - g_queue_push_tail(muxer_upstream_msg_iter->msgs, - (void *) msgs[i]); + g_ptr_array_index(muxer_upstream_msg_iter->msgs, i) + = (gpointer *) msgs[i]; } - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK; break; case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN: /* @@ -508,7 +433,7 @@ bt_component_class_message_iterator_next_method_status muxer_upstream_msg_iter_n * valid anymore. Return * BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN immediately. */ - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_AGAIN; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN; break; case BT_MESSAGE_ITERATOR_NEXT_STATUS_END: /* Fall-through. */ /* @@ -517,13 +442,22 @@ bt_component_class_message_iterator_next_method_status muxer_upstream_msg_iter_n * message. */ *is_ended = true; - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK; + break; + case BT_MESSAGE_ITERATOR_NEXT_STATUS_ERROR: + case BT_MESSAGE_ITERATOR_NEXT_STATUS_MEMORY_ERROR: + /* Error status code */ + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Upstream iterator's next method returned an error: status=%s", + bt_common_func_status_string(input_port_iter_status)); + status = (int) input_port_iter_status; break; default: - /* Error or unsupported status code */ - BT_COMP_LOGE("Error or unsupported status code: " - "status-code=%d", input_port_iter_status); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; + /* Unsupported status code */ + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Unsupported status code: status=%s", + bt_common_func_status_string(input_port_iter_status)); + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; break; } @@ -541,8 +475,8 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp, const bt_stream_class *stream_class = NULL; bt_message_type msg_type; - BT_ASSERT(msg); - BT_ASSERT(ts_ns); + BT_ASSERT_DBG(msg); + BT_ASSERT_DBG(ts_ns); BT_COMP_LOGD("Getting message's timestamp: " "muxer-msg-iter-addr=%p, msg-addr=%p, " "last-returned-ts=%" PRId64, @@ -576,7 +510,7 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp, switch (msg_type) { case BT_MESSAGE_TYPE_EVENT: - BT_ASSERT(bt_message_event_borrow_stream_class_default_clock_class_const( + BT_ASSERT_DBG(bt_message_event_borrow_stream_class_default_clock_class_const( msg)); clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const( msg); @@ -644,7 +578,7 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp, break; case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY: - clock_snapshot = bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const( + clock_snapshot = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const( msg); break; default: @@ -656,7 +590,8 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp, ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns); if (ret) { - BT_COMP_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: " + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Cannot get nanoseconds from Epoch of clock snapshot: " "clock-snapshot-addr=%p", clock_snapshot); goto error; } @@ -693,18 +628,16 @@ int validate_clock_class(struct muxer_msg_iter *muxer_msg_iter, const uint8_t *cc_uuid; const char *cc_name; - BT_ASSERT(clock_class); + BT_ASSERT_DBG(clock_class); cc_uuid = bt_clock_class_get_uuid(clock_class); cc_name = bt_clock_class_get_name(clock_class); if (muxer_msg_iter->clock_class_expectation == MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY) { /* - * This is the first clock class that this muxer - * message iterator encounters. Its properties - * determine what to expect for the whole lifetime of - * the iterator without a true - * `assume-absolute-clock-classes` parameter. + * This is the first clock class that this muxer message + * iterator encounters. Its properties determine what to expect + * for the whole lifetime of the iterator. */ if (bt_clock_class_origin_is_unix_epoch(clock_class)) { /* Expect absolute clock classes */ @@ -730,76 +663,81 @@ int validate_clock_class(struct muxer_msg_iter *muxer_msg_iter, } } - if (!muxer_comp->assume_absolute_clock_classes) { - switch (muxer_msg_iter->clock_class_expectation) { - case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE: - if (!bt_clock_class_origin_is_unix_epoch(clock_class)) { - BT_COMP_LOGE("Expecting an absolute clock class, " - "but got a non-absolute one: " - "clock-class-addr=%p, clock-class-name=\"%s\"", - clock_class, cc_name); - goto error; - } - break; - case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID: - if (bt_clock_class_origin_is_unix_epoch(clock_class)) { - BT_COMP_LOGE("Expecting a non-absolute clock class with no UUID, " - "but got an absolute one: " - "clock-class-addr=%p, clock-class-name=\"%s\"", - clock_class, cc_name); - goto error; - } - - if (cc_uuid) { - BT_COMP_LOGE("Expecting a non-absolute clock class with no UUID, " - "but got one with a UUID: " - "clock-class-addr=%p, clock-class-name=\"%s\", " - "uuid=\"" BT_UUID_FMT "\"", - clock_class, cc_name, BT_UUID_FMT_VALUES(cc_uuid)); - goto error; - } - break; - case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID: - if (bt_clock_class_origin_is_unix_epoch(clock_class)) { - BT_COMP_LOGE("Expecting a non-absolute clock class with a specific UUID, " - "but got an absolute one: " - "clock-class-addr=%p, clock-class-name=\"%s\"", - clock_class, cc_name); - goto error; - } + switch (muxer_msg_iter->clock_class_expectation) { + case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE: + if (!bt_clock_class_origin_is_unix_epoch(clock_class)) { + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Expecting an absolute clock class, " + "but got a non-absolute one: " + "clock-class-addr=%p, clock-class-name=\"%s\"", + clock_class, cc_name); + goto error; + } + break; + case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID: + if (bt_clock_class_origin_is_unix_epoch(clock_class)) { + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Expecting a non-absolute clock class with no UUID, " + "but got an absolute one: " + "clock-class-addr=%p, clock-class-name=\"%s\"", + clock_class, cc_name); + goto error; + } - if (!cc_uuid) { - BT_COMP_LOGE("Expecting a non-absolute clock class with a specific UUID, " - "but got one with no UUID: " - "clock-class-addr=%p, clock-class-name=\"%s\"", - clock_class, cc_name); - goto error; - } + if (cc_uuid) { + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Expecting a non-absolute clock class with no UUID, " + "but got one with a UUID: " + "clock-class-addr=%p, clock-class-name=\"%s\", " + "uuid=\"" BT_UUID_FMT "\"", + clock_class, cc_name, BT_UUID_FMT_VALUES(cc_uuid)); + goto error; + } + break; + case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID: + if (bt_clock_class_origin_is_unix_epoch(clock_class)) { + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Expecting a non-absolute clock class with a specific UUID, " + "but got an absolute one: " + "clock-class-addr=%p, clock-class-name=\"%s\"", + clock_class, cc_name); + goto error; + } - if (bt_uuid_compare(muxer_msg_iter->expected_clock_class_uuid, cc_uuid) != 0) { - BT_COMP_LOGE("Expecting a non-absolute clock class with a specific UUID, " - "but got one with different UUID: " - "clock-class-addr=%p, clock-class-name=\"%s\", " - "expected-uuid=\"" BT_UUID_FMT "\", " - "uuid=\"" BT_UUID_FMT "\"", - clock_class, cc_name, - BT_UUID_FMT_VALUES(muxer_msg_iter->expected_clock_class_uuid), - BT_UUID_FMT_VALUES(cc_uuid)); - goto error; - } - break; - case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE: - BT_COMP_LOGE("Expecting no clock class, but got one: " + if (!cc_uuid) { + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Expecting a non-absolute clock class with a specific UUID, " + "but got one with no UUID: " "clock-class-addr=%p, clock-class-name=\"%s\"", clock_class, cc_name); goto error; - default: - /* Unexpected */ - BT_COMP_LOGF("Unexpected clock class expectation: " - "expectation-code=%d", - muxer_msg_iter->clock_class_expectation); - abort(); } + + if (bt_uuid_compare(muxer_msg_iter->expected_clock_class_uuid, cc_uuid) != 0) { + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Expecting a non-absolute clock class with a specific UUID, " + "but got one with different UUID: " + "clock-class-addr=%p, clock-class-name=\"%s\", " + "expected-uuid=\"" BT_UUID_FMT "\", " + "uuid=\"" BT_UUID_FMT "\"", + clock_class, cc_name, + BT_UUID_FMT_VALUES(muxer_msg_iter->expected_clock_class_uuid), + BT_UUID_FMT_VALUES(cc_uuid)); + goto error; + } + break; + case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE: + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Expecting no clock class, but got one: " + "clock-class-addr=%p, clock-class-name=\"%s\"", + clock_class, cc_name); + goto error; + default: + /* Unexpected */ + BT_COMP_LOGF("Unexpected clock class expectation: " + "expectation-code=%d", + muxer_msg_iter->clock_class_expectation); + bt_common_abort(); } goto end; @@ -827,8 +765,10 @@ int validate_new_stream_clock_class(struct muxer_msg_iter *muxer_msg_iter, /* Expect no clock class */ muxer_msg_iter->clock_class_expectation = MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE; - } else { - BT_COMP_LOGE("Expecting stream class with a default clock class: " + } else if (muxer_msg_iter->clock_class_expectation != + MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE) { + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Expecting stream class without a default clock class: " "stream-class-addr=%p, stream-class-name=\"%s\", " "stream-class-id=%" PRIu64, stream_class, bt_stream_class_get_name(stream_class), @@ -862,7 +802,7 @@ end: * the youngest, and sets *ts_ns to its time. */ static -bt_component_class_message_iterator_next_method_status +bt_message_iterator_class_next_method_status muxer_msg_iter_youngest_upstream_msg_iter( struct muxer_comp *muxer_comp, struct muxer_msg_iter *muxer_msg_iter, @@ -872,12 +812,12 @@ muxer_msg_iter_youngest_upstream_msg_iter( size_t i; int ret; int64_t youngest_ts_ns = INT64_MAX; - bt_component_class_message_iterator_next_method_status status = - BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + bt_message_iterator_class_next_method_status status = + BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK; - BT_ASSERT(muxer_comp); - BT_ASSERT(muxer_msg_iter); - BT_ASSERT(muxer_upstream_msg_iter); + BT_ASSERT_DBG(muxer_comp); + BT_ASSERT_DBG(muxer_msg_iter); + BT_ASSERT_DBG(muxer_upstream_msg_iter); *muxer_upstream_msg_iter = NULL; for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len; @@ -897,9 +837,11 @@ muxer_msg_iter_youngest_upstream_msg_iter( continue; } - BT_ASSERT(cur_muxer_upstream_msg_iter->msgs->length > 0); - msg = g_queue_peek_head(cur_muxer_upstream_msg_iter->msgs); - BT_ASSERT(msg); + BT_ASSERT_DBG(cur_muxer_upstream_msg_iter->next_msg < + cur_muxer_upstream_msg_iter->msgs->len); + msg = g_ptr_array_index(cur_muxer_upstream_msg_iter->msgs, + cur_muxer_upstream_msg_iter->next_msg); + BT_ASSERT_DBG(msg); if (G_UNLIKELY(bt_message_get_type(msg) == BT_MESSAGE_TYPE_STREAM_BEGINNING)) { @@ -912,20 +854,20 @@ muxer_msg_iter_youngest_upstream_msg_iter( * validate_new_stream_clock_class() logs * errors. */ - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; goto end; } } else if (G_UNLIKELY(bt_message_get_type(msg) == BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY)) { const bt_clock_snapshot *cs; - cs = bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const( + cs = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const( msg); ret = validate_clock_class(muxer_msg_iter, muxer_comp, bt_clock_snapshot_borrow_clock_class_const(cs)); if (ret) { /* validate_clock_class() logs errors */ - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; goto end; } } @@ -935,20 +877,62 @@ muxer_msg_iter_youngest_upstream_msg_iter( if (ret) { /* get_msg_ts_ns() logs errors */ *muxer_upstream_msg_iter = NULL; - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; goto end; } - if (msg_ts_ns <= youngest_ts_ns) { + /* + * Update the current message iterator if it has not been set + * yet, or if its current message has a timestamp smaller than + * the previously selected youngest message. + */ + if (G_UNLIKELY(*muxer_upstream_msg_iter == NULL) || + msg_ts_ns < youngest_ts_ns) { *muxer_upstream_msg_iter = cur_muxer_upstream_msg_iter; youngest_ts_ns = msg_ts_ns; *ts_ns = youngest_ts_ns; + } else if (msg_ts_ns == youngest_ts_ns) { + /* + * The currently selected message to be sent downstream + * next has the exact same timestamp that of the + * current candidate message. We must break the tie + * in a predictable manner. + */ + BT_ASSERT_DBG((*muxer_upstream_msg_iter)->next_msg < + (*muxer_upstream_msg_iter)->msgs->len); + const bt_message *selected_msg = + g_ptr_array_index((*muxer_upstream_msg_iter)->msgs, + (*muxer_upstream_msg_iter)->next_msg); + BT_COMP_LOGD_STR("Two of the next message candidates have the same timestamps, pick one deterministically."); + + /* + * Order the messages in an arbitrary but determinitic + * way. + */ + ret = common_muxing_compare_messages(msg, selected_msg); + if (ret < 0) { + /* + * The `msg` should go first. Update the next + * iterator and the current timestamp. + */ + *muxer_upstream_msg_iter = + cur_muxer_upstream_msg_iter; + youngest_ts_ns = msg_ts_ns; + *ts_ns = youngest_ts_ns; + } else if (ret == 0) { + /* Unable to pick which one should go first. */ + BT_COMP_LOGW("Cannot deterministically pick next upstream message iterator because they have identical next messages: " + "muxer-upstream-msg-iter-wrap-addr=%p" + "cur-muxer-upstream-msg-iter-wrap-addr=%p", + *muxer_upstream_msg_iter, + cur_muxer_upstream_msg_iter); + } } } if (!*muxer_upstream_msg_iter) { - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END; *ts_ns = INT64_MIN; } @@ -957,26 +941,26 @@ end: } static -bt_component_class_message_iterator_next_method_status +bt_message_iterator_class_next_method_status validate_muxer_upstream_msg_iter( struct muxer_upstream_msg_iter *muxer_upstream_msg_iter, bool *is_ended) { - struct muxer_comp *muxer_comp = - muxer_upstream_msg_iter->muxer_comp; - bt_component_class_message_iterator_next_method_status status = - BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + struct muxer_comp *muxer_comp = muxer_upstream_msg_iter->muxer_comp; + bt_message_iterator_class_next_method_status status; BT_COMP_LOGD("Validating muxer's upstream message iterator wrapper: " "muxer-upstream-msg-iter-wrap-addr=%p", muxer_upstream_msg_iter); - if (muxer_upstream_msg_iter->msgs->length > 0 || + if (muxer_upstream_msg_iter->next_msg < muxer_upstream_msg_iter->msgs->len || !muxer_upstream_msg_iter->msg_iter) { BT_COMP_LOGD("Already valid or not considered: " - "queue-len=%u, upstream-msg-iter-addr=%p", - muxer_upstream_msg_iter->msgs->length, + "queue-len=%u, next-msg=%u, upstream-msg-iter-addr=%p", + muxer_upstream_msg_iter->msgs->len, + muxer_upstream_msg_iter->next_msg, muxer_upstream_msg_iter->msg_iter); + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK; goto end; } @@ -989,13 +973,12 @@ end: } static -bt_component_class_message_iterator_next_method_status +bt_message_iterator_class_next_method_status validate_muxer_upstream_msg_iters( struct muxer_msg_iter *muxer_msg_iter) { struct muxer_comp *muxer_comp = muxer_msg_iter->muxer_comp; - bt_component_class_message_iterator_next_method_status status = - BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + bt_message_iterator_class_next_method_status status; size_t i; BT_COMP_LOGD("Validating muxer's upstream message iterator wrappers: " @@ -1011,9 +994,10 @@ validate_muxer_upstream_msg_iters( status = validate_muxer_upstream_msg_iter( muxer_upstream_msg_iter, &is_ended); - if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) { + if (status != BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) { if (status < 0) { - BT_COMP_LOGE("Cannot validate muxer's upstream message iterator wrapper: " + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Cannot validate muxer's upstream message iterator wrapper: " "muxer-msg-iter-addr=%p, " "muxer-upstream-msg-iter-wrap-addr=%p", muxer_msg_iter, @@ -1054,22 +1038,25 @@ validate_muxer_upstream_msg_iters( } } + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK; + end: return status; } static inline -bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next_one( +bt_message_iterator_class_next_method_status muxer_msg_iter_do_next_one( struct muxer_comp *muxer_comp, struct muxer_msg_iter *muxer_msg_iter, const bt_message **msg) { - bt_component_class_message_iterator_next_method_status status; + bt_message_iterator_class_next_method_status status; struct muxer_upstream_msg_iter *muxer_upstream_msg_iter = NULL; - int64_t next_return_ts; + /* Initialize to avoid -Wmaybe-uninitialized warning with gcc 4.8. */ + int64_t next_return_ts = 0; status = validate_muxer_upstream_msg_iters(muxer_msg_iter); - if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) { + if (status != BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) { /* validate_muxer_upstream_msg_iters() logs details */ goto end; } @@ -1083,9 +1070,10 @@ bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next_on status = muxer_msg_iter_youngest_upstream_msg_iter(muxer_comp, muxer_msg_iter, &muxer_upstream_msg_iter, &next_return_ts); - if (status < 0 || status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END) { + if (status < 0 || status == BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END) { if (status < 0) { - BT_COMP_LOGE("Cannot find the youngest upstream message iterator wrapper: " + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Cannot find the youngest upstream message iterator wrapper: " "status=%s", bt_common_func_status_string(status)); } else { @@ -1098,12 +1086,13 @@ bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next_on } if (next_return_ts < muxer_msg_iter->last_returned_ts_ns) { - BT_COMP_LOGE("Youngest upstream message iterator wrapper's timestamp is less than muxer's message iterator's last returned timestamp: " + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Youngest upstream message iterator wrapper's timestamp is less than muxer's message iterator's last returned timestamp: " "muxer-msg-iter-addr=%p, ts=%" PRId64 ", " "last-returned-ts=%" PRId64, muxer_msg_iter, next_return_ts, muxer_msg_iter->last_returned_ts_ns); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; goto end; } @@ -1112,15 +1101,20 @@ bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next_on "muxer-upstream-msg-iter-wrap-addr=%p, " "ts=%" PRId64, muxer_msg_iter, muxer_upstream_msg_iter, next_return_ts); - BT_ASSERT(status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK); - BT_ASSERT(muxer_upstream_msg_iter); + BT_ASSERT_DBG(status == + BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK); + BT_ASSERT_DBG(muxer_upstream_msg_iter); /* * Consume from the queue's head: other side * (muxer_upstream_msg_iter_next()) writes to the tail. */ - *msg = g_queue_pop_head(muxer_upstream_msg_iter->msgs); - BT_ASSERT(*msg); + *msg = g_ptr_array_index(muxer_upstream_msg_iter->msgs, + muxer_upstream_msg_iter->next_msg); + g_ptr_array_index(muxer_upstream_msg_iter->msgs, + muxer_upstream_msg_iter->next_msg) = NULL; + ++muxer_upstream_msg_iter->next_msg; + BT_ASSERT_DBG(*msg); muxer_msg_iter->last_returned_ts_ns = next_return_ts; end: @@ -1128,23 +1122,33 @@ end: } static -bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next( +bt_message_iterator_class_next_method_status muxer_msg_iter_do_next( struct muxer_comp *muxer_comp, struct muxer_msg_iter *muxer_msg_iter, bt_message_array_const msgs, uint64_t capacity, uint64_t *count) { - bt_component_class_message_iterator_next_method_status status = - BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + bt_message_iterator_class_next_method_status status; uint64_t i = 0; - while (i < capacity && status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) { + if (G_UNLIKELY(muxer_msg_iter->next_saved_error)) { + /* + * Last time we were called, we hit an error but had some + * messages to deliver, so we stashed the error here. Return + * it now. + */ + BT_CURRENT_THREAD_MOVE_ERROR_AND_RESET(muxer_msg_iter->next_saved_error); + status = muxer_msg_iter->next_saved_status; + goto end; + } + + do { status = muxer_msg_iter_do_next_one(muxer_comp, muxer_msg_iter, &msgs[i]); - if (status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) { + if (status == BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) { i++; } - } + } while (i < capacity && status == BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK); if (i > 0) { /* @@ -1159,10 +1163,23 @@ bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next( * called, possibly without any accumulated * message, in which case we'll return it. */ + if (status < 0) { + /* + * Save this error for the next _next call. Assume that + * this component always appends error causes when + * returning an error status code, which will cause the + * current thread error to be non-NULL. + */ + muxer_msg_iter->next_saved_error = bt_current_thread_take_error(); + BT_ASSERT(muxer_msg_iter->next_saved_error); + muxer_msg_iter->next_saved_status = status; + } + *count = i; - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK; } +end: return status; } @@ -1195,12 +1212,15 @@ void destroy_muxer_msg_iter(struct muxer_msg_iter *muxer_msg_iter) } static -int muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp, - struct muxer_msg_iter *muxer_msg_iter) +bt_message_iterator_class_initialize_method_status +muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp, + struct muxer_msg_iter *muxer_msg_iter, + struct bt_self_message_iterator_configuration *config) { int64_t count; int64_t i; - int ret = 0; + bt_message_iterator_class_initialize_method_status status; + bool can_seek_forward = true; count = bt_component_filter_get_input_port_count( bt_self_component_filter_as_component_filter( @@ -1209,15 +1229,19 @@ int muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp, BT_COMP_LOGD("No input port to initialize for muxer component's message iterator: " "muxer-comp-addr=%p, muxer-msg-iter-addr=%p", muxer_comp, muxer_msg_iter); + status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK; goto end; } for (i = 0; i < count; i++) { - bt_self_component_port_input_message_iterator *upstream_msg_iter; + bt_message_iterator *upstream_msg_iter; bt_self_component_port_input *self_port = bt_self_component_filter_borrow_input_port_by_index( muxer_comp->self_comp_flt, i); const bt_port *port; + bt_message_iterator_create_from_message_iterator_status + msg_iter_status; + int int_status; BT_ASSERT(self_port); port = bt_self_component_port_as_port( @@ -1230,43 +1254,55 @@ int muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp, continue; } - upstream_msg_iter = create_msg_iter_on_input_port(muxer_comp, - self_port); - if (!upstream_msg_iter) { + msg_iter_status = create_msg_iter_on_input_port(muxer_comp, + muxer_msg_iter, self_port, &upstream_msg_iter); + if (msg_iter_status != BT_MESSAGE_ITERATOR_CREATE_FROM_MESSAGE_ITERATOR_STATUS_OK) { /* create_msg_iter_on_input_port() logs errors */ - BT_ASSERT(!upstream_msg_iter); - ret = -1; + status = (int) msg_iter_status; goto end; } - ret = muxer_msg_iter_add_upstream_msg_iter(muxer_msg_iter, + int_status = muxer_msg_iter_add_upstream_msg_iter(muxer_msg_iter, upstream_msg_iter); - bt_self_component_port_input_message_iterator_put_ref( + bt_message_iterator_put_ref( upstream_msg_iter); - if (ret) { + if (int_status) { + status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR; /* muxer_msg_iter_add_upstream_msg_iter() logs errors */ goto end; } + + can_seek_forward = can_seek_forward && + bt_message_iterator_can_seek_forward( + upstream_msg_iter); } + /* + * This iterator can seek forward if all of its iterators can seek + * forward. + */ + bt_self_message_iterator_configuration_set_can_seek_forward( + config, can_seek_forward); + + status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK; + end: - return ret; + return status; } BT_HIDDEN -bt_component_class_message_iterator_init_method_status muxer_msg_iter_init( +bt_message_iterator_class_initialize_method_status muxer_msg_iter_init( bt_self_message_iterator *self_msg_iter, - bt_self_component_filter *self_comp, + bt_self_message_iterator_configuration *config, bt_self_component_port_output *port) { struct muxer_comp *muxer_comp = NULL; struct muxer_msg_iter *muxer_msg_iter = NULL; - bt_component_class_message_iterator_init_method_status status = - BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_OK; - int ret; + bt_message_iterator_class_initialize_method_status status; + bt_self_component *self_comp = + bt_self_message_iterator_borrow_component(self_msg_iter); - muxer_comp = bt_self_component_get_data( - bt_self_component_filter_as_self_component(self_comp)); + muxer_comp = bt_self_component_get_data(self_comp); BT_ASSERT(muxer_comp); BT_COMP_LOGD("Initializing muxer component's message iterator: " "comp-addr=%p, muxer-comp-addr=%p, msg-iter-addr=%p", @@ -1278,26 +1314,32 @@ bt_component_class_message_iterator_init_method_status muxer_msg_iter_init( * creates a muxer message iterator while creating * another muxer message iterator (same component). */ - BT_COMP_LOGE("Recursive initialization of muxer component's message iterator: " + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Recursive initialization of muxer component's message iterator: " "comp-addr=%p, muxer-comp-addr=%p, msg-iter-addr=%p", self_comp, muxer_comp, self_msg_iter); + status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR; goto error; } muxer_comp->initializing_muxer_msg_iter = true; muxer_msg_iter = g_new0(struct muxer_msg_iter, 1); if (!muxer_msg_iter) { - BT_COMP_LOGE_STR("Failed to allocate one muxer component's message iterator."); + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Failed to allocate one muxer component's message iterator."); + status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; goto error; } muxer_msg_iter->muxer_comp = muxer_comp; + muxer_msg_iter->self_msg_iter = self_msg_iter; muxer_msg_iter->last_returned_ts_ns = INT64_MIN; muxer_msg_iter->active_muxer_upstream_msg_iters = g_ptr_array_new_with_free_func( (GDestroyNotify) destroy_muxer_upstream_msg_iter); if (!muxer_msg_iter->active_muxer_upstream_msg_iters) { - BT_COMP_LOGE_STR("Failed to allocate a GPtrArray."); + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, "Failed to allocate a GPtrArray."); + status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; goto error; } @@ -1305,18 +1347,20 @@ bt_component_class_message_iterator_init_method_status muxer_msg_iter_init( g_ptr_array_new_with_free_func( (GDestroyNotify) destroy_muxer_upstream_msg_iter); if (!muxer_msg_iter->ended_muxer_upstream_msg_iters) { - BT_COMP_LOGE_STR("Failed to allocate a GPtrArray."); + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, "Failed to allocate a GPtrArray."); + status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; goto error; } - ret = muxer_msg_iter_init_upstream_iterators(muxer_comp, - muxer_msg_iter); - if (ret) { - BT_COMP_LOGE("Cannot initialize connected input ports for muxer component's message iterator: " + status = muxer_msg_iter_init_upstream_iterators(muxer_comp, + muxer_msg_iter, config); + if (status) { + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Cannot initialize connected input ports for muxer component's message iterator: " "comp-addr=%p, muxer-comp-addr=%p, " "muxer-msg-iter-addr=%p, msg-iter-addr=%p, ret=%d", self_comp, muxer_comp, muxer_msg_iter, - self_msg_iter, ret); + self_msg_iter, status); goto error; } @@ -1330,7 +1374,6 @@ bt_component_class_message_iterator_init_method_status muxer_msg_iter_init( error: destroy_muxer_msg_iter(muxer_msg_iter); bt_self_message_iterator_set_data(self_msg_iter, NULL); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_ERROR; end: muxer_comp->initializing_muxer_msg_iter = false; @@ -1360,23 +1403,23 @@ void muxer_msg_iter_finalize(bt_self_message_iterator *self_msg_iter) } BT_HIDDEN -bt_component_class_message_iterator_next_method_status muxer_msg_iter_next( +bt_message_iterator_class_next_method_status muxer_msg_iter_next( bt_self_message_iterator *self_msg_iter, bt_message_array_const msgs, uint64_t capacity, uint64_t *count) { - bt_component_class_message_iterator_next_method_status status; + bt_message_iterator_class_next_method_status status; struct muxer_msg_iter *muxer_msg_iter = bt_self_message_iterator_get_data(self_msg_iter); bt_self_component *self_comp = NULL; struct muxer_comp *muxer_comp = NULL; - BT_ASSERT(muxer_msg_iter); + BT_ASSERT_DBG(muxer_msg_iter); self_comp = bt_self_message_iterator_borrow_component( self_msg_iter); - BT_ASSERT(self_comp); + BT_ASSERT_DBG(self_comp); muxer_comp = bt_self_component_get_data(self_comp); - BT_ASSERT(muxer_comp); + BT_ASSERT_DBG(muxer_comp); BT_COMP_LOGT("Muxer component's message iterator's \"next\" method called: " "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, " "msg-iter-addr=%p", @@ -1385,7 +1428,8 @@ bt_component_class_message_iterator_next_method_status muxer_msg_iter_next( status = muxer_msg_iter_do_next(muxer_comp, muxer_msg_iter, msgs, capacity, count); if (status < 0) { - BT_COMP_LOGE("Cannot get next message: " + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Cannot get next message: " "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, " "msg-iter-addr=%p, status=%s", self_comp, muxer_comp, muxer_msg_iter, self_msg_iter, @@ -1413,9 +1457,9 @@ bt_component_class_port_connected_method_status muxer_input_port_connected( add_port_status = add_available_input_port(self_comp); if (add_port_status) { - BT_COMP_LOGE("Cannot add one muxer component's input port: " - "status=%s", - bt_common_func_status_string(status)); + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Cannot add one muxer component's input port: status=%s", + bt_common_func_status_string(add_port_status)); if (add_port_status == BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR) { @@ -1432,59 +1476,74 @@ end: } static inline -bt_bool muxer_upstream_msg_iters_can_all_seek_beginning( - GPtrArray *muxer_upstream_msg_iters) +bt_message_iterator_class_can_seek_beginning_method_status +muxer_upstream_msg_iters_can_all_seek_beginning( + struct muxer_comp *muxer_comp, + GPtrArray *muxer_upstream_msg_iters, bt_bool *can_seek) { + bt_message_iterator_class_can_seek_beginning_method_status status = + BT_MESSAGE_ITERATOR_CLASS_CAN_SEEK_BEGINNING_METHOD_STATUS_OK; uint64_t i; - bt_bool ret = BT_TRUE; for (i = 0; i < muxer_upstream_msg_iters->len; i++) { struct muxer_upstream_msg_iter *upstream_msg_iter = muxer_upstream_msg_iters->pdata[i]; + status = (int) bt_message_iterator_can_seek_beginning( + upstream_msg_iter->msg_iter, can_seek); + if (status != BT_MESSAGE_ITERATOR_CLASS_CAN_SEEK_BEGINNING_METHOD_STATUS_OK) { + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Failed to determine whether upstream message iterator can seek beginning: " + "msg-iter-addr=%p", upstream_msg_iter->msg_iter); + goto end; + } - if (!bt_self_component_port_input_message_iterator_can_seek_beginning( - upstream_msg_iter->msg_iter)) { - ret = BT_FALSE; + if (!*can_seek) { goto end; } } + *can_seek = BT_TRUE; + end: - return ret; + return status; } BT_HIDDEN -bt_bool muxer_msg_iter_can_seek_beginning( - bt_self_message_iterator *self_msg_iter) +bt_message_iterator_class_can_seek_beginning_method_status +muxer_msg_iter_can_seek_beginning( + bt_self_message_iterator *self_msg_iter, bt_bool *can_seek) { struct muxer_msg_iter *muxer_msg_iter = bt_self_message_iterator_get_data(self_msg_iter); - bt_bool ret = BT_TRUE; + bt_message_iterator_class_can_seek_beginning_method_status status; - if (!muxer_upstream_msg_iters_can_all_seek_beginning( - muxer_msg_iter->active_muxer_upstream_msg_iters)) { - ret = BT_FALSE; + status = muxer_upstream_msg_iters_can_all_seek_beginning( + muxer_msg_iter->muxer_comp, + muxer_msg_iter->active_muxer_upstream_msg_iters, can_seek); + if (status != BT_MESSAGE_ITERATOR_CLASS_CAN_SEEK_BEGINNING_METHOD_STATUS_OK) { goto end; } - if (!muxer_upstream_msg_iters_can_all_seek_beginning( - muxer_msg_iter->ended_muxer_upstream_msg_iters)) { - ret = BT_FALSE; + if (!*can_seek) { goto end; } + status = muxer_upstream_msg_iters_can_all_seek_beginning( + muxer_msg_iter->muxer_comp, + muxer_msg_iter->ended_muxer_upstream_msg_iters, can_seek); + end: - return ret; + return status; } BT_HIDDEN -bt_component_class_message_iterator_seek_beginning_method_status muxer_msg_iter_seek_beginning( +bt_message_iterator_class_seek_beginning_method_status muxer_msg_iter_seek_beginning( bt_self_message_iterator *self_msg_iter) { struct muxer_msg_iter *muxer_msg_iter = bt_self_message_iterator_get_data(self_msg_iter); - bt_component_class_message_iterator_seek_beginning_method_status status = - BT_COMPONENT_CLASS_MESSAGE_ITERATOR_SEEK_BEGINNING_METHOD_STATUS_OK; + bt_message_iterator_class_seek_beginning_method_status status = + BT_MESSAGE_ITERATOR_CLASS_SEEK_BEGINNING_METHOD_STATUS_OK; bt_message_iterator_seek_beginning_status seek_beg_status; uint64_t i; @@ -1494,7 +1553,7 @@ bt_component_class_message_iterator_seek_beginning_method_status muxer_msg_iter_ struct muxer_upstream_msg_iter *upstream_msg_iter = muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i]; - seek_beg_status = bt_self_component_port_input_message_iterator_seek_beginning( + seek_beg_status = bt_message_iterator_seek_beginning( upstream_msg_iter->msg_iter); if (seek_beg_status != BT_MESSAGE_ITERATOR_SEEK_BEGINNING_STATUS_OK) { status = (int) seek_beg_status; @@ -1510,7 +1569,7 @@ bt_component_class_message_iterator_seek_beginning_method_status muxer_msg_iter_ struct muxer_upstream_msg_iter *upstream_msg_iter = muxer_msg_iter->active_muxer_upstream_msg_iters->pdata[i]; - seek_beg_status = bt_self_component_port_input_message_iterator_seek_beginning( + seek_beg_status = bt_message_iterator_seek_beginning( upstream_msg_iter->msg_iter); if (seek_beg_status != BT_MESSAGE_ITERATOR_SEEK_BEGINNING_STATUS_OK) { status = (int) seek_beg_status;