X-Git-Url: http://git.efficios.com/?p=babeltrace.git;a=blobdiff_plain;f=src%2Fplugins%2Futils%2Fmuxer%2Fmuxer.c;h=182dc84d66d25ef781bee781020900040150282e;hp=a2db3cb9ee398dd3e523a9d4d6dbc91cd042e5ab;hb=0235b0db7de5bcacdb3650c92461f2ce5eb2143d;hpb=498e7994d60bd0e9f63c3d5c0fd00eec77ba7c34 diff --git a/src/plugins/utils/muxer/muxer.c b/src/plugins/utils/muxer/muxer.c index a2db3cb9..182dc84d 100644 --- a/src/plugins/utils/muxer/muxer.c +++ b/src/plugins/utils/muxer/muxer.c @@ -1,23 +1,7 @@ /* - * Copyright 2017 Philippe Proulx - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: + * SPDX-License-Identifier: MIT * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. + * Copyright 2017 Philippe Proulx */ #define BT_COMP_LOG_SELF_COMP (muxer_comp->self_comp) @@ -56,7 +40,7 @@ struct muxer_upstream_msg_iter { struct muxer_comp *muxer_comp; /* Owned by this, NULL if ended */ - bt_self_component_port_input_message_iterator *msg_iter; + bt_message_iterator *msg_iter; /* Contains `const bt_message *`, owned by this */ GQueue *msgs; @@ -107,6 +91,14 @@ struct muxer_msg_iter { * MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID. */ bt_uuid_t expected_clock_class_uuid; + + /* + * Saved error. If we hit an error in the _next method, but have some + * messages ready to return, we save the error here and return it on + * the next _next call. + */ + bt_message_iterator_class_next_method_status next_saved_status; + const struct bt_error *next_saved_error; }; static @@ -135,7 +127,7 @@ void destroy_muxer_upstream_msg_iter( muxer_upstream_msg_iter, muxer_upstream_msg_iter->msg_iter, muxer_upstream_msg_iter->msgs->length); - bt_self_component_port_input_message_iterator_put_ref( + bt_message_iterator_put_ref( muxer_upstream_msg_iter->msg_iter); if (muxer_upstream_msg_iter->msgs) { @@ -148,7 +140,7 @@ void destroy_muxer_upstream_msg_iter( static int muxer_msg_iter_add_upstream_msg_iter(struct muxer_msg_iter *muxer_msg_iter, - bt_self_component_port_input_message_iterator *self_msg_iter) + bt_message_iterator *self_msg_iter) { int ret = 0; struct muxer_upstream_msg_iter *muxer_upstream_msg_iter = @@ -162,7 +154,7 @@ int muxer_msg_iter_add_upstream_msg_iter(struct muxer_msg_iter *muxer_msg_iter, muxer_upstream_msg_iter->muxer_comp = muxer_comp; muxer_upstream_msg_iter->msg_iter = self_msg_iter; - bt_self_component_port_input_message_iterator_get_ref(muxer_upstream_msg_iter->msg_iter); + bt_message_iterator_get_ref(muxer_upstream_msg_iter->msg_iter); muxer_upstream_msg_iter->msgs = g_queue_new(); if (!muxer_upstream_msg_iter->msgs) { BT_COMP_LOGE_STR("Failed to allocate a GQueue."); @@ -247,6 +239,7 @@ void destroy_muxer_comp(struct muxer_comp *muxer_comp) g_free(muxer_comp); } +static struct bt_param_validation_map_value_entry_descr muxer_params[] = { BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END }; @@ -342,16 +335,16 @@ void muxer_finalize(bt_self_component_filter *self_comp) } static -bt_self_component_port_input_message_iterator_create_from_message_iterator_status +bt_message_iterator_create_from_message_iterator_status create_msg_iter_on_input_port(struct muxer_comp *muxer_comp, struct muxer_msg_iter *muxer_msg_iter, bt_self_component_port_input *self_port, - bt_self_component_port_input_message_iterator **msg_iter) + bt_message_iterator **msg_iter) { const bt_port *port = bt_self_component_port_as_port( bt_self_component_port_input_as_self_component_port( self_port)); - bt_self_component_port_input_message_iterator_create_from_message_iterator_status + bt_message_iterator_create_from_message_iterator_status status; BT_ASSERT(port); @@ -360,9 +353,9 @@ create_msg_iter_on_input_port(struct muxer_comp *muxer_comp, // TODO: Advance the iterator to >= the time of the latest // returned message by the muxer message // iterator which creates it. - status = bt_self_component_port_input_message_iterator_create_from_message_iterator( + status = bt_message_iterator_create_from_message_iterator( muxer_msg_iter->self_msg_iter, self_port, msg_iter); - if (status != BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_CREATE_FROM_MESSAGE_ITERATOR_STATUS_OK) { + if (status != BT_MESSAGE_ITERATOR_CREATE_FROM_MESSAGE_ITERATOR_STATUS_OK) { BT_COMP_LOGE("Cannot create upstream message iterator on input port: " "port-addr=%p, port-name=\"%s\"", port, bt_port_get_name(port)); @@ -378,13 +371,12 @@ end: } static -bt_component_class_message_iterator_next_method_status muxer_upstream_msg_iter_next( +bt_message_iterator_class_next_method_status muxer_upstream_msg_iter_next( struct muxer_upstream_msg_iter *muxer_upstream_msg_iter, bool *is_ended) { - struct muxer_comp *muxer_comp = - muxer_upstream_msg_iter->muxer_comp; - bt_component_class_message_iterator_next_method_status status; + struct muxer_comp *muxer_comp = muxer_upstream_msg_iter->muxer_comp; + bt_message_iterator_class_next_method_status status; bt_message_iterator_next_status input_port_iter_status; bt_message_array_const msgs; uint64_t i; @@ -394,7 +386,7 @@ bt_component_class_message_iterator_next_method_status muxer_upstream_msg_iter_n "muxer-upstream-msg-iter-wrap-addr=%p, msg-iter-addr=%p", muxer_upstream_msg_iter, muxer_upstream_msg_iter->msg_iter); - input_port_iter_status = bt_self_component_port_input_message_iterator_next( + input_port_iter_status = bt_message_iterator_next( muxer_upstream_msg_iter->msg_iter, &msgs, &count); BT_COMP_LOGD("Upstream message iterator's \"next\" method returned: " "status=%s", @@ -419,7 +411,7 @@ bt_component_class_message_iterator_next_method_status muxer_upstream_msg_iter_n g_queue_push_tail(muxer_upstream_msg_iter->msgs, (void *) msgs[i]); } - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK; break; case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN: /* @@ -427,7 +419,7 @@ bt_component_class_message_iterator_next_method_status muxer_upstream_msg_iter_n * valid anymore. Return * BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN immediately. */ - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_AGAIN; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN; break; case BT_MESSAGE_ITERATOR_NEXT_STATUS_END: /* Fall-through. */ /* @@ -436,13 +428,22 @@ bt_component_class_message_iterator_next_method_status muxer_upstream_msg_iter_n * message. */ *is_ended = true; - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK; + break; + case BT_MESSAGE_ITERATOR_NEXT_STATUS_ERROR: + case BT_MESSAGE_ITERATOR_NEXT_STATUS_MEMORY_ERROR: + /* Error status code */ + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Upstream iterator's next method returned an error: status=%s", + bt_common_func_status_string(input_port_iter_status)); + status = (int) input_port_iter_status; break; default: - /* Error or unsupported status code */ - BT_COMP_LOGE("Error or unsupported status code: " - "status-code=%d", input_port_iter_status); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; + /* Unsupported status code */ + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Unsupported status code: status=%s", + bt_common_func_status_string(input_port_iter_status)); + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; break; } @@ -563,7 +564,7 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp, break; case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY: - clock_snapshot = bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const( + clock_snapshot = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const( msg); break; default: @@ -778,7 +779,7 @@ end: * the youngest, and sets *ts_ns to its time. */ static -bt_component_class_message_iterator_next_method_status +bt_message_iterator_class_next_method_status muxer_msg_iter_youngest_upstream_msg_iter( struct muxer_comp *muxer_comp, struct muxer_msg_iter *muxer_msg_iter, @@ -788,8 +789,8 @@ muxer_msg_iter_youngest_upstream_msg_iter( size_t i; int ret; int64_t youngest_ts_ns = INT64_MAX; - bt_component_class_message_iterator_next_method_status status = - BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + bt_message_iterator_class_next_method_status status = + BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK; BT_ASSERT_DBG(muxer_comp); BT_ASSERT_DBG(muxer_msg_iter); @@ -828,20 +829,20 @@ muxer_msg_iter_youngest_upstream_msg_iter( * validate_new_stream_clock_class() logs * errors. */ - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; goto end; } } else if (G_UNLIKELY(bt_message_get_type(msg) == BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY)) { const bt_clock_snapshot *cs; - cs = bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const( + cs = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const( msg); ret = validate_clock_class(muxer_msg_iter, muxer_comp, bt_clock_snapshot_borrow_clock_class_const(cs)); if (ret) { /* validate_clock_class() logs errors */ - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; goto end; } } @@ -851,7 +852,7 @@ muxer_msg_iter_youngest_upstream_msg_iter( if (ret) { /* get_msg_ts_ns() logs errors */ *muxer_upstream_msg_iter = NULL; - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; goto end; } @@ -903,7 +904,7 @@ muxer_msg_iter_youngest_upstream_msg_iter( } if (!*muxer_upstream_msg_iter) { - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END; *ts_ns = INT64_MIN; } @@ -912,15 +913,13 @@ end: } static -bt_component_class_message_iterator_next_method_status +bt_message_iterator_class_next_method_status validate_muxer_upstream_msg_iter( struct muxer_upstream_msg_iter *muxer_upstream_msg_iter, bool *is_ended) { - struct muxer_comp *muxer_comp = - muxer_upstream_msg_iter->muxer_comp; - bt_component_class_message_iterator_next_method_status status = - BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + struct muxer_comp *muxer_comp = muxer_upstream_msg_iter->muxer_comp; + bt_message_iterator_class_next_method_status status; BT_COMP_LOGD("Validating muxer's upstream message iterator wrapper: " "muxer-upstream-msg-iter-wrap-addr=%p", @@ -932,6 +931,7 @@ validate_muxer_upstream_msg_iter( "queue-len=%u, upstream-msg-iter-addr=%p", muxer_upstream_msg_iter->msgs->length, muxer_upstream_msg_iter->msg_iter); + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK; goto end; } @@ -944,13 +944,12 @@ end: } static -bt_component_class_message_iterator_next_method_status +bt_message_iterator_class_next_method_status validate_muxer_upstream_msg_iters( struct muxer_msg_iter *muxer_msg_iter) { struct muxer_comp *muxer_comp = muxer_msg_iter->muxer_comp; - bt_component_class_message_iterator_next_method_status status = - BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + bt_message_iterator_class_next_method_status status; size_t i; BT_COMP_LOGD("Validating muxer's upstream message iterator wrappers: " @@ -966,9 +965,10 @@ validate_muxer_upstream_msg_iters( status = validate_muxer_upstream_msg_iter( muxer_upstream_msg_iter, &is_ended); - if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) { + if (status != BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) { if (status < 0) { - BT_COMP_LOGE("Cannot validate muxer's upstream message iterator wrapper: " + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Cannot validate muxer's upstream message iterator wrapper: " "muxer-msg-iter-addr=%p, " "muxer-upstream-msg-iter-wrap-addr=%p", muxer_msg_iter, @@ -1009,22 +1009,25 @@ validate_muxer_upstream_msg_iters( } } + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK; + end: return status; } static inline -bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next_one( +bt_message_iterator_class_next_method_status muxer_msg_iter_do_next_one( struct muxer_comp *muxer_comp, struct muxer_msg_iter *muxer_msg_iter, const bt_message **msg) { - bt_component_class_message_iterator_next_method_status status; + bt_message_iterator_class_next_method_status status; struct muxer_upstream_msg_iter *muxer_upstream_msg_iter = NULL; - int64_t next_return_ts; + /* Initialize to avoid -Wmaybe-uninitialized warning with gcc 4.8. */ + int64_t next_return_ts = 0; status = validate_muxer_upstream_msg_iters(muxer_msg_iter); - if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) { + if (status != BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) { /* validate_muxer_upstream_msg_iters() logs details */ goto end; } @@ -1038,9 +1041,10 @@ bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next_on status = muxer_msg_iter_youngest_upstream_msg_iter(muxer_comp, muxer_msg_iter, &muxer_upstream_msg_iter, &next_return_ts); - if (status < 0 || status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END) { + if (status < 0 || status == BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END) { if (status < 0) { - BT_COMP_LOGE("Cannot find the youngest upstream message iterator wrapper: " + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Cannot find the youngest upstream message iterator wrapper: " "status=%s", bt_common_func_status_string(status)); } else { @@ -1053,12 +1057,13 @@ bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next_on } if (next_return_ts < muxer_msg_iter->last_returned_ts_ns) { - BT_COMP_LOGE("Youngest upstream message iterator wrapper's timestamp is less than muxer's message iterator's last returned timestamp: " + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Youngest upstream message iterator wrapper's timestamp is less than muxer's message iterator's last returned timestamp: " "muxer-msg-iter-addr=%p, ts=%" PRId64 ", " "last-returned-ts=%" PRId64, muxer_msg_iter, next_return_ts, muxer_msg_iter->last_returned_ts_ns); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; goto end; } @@ -1068,7 +1073,7 @@ bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next_on "ts=%" PRId64, muxer_msg_iter, muxer_upstream_msg_iter, next_return_ts); BT_ASSERT_DBG(status == - BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK); + BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK); BT_ASSERT_DBG(muxer_upstream_msg_iter); /* @@ -1084,23 +1089,33 @@ end: } static -bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next( +bt_message_iterator_class_next_method_status muxer_msg_iter_do_next( struct muxer_comp *muxer_comp, struct muxer_msg_iter *muxer_msg_iter, bt_message_array_const msgs, uint64_t capacity, uint64_t *count) { - bt_component_class_message_iterator_next_method_status status = - BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + bt_message_iterator_class_next_method_status status; uint64_t i = 0; - while (i < capacity && status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) { + if (G_UNLIKELY(muxer_msg_iter->next_saved_error)) { + /* + * Last time we were called, we hit an error but had some + * messages to deliver, so we stashed the error here. Return + * it now. + */ + BT_CURRENT_THREAD_MOVE_ERROR_AND_RESET(muxer_msg_iter->next_saved_error); + status = muxer_msg_iter->next_saved_status; + goto end; + } + + do { status = muxer_msg_iter_do_next_one(muxer_comp, muxer_msg_iter, &msgs[i]); - if (status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) { + if (status == BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) { i++; } - } + } while (i < capacity && status == BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK); if (i > 0) { /* @@ -1115,10 +1130,23 @@ bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next( * called, possibly without any accumulated * message, in which case we'll return it. */ + if (status < 0) { + /* + * Save this error for the next _next call. Assume that + * this component always appends error causes when + * returning an error status code, which will cause the + * current thread error to be non-NULL. + */ + muxer_msg_iter->next_saved_error = bt_current_thread_take_error(); + BT_ASSERT(muxer_msg_iter->next_saved_error); + muxer_msg_iter->next_saved_status = status; + } + *count = i; - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK; } +end: return status; } @@ -1151,14 +1179,14 @@ void destroy_muxer_msg_iter(struct muxer_msg_iter *muxer_msg_iter) } static -bt_component_class_message_iterator_initialize_method_status +bt_message_iterator_class_initialize_method_status muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp, struct muxer_msg_iter *muxer_msg_iter, struct bt_self_message_iterator_configuration *config) { int64_t count; int64_t i; - bt_component_class_message_iterator_initialize_method_status status; + bt_message_iterator_class_initialize_method_status status; bool can_seek_forward = true; count = bt_component_filter_get_input_port_count( @@ -1168,17 +1196,17 @@ muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp, BT_COMP_LOGD("No input port to initialize for muxer component's message iterator: " "muxer-comp-addr=%p, muxer-msg-iter-addr=%p", muxer_comp, muxer_msg_iter); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_OK; + status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK; goto end; } for (i = 0; i < count; i++) { - bt_self_component_port_input_message_iterator *upstream_msg_iter; + bt_message_iterator *upstream_msg_iter; bt_self_component_port_input *self_port = bt_self_component_filter_borrow_input_port_by_index( muxer_comp->self_comp_flt, i); const bt_port *port; - bt_self_component_port_input_message_iterator_create_from_message_iterator_status + bt_message_iterator_create_from_message_iterator_status msg_iter_status; int int_status; @@ -1195,7 +1223,7 @@ muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp, msg_iter_status = create_msg_iter_on_input_port(muxer_comp, muxer_msg_iter, self_port, &upstream_msg_iter); - if (msg_iter_status != BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_CREATE_FROM_MESSAGE_ITERATOR_STATUS_OK) { + if (msg_iter_status != BT_MESSAGE_ITERATOR_CREATE_FROM_MESSAGE_ITERATOR_STATUS_OK) { /* create_msg_iter_on_input_port() logs errors */ status = (int) msg_iter_status; goto end; @@ -1203,16 +1231,16 @@ muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp, int_status = muxer_msg_iter_add_upstream_msg_iter(muxer_msg_iter, upstream_msg_iter); - bt_self_component_port_input_message_iterator_put_ref( + bt_message_iterator_put_ref( upstream_msg_iter); if (int_status) { - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR; /* muxer_msg_iter_add_upstream_msg_iter() logs errors */ goto end; } can_seek_forward = can_seek_forward && - bt_self_component_port_input_message_iterator_can_seek_forward( + bt_message_iterator_can_seek_forward( upstream_msg_iter); } @@ -1223,25 +1251,25 @@ muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp, bt_self_message_iterator_configuration_set_can_seek_forward( config, can_seek_forward); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_OK; + status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK; end: return status; } BT_HIDDEN -bt_component_class_message_iterator_initialize_method_status muxer_msg_iter_init( +bt_message_iterator_class_initialize_method_status muxer_msg_iter_init( bt_self_message_iterator *self_msg_iter, bt_self_message_iterator_configuration *config, - bt_self_component_filter *self_comp, bt_self_component_port_output *port) { struct muxer_comp *muxer_comp = NULL; struct muxer_msg_iter *muxer_msg_iter = NULL; - bt_component_class_message_iterator_initialize_method_status status; + bt_message_iterator_class_initialize_method_status status; + bt_self_component *self_comp = + bt_self_message_iterator_borrow_component(self_msg_iter); - muxer_comp = bt_self_component_get_data( - bt_self_component_filter_as_self_component(self_comp)); + muxer_comp = bt_self_component_get_data(self_comp); BT_ASSERT(muxer_comp); BT_COMP_LOGD("Initializing muxer component's message iterator: " "comp-addr=%p, muxer-comp-addr=%p, msg-iter-addr=%p", @@ -1256,7 +1284,7 @@ bt_component_class_message_iterator_initialize_method_status muxer_msg_iter_init BT_COMP_LOGE("Recursive initialization of muxer component's message iterator: " "comp-addr=%p, muxer-comp-addr=%p, msg-iter-addr=%p", self_comp, muxer_comp, self_msg_iter); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR; goto error; } @@ -1264,7 +1292,7 @@ bt_component_class_message_iterator_initialize_method_status muxer_msg_iter_init muxer_msg_iter = g_new0(struct muxer_msg_iter, 1); if (!muxer_msg_iter) { BT_COMP_LOGE_STR("Failed to allocate one muxer component's message iterator."); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; goto error; } @@ -1276,7 +1304,7 @@ bt_component_class_message_iterator_initialize_method_status muxer_msg_iter_init (GDestroyNotify) destroy_muxer_upstream_msg_iter); if (!muxer_msg_iter->active_muxer_upstream_msg_iters) { BT_COMP_LOGE_STR("Failed to allocate a GPtrArray."); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; goto error; } @@ -1285,7 +1313,7 @@ bt_component_class_message_iterator_initialize_method_status muxer_msg_iter_init (GDestroyNotify) destroy_muxer_upstream_msg_iter); if (!muxer_msg_iter->ended_muxer_upstream_msg_iters) { BT_COMP_LOGE_STR("Failed to allocate a GPtrArray."); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; goto error; } @@ -1339,12 +1367,12 @@ void muxer_msg_iter_finalize(bt_self_message_iterator *self_msg_iter) } BT_HIDDEN -bt_component_class_message_iterator_next_method_status muxer_msg_iter_next( +bt_message_iterator_class_next_method_status muxer_msg_iter_next( bt_self_message_iterator *self_msg_iter, bt_message_array_const msgs, uint64_t capacity, uint64_t *count) { - bt_component_class_message_iterator_next_method_status status; + bt_message_iterator_class_next_method_status status; struct muxer_msg_iter *muxer_msg_iter = bt_self_message_iterator_get_data(self_msg_iter); bt_self_component *self_comp = NULL; @@ -1411,20 +1439,20 @@ end: } static inline -bt_component_class_message_iterator_can_seek_beginning_method_status +bt_message_iterator_class_can_seek_beginning_method_status muxer_upstream_msg_iters_can_all_seek_beginning( GPtrArray *muxer_upstream_msg_iters, bt_bool *can_seek) { - bt_component_class_message_iterator_can_seek_beginning_method_status status = - BT_COMPONENT_CLASS_MESSAGE_ITERATOR_CAN_SEEK_BEGINNING_METHOD_STATUS_OK; + bt_message_iterator_class_can_seek_beginning_method_status status = + BT_MESSAGE_ITERATOR_CLASS_CAN_SEEK_BEGINNING_METHOD_STATUS_OK; uint64_t i; for (i = 0; i < muxer_upstream_msg_iters->len; i++) { struct muxer_upstream_msg_iter *upstream_msg_iter = muxer_upstream_msg_iters->pdata[i]; - status = (int) bt_self_component_port_input_message_iterator_can_seek_beginning( + status = (int) bt_message_iterator_can_seek_beginning( upstream_msg_iter->msg_iter, can_seek); - if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_CAN_SEEK_BEGINNING_METHOD_STATUS_OK) { + if (status != BT_MESSAGE_ITERATOR_CLASS_CAN_SEEK_BEGINNING_METHOD_STATUS_OK) { goto end; } @@ -1440,17 +1468,17 @@ end: } BT_HIDDEN -bt_component_class_message_iterator_can_seek_beginning_method_status +bt_message_iterator_class_can_seek_beginning_method_status muxer_msg_iter_can_seek_beginning( bt_self_message_iterator *self_msg_iter, bt_bool *can_seek) { struct muxer_msg_iter *muxer_msg_iter = bt_self_message_iterator_get_data(self_msg_iter); - bt_component_class_message_iterator_can_seek_beginning_method_status status; + bt_message_iterator_class_can_seek_beginning_method_status status; status = muxer_upstream_msg_iters_can_all_seek_beginning( muxer_msg_iter->active_muxer_upstream_msg_iters, can_seek); - if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_CAN_SEEK_BEGINNING_METHOD_STATUS_OK) { + if (status != BT_MESSAGE_ITERATOR_CLASS_CAN_SEEK_BEGINNING_METHOD_STATUS_OK) { goto end; } @@ -1466,13 +1494,13 @@ end: } BT_HIDDEN -bt_component_class_message_iterator_seek_beginning_method_status muxer_msg_iter_seek_beginning( +bt_message_iterator_class_seek_beginning_method_status muxer_msg_iter_seek_beginning( bt_self_message_iterator *self_msg_iter) { struct muxer_msg_iter *muxer_msg_iter = bt_self_message_iterator_get_data(self_msg_iter); - bt_component_class_message_iterator_seek_beginning_method_status status = - BT_COMPONENT_CLASS_MESSAGE_ITERATOR_SEEK_BEGINNING_METHOD_STATUS_OK; + bt_message_iterator_class_seek_beginning_method_status status = + BT_MESSAGE_ITERATOR_CLASS_SEEK_BEGINNING_METHOD_STATUS_OK; bt_message_iterator_seek_beginning_status seek_beg_status; uint64_t i; @@ -1482,7 +1510,7 @@ bt_component_class_message_iterator_seek_beginning_method_status muxer_msg_iter_ struct muxer_upstream_msg_iter *upstream_msg_iter = muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i]; - seek_beg_status = bt_self_component_port_input_message_iterator_seek_beginning( + seek_beg_status = bt_message_iterator_seek_beginning( upstream_msg_iter->msg_iter); if (seek_beg_status != BT_MESSAGE_ITERATOR_SEEK_BEGINNING_STATUS_OK) { status = (int) seek_beg_status; @@ -1498,7 +1526,7 @@ bt_component_class_message_iterator_seek_beginning_method_status muxer_msg_iter_ struct muxer_upstream_msg_iter *upstream_msg_iter = muxer_msg_iter->active_muxer_upstream_msg_iters->pdata[i]; - seek_beg_status = bt_self_component_port_input_message_iterator_seek_beginning( + seek_beg_status = bt_message_iterator_seek_beginning( upstream_msg_iter->msg_iter); if (seek_beg_status != BT_MESSAGE_ITERATOR_SEEK_BEGINNING_STATUS_OK) { status = (int) seek_beg_status;