X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fplugins%2Futils%2Fmuxer%2Fmuxer.c;h=2554d2705a561c23b64ff6c2162d53bc62f642dc;hb=e1b15e554b760856b13a3e617999e21bb9e1ae7e;hp=3e4d42057b1d49e0a00814cdd6f063f9f11da634;hpb=c49bf79b7d7e87cc21d065a7d208bcc238fb0800;p=babeltrace.git diff --git a/src/plugins/utils/muxer/muxer.c b/src/plugins/utils/muxer/muxer.c index 3e4d4205..2554d270 100644 --- a/src/plugins/utils/muxer/muxer.c +++ b/src/plugins/utils/muxer/muxer.c @@ -37,6 +37,7 @@ #include #include "plugins/common/muxing/muxing.h" +#include "plugins/common/param-validation/param-validation.h" #include "muxer.h" @@ -55,7 +56,7 @@ struct muxer_upstream_msg_iter { struct muxer_comp *muxer_comp; /* Owned by this, NULL if ended */ - bt_self_component_port_input_message_iterator *msg_iter; + bt_message_iterator *msg_iter; /* Contains `const bt_message *`, owned by this */ GQueue *msgs; @@ -106,6 +107,14 @@ struct muxer_msg_iter { * MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID. */ bt_uuid_t expected_clock_class_uuid; + + /* + * Saved error. If we hit an error in the _next method, but have some + * messages ready to return, we save the error here and return it on + * the next _next call. + */ + bt_message_iterator_class_next_method_status next_saved_status; + const struct bt_error *next_saved_error; }; static @@ -134,7 +143,7 @@ void destroy_muxer_upstream_msg_iter( muxer_upstream_msg_iter, muxer_upstream_msg_iter->msg_iter, muxer_upstream_msg_iter->msgs->length); - bt_self_component_port_input_message_iterator_put_ref( + bt_message_iterator_put_ref( muxer_upstream_msg_iter->msg_iter); if (muxer_upstream_msg_iter->msgs) { @@ -147,7 +156,7 @@ void destroy_muxer_upstream_msg_iter( static int muxer_msg_iter_add_upstream_msg_iter(struct muxer_msg_iter *muxer_msg_iter, - bt_self_component_port_input_message_iterator *self_msg_iter) + bt_message_iterator *self_msg_iter) { int ret = 0; struct muxer_upstream_msg_iter *muxer_upstream_msg_iter = @@ -161,7 +170,7 @@ int muxer_msg_iter_add_upstream_msg_iter(struct muxer_msg_iter *muxer_msg_iter, muxer_upstream_msg_iter->muxer_comp = muxer_comp; muxer_upstream_msg_iter->msg_iter = self_msg_iter; - bt_self_component_port_input_message_iterator_get_ref(muxer_upstream_msg_iter->msg_iter); + bt_message_iterator_get_ref(muxer_upstream_msg_iter->msg_iter); muxer_upstream_msg_iter->msgs = g_queue_new(); if (!muxer_upstream_msg_iter->msgs) { BT_COMP_LOGE_STR("Failed to allocate a GQueue."); @@ -246,20 +255,26 @@ void destroy_muxer_comp(struct muxer_comp *muxer_comp) g_free(muxer_comp); } +static +struct bt_param_validation_map_value_entry_descr muxer_params[] = { + BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END +}; + BT_HIDDEN bt_component_class_initialize_method_status muxer_init( bt_self_component_filter *self_comp_flt, bt_self_component_filter_configuration *config, const bt_value *params, void *init_data) { - bt_component_class_initialize_method_status status = - BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK; + bt_component_class_initialize_method_status status; bt_self_component_add_port_status add_port_status; bt_self_component *self_comp = bt_self_component_filter_as_self_component(self_comp_flt); struct muxer_comp *muxer_comp = g_new0(struct muxer_comp, 1); bt_logging_level log_level = bt_component_get_logging_level( bt_self_component_as_component(self_comp)); + enum bt_param_validation_status validation_status; + gchar *validate_error = NULL; BT_COMP_LOG_CUR_LVL(BT_LOG_INFO, log_level, self_comp, "Initializing muxer component: " @@ -268,6 +283,7 @@ bt_component_class_initialize_method_status muxer_init( if (!muxer_comp) { BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR, log_level, self_comp, "Failed to allocate one muxer component."); + status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; goto error; } @@ -275,6 +291,17 @@ bt_component_class_initialize_method_status muxer_init( muxer_comp->self_comp = self_comp; muxer_comp->self_comp_flt = self_comp_flt; + validation_status = bt_param_validation_validate(params, + muxer_params, &validate_error); + if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) { + status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; + goto error; + } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) { + status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, "%s", validate_error); + goto error; + } + bt_self_component_set_data(self_comp, muxer_comp); add_port_status = add_available_input_port(self_comp_flt); if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) { @@ -282,13 +309,7 @@ bt_component_class_initialize_method_status muxer_init( "muxer-comp-addr=%p, status=%s", muxer_comp, bt_common_func_status_string(add_port_status)); - if (add_port_status == - BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR) { - status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; - } else { - status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR; - } - + status = (int) add_port_status; goto error; } @@ -298,13 +319,7 @@ bt_component_class_initialize_method_status muxer_init( "muxer-comp-addr=%p, status=%s", muxer_comp, bt_common_func_status_string(add_port_status)); - if (add_port_status == - BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR) { - status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; - } else { - status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR; - } - + status = (int) add_port_status; goto error; } @@ -312,17 +327,15 @@ bt_component_class_initialize_method_status muxer_init( "comp-addr=%p, params-addr=%p, muxer-comp-addr=%p", self_comp, params, muxer_comp); + status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK; goto end; error: destroy_muxer_comp(muxer_comp); bt_self_component_set_data(self_comp, NULL); - if (status == BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) { - status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR; - } - end: + g_free(validate_error); return status; } @@ -338,16 +351,16 @@ void muxer_finalize(bt_self_component_filter *self_comp) } static -bt_self_component_port_input_message_iterator_create_from_message_iterator_status +bt_message_iterator_create_from_message_iterator_status create_msg_iter_on_input_port(struct muxer_comp *muxer_comp, struct muxer_msg_iter *muxer_msg_iter, bt_self_component_port_input *self_port, - bt_self_component_port_input_message_iterator **msg_iter) + bt_message_iterator **msg_iter) { const bt_port *port = bt_self_component_port_as_port( bt_self_component_port_input_as_self_component_port( self_port)); - bt_self_component_port_input_message_iterator_create_from_message_iterator_status + bt_message_iterator_create_from_message_iterator_status status; BT_ASSERT(port); @@ -356,9 +369,9 @@ create_msg_iter_on_input_port(struct muxer_comp *muxer_comp, // TODO: Advance the iterator to >= the time of the latest // returned message by the muxer message // iterator which creates it. - status = bt_self_component_port_input_message_iterator_create_from_message_iterator( + status = bt_message_iterator_create_from_message_iterator( muxer_msg_iter->self_msg_iter, self_port, msg_iter); - if (status != BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_CREATE_FROM_MESSAGE_ITERATOR_STATUS_OK) { + if (status != BT_MESSAGE_ITERATOR_CREATE_FROM_MESSAGE_ITERATOR_STATUS_OK) { BT_COMP_LOGE("Cannot create upstream message iterator on input port: " "port-addr=%p, port-name=\"%s\"", port, bt_port_get_name(port)); @@ -374,13 +387,12 @@ end: } static -bt_component_class_message_iterator_next_method_status muxer_upstream_msg_iter_next( +bt_message_iterator_class_next_method_status muxer_upstream_msg_iter_next( struct muxer_upstream_msg_iter *muxer_upstream_msg_iter, bool *is_ended) { - struct muxer_comp *muxer_comp = - muxer_upstream_msg_iter->muxer_comp; - bt_component_class_message_iterator_next_method_status status; + struct muxer_comp *muxer_comp = muxer_upstream_msg_iter->muxer_comp; + bt_message_iterator_class_next_method_status status; bt_message_iterator_next_status input_port_iter_status; bt_message_array_const msgs; uint64_t i; @@ -390,7 +402,7 @@ bt_component_class_message_iterator_next_method_status muxer_upstream_msg_iter_n "muxer-upstream-msg-iter-wrap-addr=%p, msg-iter-addr=%p", muxer_upstream_msg_iter, muxer_upstream_msg_iter->msg_iter); - input_port_iter_status = bt_self_component_port_input_message_iterator_next( + input_port_iter_status = bt_message_iterator_next( muxer_upstream_msg_iter->msg_iter, &msgs, &count); BT_COMP_LOGD("Upstream message iterator's \"next\" method returned: " "status=%s", @@ -403,7 +415,7 @@ bt_component_class_message_iterator_next_method_status muxer_upstream_msg_iter_n * valid: it must be considered for muxing operations. */ BT_COMP_LOGD_STR("Validated upstream message iterator wrapper."); - BT_ASSERT(count > 0); + BT_ASSERT_DBG(count > 0); /* Move messages to our queue */ for (i = 0; i < count; i++) { @@ -415,7 +427,7 @@ bt_component_class_message_iterator_next_method_status muxer_upstream_msg_iter_n g_queue_push_tail(muxer_upstream_msg_iter->msgs, (void *) msgs[i]); } - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK; break; case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN: /* @@ -423,7 +435,7 @@ bt_component_class_message_iterator_next_method_status muxer_upstream_msg_iter_n * valid anymore. Return * BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN immediately. */ - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_AGAIN; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN; break; case BT_MESSAGE_ITERATOR_NEXT_STATUS_END: /* Fall-through. */ /* @@ -432,13 +444,22 @@ bt_component_class_message_iterator_next_method_status muxer_upstream_msg_iter_n * message. */ *is_ended = true; - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK; + break; + case BT_MESSAGE_ITERATOR_NEXT_STATUS_ERROR: + case BT_MESSAGE_ITERATOR_NEXT_STATUS_MEMORY_ERROR: + /* Error status code */ + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Upstream iterator's next method returned an error: status=%s", + bt_common_func_status_string(input_port_iter_status)); + status = (int) input_port_iter_status; break; default: - /* Error or unsupported status code */ - BT_COMP_LOGE("Error or unsupported status code: " - "status-code=%d", input_port_iter_status); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; + /* Unsupported status code */ + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Unsupported status code: status=%s", + bt_common_func_status_string(input_port_iter_status)); + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; break; } @@ -456,8 +477,8 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp, const bt_stream_class *stream_class = NULL; bt_message_type msg_type; - BT_ASSERT(msg); - BT_ASSERT(ts_ns); + BT_ASSERT_DBG(msg); + BT_ASSERT_DBG(ts_ns); BT_COMP_LOGD("Getting message's timestamp: " "muxer-msg-iter-addr=%p, msg-addr=%p, " "last-returned-ts=%" PRId64, @@ -491,7 +512,7 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp, switch (msg_type) { case BT_MESSAGE_TYPE_EVENT: - BT_ASSERT(bt_message_event_borrow_stream_class_default_clock_class_const( + BT_ASSERT_DBG(bt_message_event_borrow_stream_class_default_clock_class_const( msg)); clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const( msg); @@ -608,7 +629,7 @@ int validate_clock_class(struct muxer_msg_iter *muxer_msg_iter, const uint8_t *cc_uuid; const char *cc_name; - BT_ASSERT(clock_class); + BT_ASSERT_DBG(clock_class); cc_uuid = bt_clock_class_get_uuid(clock_class); cc_name = bt_clock_class_get_name(clock_class); @@ -710,7 +731,7 @@ int validate_clock_class(struct muxer_msg_iter *muxer_msg_iter, BT_COMP_LOGF("Unexpected clock class expectation: " "expectation-code=%d", muxer_msg_iter->clock_class_expectation); - abort(); + bt_common_abort(); } goto end; @@ -774,7 +795,7 @@ end: * the youngest, and sets *ts_ns to its time. */ static -bt_component_class_message_iterator_next_method_status +bt_message_iterator_class_next_method_status muxer_msg_iter_youngest_upstream_msg_iter( struct muxer_comp *muxer_comp, struct muxer_msg_iter *muxer_msg_iter, @@ -784,12 +805,12 @@ muxer_msg_iter_youngest_upstream_msg_iter( size_t i; int ret; int64_t youngest_ts_ns = INT64_MAX; - bt_component_class_message_iterator_next_method_status status = - BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + bt_message_iterator_class_next_method_status status = + BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK; - BT_ASSERT(muxer_comp); - BT_ASSERT(muxer_msg_iter); - BT_ASSERT(muxer_upstream_msg_iter); + BT_ASSERT_DBG(muxer_comp); + BT_ASSERT_DBG(muxer_msg_iter); + BT_ASSERT_DBG(muxer_upstream_msg_iter); *muxer_upstream_msg_iter = NULL; for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len; @@ -809,9 +830,9 @@ muxer_msg_iter_youngest_upstream_msg_iter( continue; } - BT_ASSERT(cur_muxer_upstream_msg_iter->msgs->length > 0); + BT_ASSERT_DBG(cur_muxer_upstream_msg_iter->msgs->length > 0); msg = g_queue_peek_head(cur_muxer_upstream_msg_iter->msgs); - BT_ASSERT(msg); + BT_ASSERT_DBG(msg); if (G_UNLIKELY(bt_message_get_type(msg) == BT_MESSAGE_TYPE_STREAM_BEGINNING)) { @@ -824,7 +845,7 @@ muxer_msg_iter_youngest_upstream_msg_iter( * validate_new_stream_clock_class() logs * errors. */ - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; goto end; } } else if (G_UNLIKELY(bt_message_get_type(msg) == @@ -837,7 +858,7 @@ muxer_msg_iter_youngest_upstream_msg_iter( bt_clock_snapshot_borrow_clock_class_const(cs)); if (ret) { /* validate_clock_class() logs errors */ - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; goto end; } } @@ -847,7 +868,7 @@ muxer_msg_iter_youngest_upstream_msg_iter( if (ret) { /* get_msg_ts_ns() logs errors */ *muxer_upstream_msg_iter = NULL; - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; goto end; } @@ -899,7 +920,7 @@ muxer_msg_iter_youngest_upstream_msg_iter( } if (!*muxer_upstream_msg_iter) { - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END; *ts_ns = INT64_MIN; } @@ -908,15 +929,13 @@ end: } static -bt_component_class_message_iterator_next_method_status +bt_message_iterator_class_next_method_status validate_muxer_upstream_msg_iter( struct muxer_upstream_msg_iter *muxer_upstream_msg_iter, bool *is_ended) { - struct muxer_comp *muxer_comp = - muxer_upstream_msg_iter->muxer_comp; - bt_component_class_message_iterator_next_method_status status = - BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + struct muxer_comp *muxer_comp = muxer_upstream_msg_iter->muxer_comp; + bt_message_iterator_class_next_method_status status; BT_COMP_LOGD("Validating muxer's upstream message iterator wrapper: " "muxer-upstream-msg-iter-wrap-addr=%p", @@ -928,6 +947,7 @@ validate_muxer_upstream_msg_iter( "queue-len=%u, upstream-msg-iter-addr=%p", muxer_upstream_msg_iter->msgs->length, muxer_upstream_msg_iter->msg_iter); + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK; goto end; } @@ -940,13 +960,12 @@ end: } static -bt_component_class_message_iterator_next_method_status +bt_message_iterator_class_next_method_status validate_muxer_upstream_msg_iters( struct muxer_msg_iter *muxer_msg_iter) { struct muxer_comp *muxer_comp = muxer_msg_iter->muxer_comp; - bt_component_class_message_iterator_next_method_status status = - BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + bt_message_iterator_class_next_method_status status; size_t i; BT_COMP_LOGD("Validating muxer's upstream message iterator wrappers: " @@ -962,9 +981,10 @@ validate_muxer_upstream_msg_iters( status = validate_muxer_upstream_msg_iter( muxer_upstream_msg_iter, &is_ended); - if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) { + if (status != BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) { if (status < 0) { - BT_COMP_LOGE("Cannot validate muxer's upstream message iterator wrapper: " + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Cannot validate muxer's upstream message iterator wrapper: " "muxer-msg-iter-addr=%p, " "muxer-upstream-msg-iter-wrap-addr=%p", muxer_msg_iter, @@ -1005,22 +1025,24 @@ validate_muxer_upstream_msg_iters( } } + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK; + end: return status; } static inline -bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next_one( +bt_message_iterator_class_next_method_status muxer_msg_iter_do_next_one( struct muxer_comp *muxer_comp, struct muxer_msg_iter *muxer_msg_iter, const bt_message **msg) { - bt_component_class_message_iterator_next_method_status status; + bt_message_iterator_class_next_method_status status; struct muxer_upstream_msg_iter *muxer_upstream_msg_iter = NULL; int64_t next_return_ts; status = validate_muxer_upstream_msg_iters(muxer_msg_iter); - if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) { + if (status != BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) { /* validate_muxer_upstream_msg_iters() logs details */ goto end; } @@ -1034,9 +1056,10 @@ bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next_on status = muxer_msg_iter_youngest_upstream_msg_iter(muxer_comp, muxer_msg_iter, &muxer_upstream_msg_iter, &next_return_ts); - if (status < 0 || status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END) { + if (status < 0 || status == BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END) { if (status < 0) { - BT_COMP_LOGE("Cannot find the youngest upstream message iterator wrapper: " + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Cannot find the youngest upstream message iterator wrapper: " "status=%s", bt_common_func_status_string(status)); } else { @@ -1049,12 +1072,13 @@ bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next_on } if (next_return_ts < muxer_msg_iter->last_returned_ts_ns) { - BT_COMP_LOGE("Youngest upstream message iterator wrapper's timestamp is less than muxer's message iterator's last returned timestamp: " + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Youngest upstream message iterator wrapper's timestamp is less than muxer's message iterator's last returned timestamp: " "muxer-msg-iter-addr=%p, ts=%" PRId64 ", " "last-returned-ts=%" PRId64, muxer_msg_iter, next_return_ts, muxer_msg_iter->last_returned_ts_ns); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; goto end; } @@ -1063,15 +1087,16 @@ bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next_on "muxer-upstream-msg-iter-wrap-addr=%p, " "ts=%" PRId64, muxer_msg_iter, muxer_upstream_msg_iter, next_return_ts); - BT_ASSERT(status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK); - BT_ASSERT(muxer_upstream_msg_iter); + BT_ASSERT_DBG(status == + BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK); + BT_ASSERT_DBG(muxer_upstream_msg_iter); /* * Consume from the queue's head: other side * (muxer_upstream_msg_iter_next()) writes to the tail. */ *msg = g_queue_pop_head(muxer_upstream_msg_iter->msgs); - BT_ASSERT(*msg); + BT_ASSERT_DBG(*msg); muxer_msg_iter->last_returned_ts_ns = next_return_ts; end: @@ -1079,23 +1104,33 @@ end: } static -bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next( +bt_message_iterator_class_next_method_status muxer_msg_iter_do_next( struct muxer_comp *muxer_comp, struct muxer_msg_iter *muxer_msg_iter, bt_message_array_const msgs, uint64_t capacity, uint64_t *count) { - bt_component_class_message_iterator_next_method_status status = - BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + bt_message_iterator_class_next_method_status status; uint64_t i = 0; - while (i < capacity && status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) { + if (G_UNLIKELY(muxer_msg_iter->next_saved_error)) { + /* + * Last time we were called, we hit an error but had some + * messages to deliver, so we stashed the error here. Return + * it now. + */ + BT_CURRENT_THREAD_MOVE_ERROR_AND_RESET(muxer_msg_iter->next_saved_error); + status = muxer_msg_iter->next_saved_status; + goto end; + } + + do { status = muxer_msg_iter_do_next_one(muxer_comp, muxer_msg_iter, &msgs[i]); - if (status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) { + if (status == BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) { i++; } - } + } while (i < capacity && status == BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK); if (i > 0) { /* @@ -1110,10 +1145,23 @@ bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next( * called, possibly without any accumulated * message, in which case we'll return it. */ + if (status < 0) { + /* + * Save this error for the next _next call. Assume that + * this component always appends error causes when + * returning an error status code, which will cause the + * current thread error to be non-NULL. + */ + muxer_msg_iter->next_saved_error = bt_current_thread_take_error(); + BT_ASSERT(muxer_msg_iter->next_saved_error); + muxer_msg_iter->next_saved_status = status; + } + *count = i; - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK; } +end: return status; } @@ -1146,14 +1194,14 @@ void destroy_muxer_msg_iter(struct muxer_msg_iter *muxer_msg_iter) } static -bt_component_class_message_iterator_initialize_method_status +bt_message_iterator_class_initialize_method_status muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp, struct muxer_msg_iter *muxer_msg_iter, struct bt_self_message_iterator_configuration *config) { int64_t count; int64_t i; - bt_component_class_message_iterator_initialize_method_status status; + bt_message_iterator_class_initialize_method_status status; bool can_seek_forward = true; count = bt_component_filter_get_input_port_count( @@ -1163,17 +1211,17 @@ muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp, BT_COMP_LOGD("No input port to initialize for muxer component's message iterator: " "muxer-comp-addr=%p, muxer-msg-iter-addr=%p", muxer_comp, muxer_msg_iter); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_OK; + status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK; goto end; } for (i = 0; i < count; i++) { - bt_self_component_port_input_message_iterator *upstream_msg_iter; + bt_message_iterator *upstream_msg_iter; bt_self_component_port_input *self_port = bt_self_component_filter_borrow_input_port_by_index( muxer_comp->self_comp_flt, i); const bt_port *port; - bt_self_component_port_input_message_iterator_create_from_message_iterator_status + bt_message_iterator_create_from_message_iterator_status msg_iter_status; int int_status; @@ -1190,7 +1238,7 @@ muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp, msg_iter_status = create_msg_iter_on_input_port(muxer_comp, muxer_msg_iter, self_port, &upstream_msg_iter); - if (msg_iter_status != BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_CREATE_FROM_MESSAGE_ITERATOR_STATUS_OK) { + if (msg_iter_status != BT_MESSAGE_ITERATOR_CREATE_FROM_MESSAGE_ITERATOR_STATUS_OK) { /* create_msg_iter_on_input_port() logs errors */ status = (int) msg_iter_status; goto end; @@ -1198,16 +1246,16 @@ muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp, int_status = muxer_msg_iter_add_upstream_msg_iter(muxer_msg_iter, upstream_msg_iter); - bt_self_component_port_input_message_iterator_put_ref( + bt_message_iterator_put_ref( upstream_msg_iter); if (int_status) { - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR; /* muxer_msg_iter_add_upstream_msg_iter() logs errors */ goto end; } can_seek_forward = can_seek_forward && - bt_self_component_port_input_message_iterator_can_seek_forward( + bt_message_iterator_can_seek_forward( upstream_msg_iter); } @@ -1218,25 +1266,25 @@ muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp, bt_self_message_iterator_configuration_set_can_seek_forward( config, can_seek_forward); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_OK; + status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK; end: return status; } BT_HIDDEN -bt_component_class_message_iterator_initialize_method_status muxer_msg_iter_init( +bt_message_iterator_class_initialize_method_status muxer_msg_iter_init( bt_self_message_iterator *self_msg_iter, bt_self_message_iterator_configuration *config, - bt_self_component_filter *self_comp, bt_self_component_port_output *port) { struct muxer_comp *muxer_comp = NULL; struct muxer_msg_iter *muxer_msg_iter = NULL; - bt_component_class_message_iterator_initialize_method_status status; + bt_message_iterator_class_initialize_method_status status; + bt_self_component *self_comp = + bt_self_message_iterator_borrow_component(self_msg_iter); - muxer_comp = bt_self_component_get_data( - bt_self_component_filter_as_self_component(self_comp)); + muxer_comp = bt_self_component_get_data(self_comp); BT_ASSERT(muxer_comp); BT_COMP_LOGD("Initializing muxer component's message iterator: " "comp-addr=%p, muxer-comp-addr=%p, msg-iter-addr=%p", @@ -1251,7 +1299,7 @@ bt_component_class_message_iterator_initialize_method_status muxer_msg_iter_init BT_COMP_LOGE("Recursive initialization of muxer component's message iterator: " "comp-addr=%p, muxer-comp-addr=%p, msg-iter-addr=%p", self_comp, muxer_comp, self_msg_iter); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR; goto error; } @@ -1259,7 +1307,7 @@ bt_component_class_message_iterator_initialize_method_status muxer_msg_iter_init muxer_msg_iter = g_new0(struct muxer_msg_iter, 1); if (!muxer_msg_iter) { BT_COMP_LOGE_STR("Failed to allocate one muxer component's message iterator."); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; goto error; } @@ -1271,7 +1319,7 @@ bt_component_class_message_iterator_initialize_method_status muxer_msg_iter_init (GDestroyNotify) destroy_muxer_upstream_msg_iter); if (!muxer_msg_iter->active_muxer_upstream_msg_iters) { BT_COMP_LOGE_STR("Failed to allocate a GPtrArray."); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; goto error; } @@ -1280,7 +1328,7 @@ bt_component_class_message_iterator_initialize_method_status muxer_msg_iter_init (GDestroyNotify) destroy_muxer_upstream_msg_iter); if (!muxer_msg_iter->ended_muxer_upstream_msg_iters) { BT_COMP_LOGE_STR("Failed to allocate a GPtrArray."); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; goto error; } @@ -1334,23 +1382,23 @@ void muxer_msg_iter_finalize(bt_self_message_iterator *self_msg_iter) } BT_HIDDEN -bt_component_class_message_iterator_next_method_status muxer_msg_iter_next( +bt_message_iterator_class_next_method_status muxer_msg_iter_next( bt_self_message_iterator *self_msg_iter, bt_message_array_const msgs, uint64_t capacity, uint64_t *count) { - bt_component_class_message_iterator_next_method_status status; + bt_message_iterator_class_next_method_status status; struct muxer_msg_iter *muxer_msg_iter = bt_self_message_iterator_get_data(self_msg_iter); bt_self_component *self_comp = NULL; struct muxer_comp *muxer_comp = NULL; - BT_ASSERT(muxer_msg_iter); + BT_ASSERT_DBG(muxer_msg_iter); self_comp = bt_self_message_iterator_borrow_component( self_msg_iter); - BT_ASSERT(self_comp); + BT_ASSERT_DBG(self_comp); muxer_comp = bt_self_component_get_data(self_comp); - BT_ASSERT(muxer_comp); + BT_ASSERT_DBG(muxer_comp); BT_COMP_LOGT("Muxer component's message iterator's \"next\" method called: " "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, " "msg-iter-addr=%p", @@ -1406,20 +1454,20 @@ end: } static inline -bt_component_class_message_iterator_can_seek_beginning_method_status +bt_message_iterator_class_can_seek_beginning_method_status muxer_upstream_msg_iters_can_all_seek_beginning( GPtrArray *muxer_upstream_msg_iters, bt_bool *can_seek) { - bt_component_class_message_iterator_can_seek_beginning_method_status status = - BT_COMPONENT_CLASS_MESSAGE_ITERATOR_CAN_SEEK_BEGINNING_METHOD_STATUS_OK; + bt_message_iterator_class_can_seek_beginning_method_status status = + BT_MESSAGE_ITERATOR_CLASS_CAN_SEEK_BEGINNING_METHOD_STATUS_OK; uint64_t i; for (i = 0; i < muxer_upstream_msg_iters->len; i++) { struct muxer_upstream_msg_iter *upstream_msg_iter = muxer_upstream_msg_iters->pdata[i]; - status = (int) bt_self_component_port_input_message_iterator_can_seek_beginning( + status = (int) bt_message_iterator_can_seek_beginning( upstream_msg_iter->msg_iter, can_seek); - if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_CAN_SEEK_BEGINNING_METHOD_STATUS_OK) { + if (status != BT_MESSAGE_ITERATOR_CLASS_CAN_SEEK_BEGINNING_METHOD_STATUS_OK) { goto end; } @@ -1435,17 +1483,17 @@ end: } BT_HIDDEN -bt_component_class_message_iterator_can_seek_beginning_method_status +bt_message_iterator_class_can_seek_beginning_method_status muxer_msg_iter_can_seek_beginning( bt_self_message_iterator *self_msg_iter, bt_bool *can_seek) { struct muxer_msg_iter *muxer_msg_iter = bt_self_message_iterator_get_data(self_msg_iter); - bt_component_class_message_iterator_can_seek_beginning_method_status status; + bt_message_iterator_class_can_seek_beginning_method_status status; status = muxer_upstream_msg_iters_can_all_seek_beginning( muxer_msg_iter->active_muxer_upstream_msg_iters, can_seek); - if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_CAN_SEEK_BEGINNING_METHOD_STATUS_OK) { + if (status != BT_MESSAGE_ITERATOR_CLASS_CAN_SEEK_BEGINNING_METHOD_STATUS_OK) { goto end; } @@ -1461,13 +1509,13 @@ end: } BT_HIDDEN -bt_component_class_message_iterator_seek_beginning_method_status muxer_msg_iter_seek_beginning( +bt_message_iterator_class_seek_beginning_method_status muxer_msg_iter_seek_beginning( bt_self_message_iterator *self_msg_iter) { struct muxer_msg_iter *muxer_msg_iter = bt_self_message_iterator_get_data(self_msg_iter); - bt_component_class_message_iterator_seek_beginning_method_status status = - BT_COMPONENT_CLASS_MESSAGE_ITERATOR_SEEK_BEGINNING_METHOD_STATUS_OK; + bt_message_iterator_class_seek_beginning_method_status status = + BT_MESSAGE_ITERATOR_CLASS_SEEK_BEGINNING_METHOD_STATUS_OK; bt_message_iterator_seek_beginning_status seek_beg_status; uint64_t i; @@ -1477,7 +1525,7 @@ bt_component_class_message_iterator_seek_beginning_method_status muxer_msg_iter_ struct muxer_upstream_msg_iter *upstream_msg_iter = muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i]; - seek_beg_status = bt_self_component_port_input_message_iterator_seek_beginning( + seek_beg_status = bt_message_iterator_seek_beginning( upstream_msg_iter->msg_iter); if (seek_beg_status != BT_MESSAGE_ITERATOR_SEEK_BEGINNING_STATUS_OK) { status = (int) seek_beg_status; @@ -1493,7 +1541,7 @@ bt_component_class_message_iterator_seek_beginning_method_status muxer_msg_iter_ struct muxer_upstream_msg_iter *upstream_msg_iter = muxer_msg_iter->active_muxer_upstream_msg_iters->pdata[i]; - seek_beg_status = bt_self_component_port_input_message_iterator_seek_beginning( + seek_beg_status = bt_message_iterator_seek_beginning( upstream_msg_iter->msg_iter); if (seek_beg_status != BT_MESSAGE_ITERATOR_SEEK_BEGINNING_STATUS_OK) { status = (int) seek_beg_status;