X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fplugins%2Futils%2Fmuxer%2Fmuxer.c;h=f4af51c8df083765e2242c1f80d39810fb355733;hb=d9120ccba72cfe3173cf5987871e9667d4c4c547;hp=8571b2756944decadf81c0ef52eb819cb9ac8fb0;hpb=8d8b141db4c46135a35be19e4a1c192f6a36d67b;p=babeltrace.git diff --git a/src/plugins/utils/muxer/muxer.c b/src/plugins/utils/muxer/muxer.c index 8571b275..f4af51c8 100644 --- a/src/plugins/utils/muxer/muxer.c +++ b/src/plugins/utils/muxer/muxer.c @@ -37,6 +37,7 @@ #include #include "plugins/common/muxing/muxing.h" +#include "plugins/common/param-validation/param-validation.h" #include "muxer.h" @@ -106,6 +107,14 @@ struct muxer_msg_iter { * MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID. */ bt_uuid_t expected_clock_class_uuid; + + /* + * Saved error. If we hit an error in the _next method, but have some + * messages ready to return, we save the error here and return it on + * the next _next call. + */ + bt_component_class_message_iterator_next_method_status next_saved_status; + const struct bt_error *next_saved_error; }; static @@ -246,20 +255,26 @@ void destroy_muxer_comp(struct muxer_comp *muxer_comp) g_free(muxer_comp); } +static +struct bt_param_validation_map_value_entry_descr muxer_params[] = { + BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END +}; + BT_HIDDEN -bt_component_class_init_method_status muxer_init( +bt_component_class_initialize_method_status muxer_init( bt_self_component_filter *self_comp_flt, bt_self_component_filter_configuration *config, const bt_value *params, void *init_data) { - bt_component_class_init_method_status status = - BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK; + bt_component_class_initialize_method_status status; bt_self_component_add_port_status add_port_status; bt_self_component *self_comp = bt_self_component_filter_as_self_component(self_comp_flt); struct muxer_comp *muxer_comp = g_new0(struct muxer_comp, 1); bt_logging_level log_level = bt_component_get_logging_level( bt_self_component_as_component(self_comp)); + enum bt_param_validation_status validation_status; + gchar *validate_error = NULL; BT_COMP_LOG_CUR_LVL(BT_LOG_INFO, log_level, self_comp, "Initializing muxer component: " @@ -268,6 +283,7 @@ bt_component_class_init_method_status muxer_init( if (!muxer_comp) { BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR, log_level, self_comp, "Failed to allocate one muxer component."); + status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; goto error; } @@ -275,6 +291,17 @@ bt_component_class_init_method_status muxer_init( muxer_comp->self_comp = self_comp; muxer_comp->self_comp_flt = self_comp_flt; + validation_status = bt_param_validation_validate(params, + muxer_params, &validate_error); + if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) { + status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; + goto error; + } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) { + status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, "%s", validate_error); + goto error; + } + bt_self_component_set_data(self_comp, muxer_comp); add_port_status = add_available_input_port(self_comp_flt); if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) { @@ -282,13 +309,7 @@ bt_component_class_init_method_status muxer_init( "muxer-comp-addr=%p, status=%s", muxer_comp, bt_common_func_status_string(add_port_status)); - if (add_port_status == - BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR) { - status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_MEMORY_ERROR; - } else { - status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR; - } - + status = (int) add_port_status; goto error; } @@ -298,13 +319,7 @@ bt_component_class_init_method_status muxer_init( "muxer-comp-addr=%p, status=%s", muxer_comp, bt_common_func_status_string(add_port_status)); - if (add_port_status == - BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR) { - status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_MEMORY_ERROR; - } else { - status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR; - } - + status = (int) add_port_status; goto error; } @@ -312,17 +327,15 @@ bt_component_class_init_method_status muxer_init( "comp-addr=%p, params-addr=%p, muxer-comp-addr=%p", self_comp, params, muxer_comp); + status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK; goto end; error: destroy_muxer_comp(muxer_comp); bt_self_component_set_data(self_comp, NULL); - if (status == BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK) { - status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR; - } - end: + g_free(validate_error); return status; } @@ -378,8 +391,7 @@ bt_component_class_message_iterator_next_method_status muxer_upstream_msg_iter_n struct muxer_upstream_msg_iter *muxer_upstream_msg_iter, bool *is_ended) { - struct muxer_comp *muxer_comp = - muxer_upstream_msg_iter->muxer_comp; + struct muxer_comp *muxer_comp = muxer_upstream_msg_iter->muxer_comp; bt_component_class_message_iterator_next_method_status status; bt_message_iterator_next_status input_port_iter_status; bt_message_array_const msgs; @@ -403,7 +415,7 @@ bt_component_class_message_iterator_next_method_status muxer_upstream_msg_iter_n * valid: it must be considered for muxing operations. */ BT_COMP_LOGD_STR("Validated upstream message iterator wrapper."); - BT_ASSERT(count > 0); + BT_ASSERT_DBG(count > 0); /* Move messages to our queue */ for (i = 0; i < count; i++) { @@ -434,10 +446,19 @@ bt_component_class_message_iterator_next_method_status muxer_upstream_msg_iter_n *is_ended = true; status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; break; + case BT_MESSAGE_ITERATOR_NEXT_STATUS_ERROR: + case BT_MESSAGE_ITERATOR_NEXT_STATUS_MEMORY_ERROR: + /* Error status code */ + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Upstream iterator's next method returned an error: status=%s", + bt_common_func_status_string(input_port_iter_status)); + status = (int) input_port_iter_status; + break; default: - /* Error or unsupported status code */ - BT_COMP_LOGE("Error or unsupported status code: " - "status-code=%d", input_port_iter_status); + /* Unsupported status code */ + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Unsupported status code: status=%s", + bt_common_func_status_string(input_port_iter_status)); status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; break; } @@ -456,8 +477,8 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp, const bt_stream_class *stream_class = NULL; bt_message_type msg_type; - BT_ASSERT(msg); - BT_ASSERT(ts_ns); + BT_ASSERT_DBG(msg); + BT_ASSERT_DBG(ts_ns); BT_COMP_LOGD("Getting message's timestamp: " "muxer-msg-iter-addr=%p, msg-addr=%p, " "last-returned-ts=%" PRId64, @@ -491,7 +512,7 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp, switch (msg_type) { case BT_MESSAGE_TYPE_EVENT: - BT_ASSERT(bt_message_event_borrow_stream_class_default_clock_class_const( + BT_ASSERT_DBG(bt_message_event_borrow_stream_class_default_clock_class_const( msg)); clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const( msg); @@ -608,7 +629,7 @@ int validate_clock_class(struct muxer_msg_iter *muxer_msg_iter, const uint8_t *cc_uuid; const char *cc_name; - BT_ASSERT(clock_class); + BT_ASSERT_DBG(clock_class); cc_uuid = bt_clock_class_get_uuid(clock_class); cc_name = bt_clock_class_get_name(clock_class); @@ -710,7 +731,7 @@ int validate_clock_class(struct muxer_msg_iter *muxer_msg_iter, BT_COMP_LOGF("Unexpected clock class expectation: " "expectation-code=%d", muxer_msg_iter->clock_class_expectation); - abort(); + bt_common_abort(); } goto end; @@ -787,9 +808,9 @@ muxer_msg_iter_youngest_upstream_msg_iter( bt_component_class_message_iterator_next_method_status status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; - BT_ASSERT(muxer_comp); - BT_ASSERT(muxer_msg_iter); - BT_ASSERT(muxer_upstream_msg_iter); + BT_ASSERT_DBG(muxer_comp); + BT_ASSERT_DBG(muxer_msg_iter); + BT_ASSERT_DBG(muxer_upstream_msg_iter); *muxer_upstream_msg_iter = NULL; for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len; @@ -809,9 +830,9 @@ muxer_msg_iter_youngest_upstream_msg_iter( continue; } - BT_ASSERT(cur_muxer_upstream_msg_iter->msgs->length > 0); + BT_ASSERT_DBG(cur_muxer_upstream_msg_iter->msgs->length > 0); msg = g_queue_peek_head(cur_muxer_upstream_msg_iter->msgs); - BT_ASSERT(msg); + BT_ASSERT_DBG(msg); if (G_UNLIKELY(bt_message_get_type(msg) == BT_MESSAGE_TYPE_STREAM_BEGINNING)) { @@ -913,10 +934,8 @@ validate_muxer_upstream_msg_iter( struct muxer_upstream_msg_iter *muxer_upstream_msg_iter, bool *is_ended) { - struct muxer_comp *muxer_comp = - muxer_upstream_msg_iter->muxer_comp; - bt_component_class_message_iterator_next_method_status status = - BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + struct muxer_comp *muxer_comp = muxer_upstream_msg_iter->muxer_comp; + bt_component_class_message_iterator_next_method_status status; BT_COMP_LOGD("Validating muxer's upstream message iterator wrapper: " "muxer-upstream-msg-iter-wrap-addr=%p", @@ -928,6 +947,7 @@ validate_muxer_upstream_msg_iter( "queue-len=%u, upstream-msg-iter-addr=%p", muxer_upstream_msg_iter->msgs->length, muxer_upstream_msg_iter->msg_iter); + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; goto end; } @@ -945,8 +965,7 @@ validate_muxer_upstream_msg_iters( struct muxer_msg_iter *muxer_msg_iter) { struct muxer_comp *muxer_comp = muxer_msg_iter->muxer_comp; - bt_component_class_message_iterator_next_method_status status = - BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + bt_component_class_message_iterator_next_method_status status; size_t i; BT_COMP_LOGD("Validating muxer's upstream message iterator wrappers: " @@ -964,7 +983,8 @@ validate_muxer_upstream_msg_iters( muxer_upstream_msg_iter, &is_ended); if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) { if (status < 0) { - BT_COMP_LOGE("Cannot validate muxer's upstream message iterator wrapper: " + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Cannot validate muxer's upstream message iterator wrapper: " "muxer-msg-iter-addr=%p, " "muxer-upstream-msg-iter-wrap-addr=%p", muxer_msg_iter, @@ -1005,6 +1025,8 @@ validate_muxer_upstream_msg_iters( } } + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + end: return status; } @@ -1036,7 +1058,8 @@ bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next_on &next_return_ts); if (status < 0 || status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END) { if (status < 0) { - BT_COMP_LOGE("Cannot find the youngest upstream message iterator wrapper: " + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Cannot find the youngest upstream message iterator wrapper: " "status=%s", bt_common_func_status_string(status)); } else { @@ -1049,7 +1072,8 @@ bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next_on } if (next_return_ts < muxer_msg_iter->last_returned_ts_ns) { - BT_COMP_LOGE("Youngest upstream message iterator wrapper's timestamp is less than muxer's message iterator's last returned timestamp: " + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Youngest upstream message iterator wrapper's timestamp is less than muxer's message iterator's last returned timestamp: " "muxer-msg-iter-addr=%p, ts=%" PRId64 ", " "last-returned-ts=%" PRId64, muxer_msg_iter, next_return_ts, @@ -1063,15 +1087,16 @@ bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next_on "muxer-upstream-msg-iter-wrap-addr=%p, " "ts=%" PRId64, muxer_msg_iter, muxer_upstream_msg_iter, next_return_ts); - BT_ASSERT(status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK); - BT_ASSERT(muxer_upstream_msg_iter); + BT_ASSERT_DBG(status == + BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK); + BT_ASSERT_DBG(muxer_upstream_msg_iter); /* * Consume from the queue's head: other side * (muxer_upstream_msg_iter_next()) writes to the tail. */ *msg = g_queue_pop_head(muxer_upstream_msg_iter->msgs); - BT_ASSERT(*msg); + BT_ASSERT_DBG(*msg); muxer_msg_iter->last_returned_ts_ns = next_return_ts; end: @@ -1085,17 +1110,27 @@ bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next( bt_message_array_const msgs, uint64_t capacity, uint64_t *count) { - bt_component_class_message_iterator_next_method_status status = - BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + bt_component_class_message_iterator_next_method_status status; uint64_t i = 0; - while (i < capacity && status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) { + if (G_UNLIKELY(muxer_msg_iter->next_saved_error)) { + /* + * Last time we were called, we hit an error but had some + * messages to deliver, so we stashed the error here. Return + * it now. + */ + BT_CURRENT_THREAD_MOVE_ERROR_AND_RESET(muxer_msg_iter->next_saved_error); + status = muxer_msg_iter->next_saved_status; + goto end; + } + + do { status = muxer_msg_iter_do_next_one(muxer_comp, muxer_msg_iter, &msgs[i]); if (status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) { i++; } - } + } while (i < capacity && status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK); if (i > 0) { /* @@ -1110,10 +1145,23 @@ bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next( * called, possibly without any accumulated * message, in which case we'll return it. */ + if (status < 0) { + /* + * Save this error for the next _next call. Assume that + * this component always appends error causes when + * returning an error status code, which will cause the + * current thread error to be non-NULL. + */ + muxer_msg_iter->next_saved_error = bt_current_thread_take_error(); + BT_ASSERT(muxer_msg_iter->next_saved_error); + muxer_msg_iter->next_saved_status = status; + } + *count = i; status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; } +end: return status; } @@ -1146,13 +1194,15 @@ void destroy_muxer_msg_iter(struct muxer_msg_iter *muxer_msg_iter) } static -bt_component_class_message_iterator_init_method_status +bt_component_class_message_iterator_initialize_method_status muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp, - struct muxer_msg_iter *muxer_msg_iter) + struct muxer_msg_iter *muxer_msg_iter, + struct bt_self_message_iterator_configuration *config) { int64_t count; int64_t i; - bt_component_class_message_iterator_init_method_status status; + bt_component_class_message_iterator_initialize_method_status status; + bool can_seek_forward = true; count = bt_component_filter_get_input_port_count( bt_self_component_filter_as_component_filter( @@ -1161,7 +1211,7 @@ muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp, BT_COMP_LOGD("No input port to initialize for muxer component's message iterator: " "muxer-comp-addr=%p, muxer-msg-iter-addr=%p", muxer_comp, muxer_msg_iter); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_OK; + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_OK; goto end; } @@ -1199,20 +1249,31 @@ muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp, bt_self_component_port_input_message_iterator_put_ref( upstream_msg_iter); if (int_status) { - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_ERROR; + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_ERROR; /* muxer_msg_iter_add_upstream_msg_iter() logs errors */ goto end; } + + can_seek_forward = can_seek_forward && + bt_self_component_port_input_message_iterator_can_seek_forward( + upstream_msg_iter); } - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_OK; + /* + * This iterator can seek forward if all of its iterators can seek + * forward. + */ + bt_self_message_iterator_configuration_set_can_seek_forward( + config, can_seek_forward); + + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_OK; end: return status; } BT_HIDDEN -bt_component_class_message_iterator_init_method_status muxer_msg_iter_init( +bt_component_class_message_iterator_initialize_method_status muxer_msg_iter_init( bt_self_message_iterator *self_msg_iter, bt_self_message_iterator_configuration *config, bt_self_component_filter *self_comp, @@ -1220,7 +1281,7 @@ bt_component_class_message_iterator_init_method_status muxer_msg_iter_init( { struct muxer_comp *muxer_comp = NULL; struct muxer_msg_iter *muxer_msg_iter = NULL; - bt_component_class_message_iterator_init_method_status status; + bt_component_class_message_iterator_initialize_method_status status; muxer_comp = bt_self_component_get_data( bt_self_component_filter_as_self_component(self_comp)); @@ -1238,7 +1299,7 @@ bt_component_class_message_iterator_init_method_status muxer_msg_iter_init( BT_COMP_LOGE("Recursive initialization of muxer component's message iterator: " "comp-addr=%p, muxer-comp-addr=%p, msg-iter-addr=%p", self_comp, muxer_comp, self_msg_iter); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_ERROR; + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_ERROR; goto error; } @@ -1246,7 +1307,7 @@ bt_component_class_message_iterator_init_method_status muxer_msg_iter_init( muxer_msg_iter = g_new0(struct muxer_msg_iter, 1); if (!muxer_msg_iter) { BT_COMP_LOGE_STR("Failed to allocate one muxer component's message iterator."); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_MEMORY_ERROR; + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; goto error; } @@ -1258,7 +1319,7 @@ bt_component_class_message_iterator_init_method_status muxer_msg_iter_init( (GDestroyNotify) destroy_muxer_upstream_msg_iter); if (!muxer_msg_iter->active_muxer_upstream_msg_iters) { BT_COMP_LOGE_STR("Failed to allocate a GPtrArray."); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_MEMORY_ERROR; + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; goto error; } @@ -1267,12 +1328,12 @@ bt_component_class_message_iterator_init_method_status muxer_msg_iter_init( (GDestroyNotify) destroy_muxer_upstream_msg_iter); if (!muxer_msg_iter->ended_muxer_upstream_msg_iters) { BT_COMP_LOGE_STR("Failed to allocate a GPtrArray."); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_MEMORY_ERROR; + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; goto error; } status = muxer_msg_iter_init_upstream_iterators(muxer_comp, - muxer_msg_iter); + muxer_msg_iter, config); if (status) { BT_COMP_LOGE("Cannot initialize connected input ports for muxer component's message iterator: " "comp-addr=%p, muxer-comp-addr=%p, " @@ -1332,12 +1393,12 @@ bt_component_class_message_iterator_next_method_status muxer_msg_iter_next( bt_self_component *self_comp = NULL; struct muxer_comp *muxer_comp = NULL; - BT_ASSERT(muxer_msg_iter); + BT_ASSERT_DBG(muxer_msg_iter); self_comp = bt_self_message_iterator_borrow_component( self_msg_iter); - BT_ASSERT(self_comp); + BT_ASSERT_DBG(self_comp); muxer_comp = bt_self_component_get_data(self_comp); - BT_ASSERT(muxer_comp); + BT_ASSERT_DBG(muxer_comp); BT_COMP_LOGT("Muxer component's message iterator's \"next\" method called: " "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, " "msg-iter-addr=%p",