X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fplugins%2Futils%2Fmuxer%2Fmuxer.c;h=f4af51c8df083765e2242c1f80d39810fb355733;hb=d9120ccba72cfe3173cf5987871e9667d4c4c547;hp=a2db3cb9ee398dd3e523a9d4d6dbc91cd042e5ab;hpb=498e7994d60bd0e9f63c3d5c0fd00eec77ba7c34;p=babeltrace.git diff --git a/src/plugins/utils/muxer/muxer.c b/src/plugins/utils/muxer/muxer.c index a2db3cb9..f4af51c8 100644 --- a/src/plugins/utils/muxer/muxer.c +++ b/src/plugins/utils/muxer/muxer.c @@ -107,6 +107,14 @@ struct muxer_msg_iter { * MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID. */ bt_uuid_t expected_clock_class_uuid; + + /* + * Saved error. If we hit an error in the _next method, but have some + * messages ready to return, we save the error here and return it on + * the next _next call. + */ + bt_component_class_message_iterator_next_method_status next_saved_status; + const struct bt_error *next_saved_error; }; static @@ -247,6 +255,7 @@ void destroy_muxer_comp(struct muxer_comp *muxer_comp) g_free(muxer_comp); } +static struct bt_param_validation_map_value_entry_descr muxer_params[] = { BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END }; @@ -382,8 +391,7 @@ bt_component_class_message_iterator_next_method_status muxer_upstream_msg_iter_n struct muxer_upstream_msg_iter *muxer_upstream_msg_iter, bool *is_ended) { - struct muxer_comp *muxer_comp = - muxer_upstream_msg_iter->muxer_comp; + struct muxer_comp *muxer_comp = muxer_upstream_msg_iter->muxer_comp; bt_component_class_message_iterator_next_method_status status; bt_message_iterator_next_status input_port_iter_status; bt_message_array_const msgs; @@ -438,10 +446,19 @@ bt_component_class_message_iterator_next_method_status muxer_upstream_msg_iter_n *is_ended = true; status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; break; + case BT_MESSAGE_ITERATOR_NEXT_STATUS_ERROR: + case BT_MESSAGE_ITERATOR_NEXT_STATUS_MEMORY_ERROR: + /* Error status code */ + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Upstream iterator's next method returned an error: status=%s", + bt_common_func_status_string(input_port_iter_status)); + status = (int) input_port_iter_status; + break; default: - /* Error or unsupported status code */ - BT_COMP_LOGE("Error or unsupported status code: " - "status-code=%d", input_port_iter_status); + /* Unsupported status code */ + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Unsupported status code: status=%s", + bt_common_func_status_string(input_port_iter_status)); status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; break; } @@ -917,10 +934,8 @@ validate_muxer_upstream_msg_iter( struct muxer_upstream_msg_iter *muxer_upstream_msg_iter, bool *is_ended) { - struct muxer_comp *muxer_comp = - muxer_upstream_msg_iter->muxer_comp; - bt_component_class_message_iterator_next_method_status status = - BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + struct muxer_comp *muxer_comp = muxer_upstream_msg_iter->muxer_comp; + bt_component_class_message_iterator_next_method_status status; BT_COMP_LOGD("Validating muxer's upstream message iterator wrapper: " "muxer-upstream-msg-iter-wrap-addr=%p", @@ -932,6 +947,7 @@ validate_muxer_upstream_msg_iter( "queue-len=%u, upstream-msg-iter-addr=%p", muxer_upstream_msg_iter->msgs->length, muxer_upstream_msg_iter->msg_iter); + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; goto end; } @@ -949,8 +965,7 @@ validate_muxer_upstream_msg_iters( struct muxer_msg_iter *muxer_msg_iter) { struct muxer_comp *muxer_comp = muxer_msg_iter->muxer_comp; - bt_component_class_message_iterator_next_method_status status = - BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + bt_component_class_message_iterator_next_method_status status; size_t i; BT_COMP_LOGD("Validating muxer's upstream message iterator wrappers: " @@ -968,7 +983,8 @@ validate_muxer_upstream_msg_iters( muxer_upstream_msg_iter, &is_ended); if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) { if (status < 0) { - BT_COMP_LOGE("Cannot validate muxer's upstream message iterator wrapper: " + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Cannot validate muxer's upstream message iterator wrapper: " "muxer-msg-iter-addr=%p, " "muxer-upstream-msg-iter-wrap-addr=%p", muxer_msg_iter, @@ -1009,6 +1025,8 @@ validate_muxer_upstream_msg_iters( } } + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + end: return status; } @@ -1040,7 +1058,8 @@ bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next_on &next_return_ts); if (status < 0 || status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END) { if (status < 0) { - BT_COMP_LOGE("Cannot find the youngest upstream message iterator wrapper: " + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Cannot find the youngest upstream message iterator wrapper: " "status=%s", bt_common_func_status_string(status)); } else { @@ -1053,7 +1072,8 @@ bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next_on } if (next_return_ts < muxer_msg_iter->last_returned_ts_ns) { - BT_COMP_LOGE("Youngest upstream message iterator wrapper's timestamp is less than muxer's message iterator's last returned timestamp: " + BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, + "Youngest upstream message iterator wrapper's timestamp is less than muxer's message iterator's last returned timestamp: " "muxer-msg-iter-addr=%p, ts=%" PRId64 ", " "last-returned-ts=%" PRId64, muxer_msg_iter, next_return_ts, @@ -1090,17 +1110,27 @@ bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next( bt_message_array_const msgs, uint64_t capacity, uint64_t *count) { - bt_component_class_message_iterator_next_method_status status = - BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + bt_component_class_message_iterator_next_method_status status; uint64_t i = 0; - while (i < capacity && status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) { + if (G_UNLIKELY(muxer_msg_iter->next_saved_error)) { + /* + * Last time we were called, we hit an error but had some + * messages to deliver, so we stashed the error here. Return + * it now. + */ + BT_CURRENT_THREAD_MOVE_ERROR_AND_RESET(muxer_msg_iter->next_saved_error); + status = muxer_msg_iter->next_saved_status; + goto end; + } + + do { status = muxer_msg_iter_do_next_one(muxer_comp, muxer_msg_iter, &msgs[i]); if (status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) { i++; } - } + } while (i < capacity && status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK); if (i > 0) { /* @@ -1115,10 +1145,23 @@ bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next( * called, possibly without any accumulated * message, in which case we'll return it. */ + if (status < 0) { + /* + * Save this error for the next _next call. Assume that + * this component always appends error causes when + * returning an error status code, which will cause the + * current thread error to be non-NULL. + */ + muxer_msg_iter->next_saved_error = bt_current_thread_take_error(); + BT_ASSERT(muxer_msg_iter->next_saved_error); + muxer_msg_iter->next_saved_status = status; + } + *count = i; status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; } +end: return status; }