X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fplugins%2Futils%2Fmuxer%2Fmuxer.c;h=d6ca31bb63b435383c45c1ee55d5505425b44472;hb=f2fb1b3297ca0bc13b53189a063b63944be7fae9;hp=b575f967d0e994a43c71a569468f8dba09a34059;hpb=578e048b5debf169e286e5b5cc747b5d6c16886d;p=babeltrace.git diff --git a/src/plugins/utils/muxer/muxer.c b/src/plugins/utils/muxer/muxer.c index b575f967..d6ca31bb 100644 --- a/src/plugins/utils/muxer/muxer.c +++ b/src/plugins/utils/muxer/muxer.c @@ -20,17 +20,14 @@ * SOFTWARE. */ -#define BT_LOG_TAG "PLUGIN-UTILS-MUXER-FLT" -#include "logging.h" +#define BT_COMP_LOG_SELF_COMP (muxer_comp->self_comp) +#define BT_LOG_OUTPUT_LEVEL (muxer_comp->log_level) +#define BT_LOG_TAG "PLUGIN/FLT.UTILS.MUXER" +#include "logging/comp-logging.h" -#include "common/babeltrace.h" -#include "compat/uuid.h" +#include "common/macros.h" +#include "common/uuid.h" #include -#include "lib/value.h" -#include "lib/graph/component.h" -#include "lib/graph/message/iterator.h" -#include "lib/graph/connection.h" -#include "plugins/plugins-common.h" #include #include #include @@ -39,21 +36,27 @@ #include #include +#include "plugins/common/muxing/muxing.h" + #include "muxer.h" #define ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME "assume-absolute-clock-classes" struct muxer_comp { - /* Weak ref */ - bt_self_component_filter *self_comp; + /* Weak refs */ + bt_self_component_filter *self_comp_flt; + bt_self_component *self_comp; unsigned int next_port_num; size_t available_input_ports; bool initializing_muxer_msg_iter; bool assume_absolute_clock_classes; + bt_logging_level log_level; }; struct muxer_upstream_msg_iter { + struct muxer_comp *muxer_comp; + /* Owned by this, NULL if ended */ bt_self_component_port_input_message_iterator *msg_iter; @@ -70,6 +73,11 @@ enum muxer_msg_iter_clock_class_expectation { }; struct muxer_msg_iter { + struct muxer_comp *muxer_comp; + + /* Weak */ + bt_self_message_iterator *self_msg_iter; + /* * Array of struct muxer_upstream_msg_iter * (owned by this). * @@ -100,7 +108,7 @@ struct muxer_msg_iter { * clock_class_expectation is * MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID. */ - unsigned char expected_clock_class_uuid[BABELTRACE_UUID_LEN]; + bt_uuid_t expected_clock_class_uuid; }; static @@ -117,11 +125,14 @@ static void destroy_muxer_upstream_msg_iter( struct muxer_upstream_msg_iter *muxer_upstream_msg_iter) { + struct muxer_comp *muxer_comp; + if (!muxer_upstream_msg_iter) { return; } - BT_LOGD("Destroying muxer's upstream message iterator wrapper: " + muxer_comp = muxer_upstream_msg_iter->muxer_comp; + BT_COMP_LOGD("Destroying muxer's upstream message iterator wrapper: " "addr=%p, msg-iter-addr=%p, queue-len=%u", muxer_upstream_msg_iter, muxer_upstream_msg_iter->msg_iter, @@ -144,23 +155,25 @@ int muxer_msg_iter_add_upstream_msg_iter(struct muxer_msg_iter *muxer_msg_iter, int ret = 0; struct muxer_upstream_msg_iter *muxer_upstream_msg_iter = g_new0(struct muxer_upstream_msg_iter, 1); + struct muxer_comp *muxer_comp = muxer_msg_iter->muxer_comp; if (!muxer_upstream_msg_iter) { - BT_LOGE_STR("Failed to allocate one muxer's upstream message iterator wrapper."); + BT_COMP_LOGE_STR("Failed to allocate one muxer's upstream message iterator wrapper."); goto error; } + muxer_upstream_msg_iter->muxer_comp = muxer_comp; muxer_upstream_msg_iter->msg_iter = self_msg_iter; bt_self_component_port_input_message_iterator_get_ref(muxer_upstream_msg_iter->msg_iter); muxer_upstream_msg_iter->msgs = g_queue_new(); if (!muxer_upstream_msg_iter->msgs) { - BT_LOGE_STR("Failed to allocate a GQueue."); + BT_COMP_LOGE_STR("Failed to allocate a GQueue."); goto error; } g_ptr_array_add(muxer_msg_iter->active_muxer_upstream_msg_iters, muxer_upstream_msg_iter); - BT_LOGD("Added muxer's upstream message iterator wrapper: " + BT_COMP_LOGD("Added muxer's upstream message iterator wrapper: " "addr=%p, muxer-msg-iter-addr=%p, msg-iter-addr=%p", muxer_upstream_msg_iter, muxer_msg_iter, self_msg_iter); @@ -176,36 +189,37 @@ end: } static -bt_self_component_status add_available_input_port( +bt_self_component_add_port_status add_available_input_port( bt_self_component_filter *self_comp) { struct muxer_comp *muxer_comp = bt_self_component_get_data( bt_self_component_filter_as_self_component(self_comp)); - bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK; + bt_self_component_add_port_status status = + BT_SELF_COMPONENT_ADD_PORT_STATUS_OK; GString *port_name = NULL; BT_ASSERT(muxer_comp); port_name = g_string_new("in"); if (!port_name) { - BT_LOGE_STR("Failed to allocate a GString."); - status = BT_SELF_COMPONENT_STATUS_NOMEM; + BT_COMP_LOGE_STR("Failed to allocate a GString."); + status = BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR; goto end; } g_string_append_printf(port_name, "%u", muxer_comp->next_port_num); status = bt_self_component_filter_add_input_port( self_comp, port_name->str, NULL, NULL); - if (status != BT_SELF_COMPONENT_STATUS_OK) { - BT_LOGE("Cannot add input port to muxer component: " + if (status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) { + BT_COMP_LOGE("Cannot add input port to muxer component: " "port-name=\"%s\", comp-addr=%p, status=%s", port_name->str, self_comp, - bt_self_component_status_string(status)); + bt_common_func_status_string(status)); goto end; } muxer_comp->available_input_ports++; muxer_comp->next_port_num++; - BT_LOGD("Added one input port to muxer component: " + BT_COMP_LOGI("Added one input port to muxer component: " "port-name=\"%s\", comp-addr=%p", port_name->str, self_comp); @@ -218,7 +232,7 @@ end: } static -bt_self_component_status create_output_port( +bt_self_component_add_port_status create_output_port( bt_self_component_filter *self_comp) { return bt_self_component_filter_add_output_port( @@ -236,21 +250,21 @@ void destroy_muxer_comp(struct muxer_comp *muxer_comp) } static -bt_value *get_default_params(void) +bt_value *get_default_params(struct muxer_comp *muxer_comp) { bt_value *params; int ret; params = bt_value_map_create(); if (!params) { - BT_LOGE_STR("Cannot create a map value object."); + BT_COMP_LOGE_STR("Cannot create a map value object."); goto error; } ret = bt_value_map_insert_bool_entry(params, ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME, false); if (ret) { - BT_LOGE_STR("Cannot add boolean value to map value object."); + BT_COMP_LOGE_STR("Cannot add boolean value to map value object."); goto error; } @@ -273,16 +287,16 @@ int configure_muxer_comp(struct muxer_comp *muxer_comp, int ret = 0; bt_bool bool_val; - default_params = get_default_params(); + default_params = get_default_params(muxer_comp); if (!default_params) { - BT_LOGE("Cannot get default parameters: " + BT_COMP_LOGE("Cannot get default parameters: " "muxer-comp-addr=%p", muxer_comp); goto error; } ret = bt_value_map_extend(default_params, params, &real_params); if (ret) { - BT_LOGE("Cannot extend default parameters map value: " + BT_COMP_LOGE("Cannot extend default parameters map value: " "muxer-comp-addr=%p, def-params-addr=%p, " "params-addr=%p", muxer_comp, default_params, params); @@ -293,7 +307,7 @@ int configure_muxer_comp(struct muxer_comp *muxer_comp, ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME); if (assume_absolute_clock_classes && !bt_value_is_bool(assume_absolute_clock_classes)) { - BT_LOGE("Expecting a boolean value for the `%s` parameter: " + BT_COMP_LOGE("Expecting a boolean value for the `%s` parameter: " "muxer-comp-addr=%p, value-type=%s", ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME, muxer_comp, bt_common_value_type_string( @@ -303,7 +317,7 @@ int configure_muxer_comp(struct muxer_comp *muxer_comp, bool_val = bt_value_bool_get(assume_absolute_clock_classes); muxer_comp->assume_absolute_clock_classes = (bool) bool_val; - BT_LOGD("Configured muxer component: muxer-comp-addr=%p, " + BT_COMP_LOGI("Configured muxer component: muxer-comp-addr=%p, " "assume-absolute-clock-classes=%d", muxer_comp, muxer_comp->assume_absolute_clock_classes); goto end; @@ -318,53 +332,75 @@ end: } BT_HIDDEN -bt_self_component_status muxer_init( - bt_self_component_filter *self_comp, +bt_component_class_init_method_status muxer_init( + bt_self_component_filter *self_comp_flt, const bt_value *params, void *init_data) { int ret; - bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK; + bt_component_class_init_method_status status = + BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK; + bt_self_component_add_port_status add_port_status; + bt_self_component *self_comp = + bt_self_component_filter_as_self_component(self_comp_flt); struct muxer_comp *muxer_comp = g_new0(struct muxer_comp, 1); + bt_logging_level log_level = bt_component_get_logging_level( + bt_self_component_as_component(self_comp)); - BT_LOGD("Initializing muxer component: " + BT_COMP_LOG_CUR_LVL(BT_LOG_INFO, log_level, self_comp, + "Initializing muxer component: " "comp-addr=%p, params-addr=%p", self_comp, params); if (!muxer_comp) { - BT_LOGE_STR("Failed to allocate one muxer component."); + BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR, log_level, self_comp, + "Failed to allocate one muxer component."); goto error; } + muxer_comp->log_level = log_level; + muxer_comp->self_comp = self_comp; + muxer_comp->self_comp_flt = self_comp_flt; ret = configure_muxer_comp(muxer_comp, params); if (ret) { - BT_LOGE("Cannot configure muxer component: " + BT_COMP_LOGE("Cannot configure muxer component: " "muxer-comp-addr=%p, params-addr=%p", muxer_comp, params); goto error; } - muxer_comp->self_comp = self_comp; - bt_self_component_set_data( - bt_self_component_filter_as_self_component(self_comp), - muxer_comp); - status = add_available_input_port(self_comp); - if (status != BT_SELF_COMPONENT_STATUS_OK) { - BT_LOGE("Cannot ensure that at least one muxer component's input port is available: " + bt_self_component_set_data(self_comp, muxer_comp); + add_port_status = add_available_input_port(self_comp_flt); + if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) { + BT_COMP_LOGE("Cannot ensure that at least one muxer component's input port is available: " "muxer-comp-addr=%p, status=%s", muxer_comp, - bt_self_component_status_string(status)); + bt_common_func_status_string(add_port_status)); + if (add_port_status == + BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR) { + status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_MEMORY_ERROR; + } else { + status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR; + } + goto error; } - status = create_output_port(self_comp); - if (status) { - BT_LOGE("Cannot create muxer component's output port: " + add_port_status = create_output_port(self_comp_flt); + if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) { + BT_COMP_LOGE("Cannot create muxer component's output port: " "muxer-comp-addr=%p, status=%s", muxer_comp, - bt_self_component_status_string(status)); + bt_common_func_status_string(add_port_status)); + if (add_port_status == + BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR) { + status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_MEMORY_ERROR; + } else { + status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR; + } + goto error; } - BT_LOGD("Initialized muxer component: " + BT_COMP_LOGI("Initialized muxer component: " "comp-addr=%p, params-addr=%p, muxer-comp-addr=%p", self_comp, params, muxer_comp); @@ -372,12 +408,10 @@ bt_self_component_status muxer_init( error: destroy_muxer_comp(muxer_comp); - bt_self_component_set_data( - bt_self_component_filter_as_self_component(self_comp), - NULL); + bt_self_component_set_data(self_comp, NULL); - if (status == BT_SELF_COMPONENT_STATUS_OK) { - status = BT_SELF_COMPONENT_STATUS_ERROR; + if (status == BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK) { + status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR; } end: @@ -390,20 +424,23 @@ void muxer_finalize(bt_self_component_filter *self_comp) struct muxer_comp *muxer_comp = bt_self_component_get_data( bt_self_component_filter_as_self_component(self_comp)); - BT_LOGD("Finalizing muxer component: comp-addr=%p", + BT_COMP_LOGI("Finalizing muxer component: comp-addr=%p", self_comp); destroy_muxer_comp(muxer_comp); } static -bt_self_component_port_input_message_iterator * -create_msg_iter_on_input_port(bt_self_component_port_input *self_port) +bt_self_component_port_input_message_iterator_create_from_message_iterator_status +create_msg_iter_on_input_port(struct muxer_comp *muxer_comp, + struct muxer_msg_iter *muxer_msg_iter, + bt_self_component_port_input *self_port, + bt_self_component_port_input_message_iterator **msg_iter) { const bt_port *port = bt_self_component_port_as_port( bt_self_component_port_input_as_self_component_port( self_port)); - bt_self_component_port_input_message_iterator *msg_iter = - NULL; + bt_self_component_port_input_message_iterator_create_from_message_iterator_status + status; BT_ASSERT(port); BT_ASSERT(bt_port_is_connected(port)); @@ -411,50 +448,53 @@ create_msg_iter_on_input_port(bt_self_component_port_input *self_port) // TODO: Advance the iterator to >= the time of the latest // returned message by the muxer message // iterator which creates it. - msg_iter = bt_self_component_port_input_message_iterator_create( - self_port); - if (!msg_iter) { - BT_LOGE("Cannot create upstream message iterator on input port: " + status = bt_self_component_port_input_message_iterator_create_from_message_iterator( + muxer_msg_iter->self_msg_iter, self_port, msg_iter); + if (status != BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_CREATE_FROM_MESSAGE_ITERATOR_STATUS_OK) { + BT_COMP_LOGE("Cannot create upstream message iterator on input port: " "port-addr=%p, port-name=\"%s\"", port, bt_port_get_name(port)); goto end; } - BT_LOGD("Created upstream message iterator on input port: " + BT_COMP_LOGI("Created upstream message iterator on input port: " "port-addr=%p, port-name=\"%s\", msg-iter-addr=%p", port, bt_port_get_name(port), msg_iter); end: - return msg_iter; + return status; } static -bt_self_message_iterator_status muxer_upstream_msg_iter_next( +bt_component_class_message_iterator_next_method_status muxer_upstream_msg_iter_next( struct muxer_upstream_msg_iter *muxer_upstream_msg_iter, bool *is_ended) { - bt_self_message_iterator_status status; - bt_message_iterator_status input_port_iter_status; + struct muxer_comp *muxer_comp = + muxer_upstream_msg_iter->muxer_comp; + bt_component_class_message_iterator_next_method_status status; + bt_message_iterator_next_status input_port_iter_status; bt_message_array_const msgs; uint64_t i; uint64_t count; - BT_LOGV("Calling upstream message iterator's \"next\" method: " + BT_COMP_LOGD("Calling upstream message iterator's \"next\" method: " "muxer-upstream-msg-iter-wrap-addr=%p, msg-iter-addr=%p", muxer_upstream_msg_iter, muxer_upstream_msg_iter->msg_iter); input_port_iter_status = bt_self_component_port_input_message_iterator_next( muxer_upstream_msg_iter->msg_iter, &msgs, &count); - BT_LOGV("Upstream message iterator's \"next\" method returned: " - "status=%s", bt_message_iterator_status_string(input_port_iter_status)); + BT_COMP_LOGD("Upstream message iterator's \"next\" method returned: " + "status=%s", + bt_common_func_status_string(input_port_iter_status)); switch (input_port_iter_status) { - case BT_MESSAGE_ITERATOR_STATUS_OK: + case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK: /* * Message iterator's current message is * valid: it must be considered for muxing operations. */ - BT_LOGV_STR("Validated upstream message iterator wrapper."); + BT_COMP_LOGD_STR("Validated upstream message iterator wrapper."); BT_ASSERT(count > 0); /* Move messages to our queue */ @@ -467,30 +507,30 @@ bt_self_message_iterator_status muxer_upstream_msg_iter_next( g_queue_push_tail(muxer_upstream_msg_iter->msgs, (void *) msgs[i]); } - status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK; + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; break; - case BT_MESSAGE_ITERATOR_STATUS_AGAIN: + case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN: /* * Message iterator's current message is not * valid anymore. Return - * BT_MESSAGE_ITERATOR_STATUS_AGAIN immediately. + * BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN immediately. */ - status = BT_SELF_MESSAGE_ITERATOR_STATUS_AGAIN; + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_AGAIN; break; - case BT_MESSAGE_ITERATOR_STATUS_END: /* Fall-through. */ + case BT_MESSAGE_ITERATOR_NEXT_STATUS_END: /* Fall-through. */ /* * Message iterator reached the end: release it. It * won't be considered again to find the youngest * message. */ *is_ended = true; - status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK; + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; break; default: /* Error or unsupported status code */ - BT_LOGE("Error or unsupported status code: " + BT_COMP_LOGE("Error or unsupported status code: " "status-code=%d", input_port_iter_status); - status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; break; } @@ -505,18 +545,17 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp, { const bt_clock_snapshot *clock_snapshot = NULL; int ret = 0; - bt_message_stream_activity_clock_snapshot_state sa_cs_state; const bt_stream_class *stream_class = NULL; bt_message_type msg_type; BT_ASSERT(msg); BT_ASSERT(ts_ns); - BT_LOGV("Getting message's timestamp: " + BT_COMP_LOGD("Getting message's timestamp: " "muxer-msg-iter-addr=%p, msg-addr=%p, " "last-returned-ts=%" PRId64, muxer_msg_iter, msg, last_returned_ts_ns); - if (unlikely(muxer_msg_iter->clock_class_expectation == + if (G_UNLIKELY(muxer_msg_iter->clock_class_expectation == MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE)) { *ts_ns = last_returned_ts_ns; goto end; @@ -524,20 +563,20 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp, msg_type = bt_message_get_type(msg); - if (unlikely(msg_type == BT_MESSAGE_TYPE_PACKET_BEGINNING)) { + if (G_UNLIKELY(msg_type == BT_MESSAGE_TYPE_PACKET_BEGINNING)) { stream_class = bt_stream_borrow_class_const( bt_packet_borrow_stream_const( bt_message_packet_beginning_borrow_packet_const( msg))); - } else if (unlikely(msg_type == BT_MESSAGE_TYPE_PACKET_END)) { + } else if (G_UNLIKELY(msg_type == BT_MESSAGE_TYPE_PACKET_END)) { stream_class = bt_stream_borrow_class_const( bt_packet_borrow_stream_const( bt_message_packet_end_borrow_packet_const( msg))); - } else if (unlikely(msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS)) { + } else if (G_UNLIKELY(msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS)) { stream_class = bt_stream_borrow_class_const( bt_message_discarded_events_borrow_stream_const(msg)); - } else if (unlikely(msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS)) { + } else if (G_UNLIKELY(msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS)) { stream_class = bt_stream_borrow_class_const( bt_message_discarded_packets_borrow_stream_const(msg)); } @@ -569,6 +608,28 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp, } break; + case BT_MESSAGE_TYPE_STREAM_BEGINNING: + { + enum bt_message_stream_clock_snapshot_state snapshot_state = + bt_message_stream_beginning_borrow_default_clock_snapshot_const( + msg, &clock_snapshot); + if (snapshot_state == BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_UNKNOWN) { + goto no_clock_snapshot; + } + + break; + } + case BT_MESSAGE_TYPE_STREAM_END: + { + enum bt_message_stream_clock_snapshot_state snapshot_state = + bt_message_stream_end_borrow_default_clock_snapshot_const( + msg, &clock_snapshot); + if (snapshot_state == BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_UNKNOWN) { + goto no_clock_snapshot; + } + + break; + } case BT_MESSAGE_TYPE_DISCARDED_EVENTS: if (bt_stream_class_discarded_events_have_default_clock_snapshots( stream_class)) { @@ -588,26 +649,6 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp, goto no_clock_snapshot; } - break; - case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING: - BT_ASSERT(bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const( - msg)); - sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const( - msg, &clock_snapshot); - if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) { - goto no_clock_snapshot; - } - - break; - case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END: - BT_ASSERT(bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const( - msg)); - sa_cs_state = bt_message_stream_activity_end_borrow_default_clock_snapshot_const( - msg, &clock_snapshot); - if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) { - goto no_clock_snapshot; - } - break; case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY: clock_snapshot = bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const( @@ -615,14 +656,14 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp, break; default: /* All the other messages have a higher priority */ - BT_LOGV_STR("Message has no timestamp: using the last returned timestamp."); + BT_COMP_LOGD_STR("Message has no timestamp: using the last returned timestamp."); *ts_ns = last_returned_ts_ns; goto end; } ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns); if (ret) { - BT_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: " + BT_COMP_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: " "clock-snapshot-addr=%p", clock_snapshot); goto error; } @@ -630,7 +671,7 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp, goto end; no_clock_snapshot: - BT_LOGV_STR("Message's default clock snapshot is missing: " + BT_COMP_LOGD_STR("Message's default clock snapshot is missing: " "using the last returned timestamp."); *ts_ns = last_returned_ts_ns; goto end; @@ -640,7 +681,7 @@ error: end: if (ret == 0) { - BT_LOGV("Found message's timestamp: " + BT_COMP_LOGD("Found message's timestamp: " "muxer-msg-iter-addr=%p, msg-addr=%p, " "last-returned-ts=%" PRId64 ", ts=%" PRId64, muxer_msg_iter, msg, last_returned_ts_ns, @@ -656,7 +697,7 @@ int validate_clock_class(struct muxer_msg_iter *muxer_msg_iter, const bt_clock_class *clock_class) { int ret = 0; - const unsigned char *cc_uuid; + const uint8_t *cc_uuid; const char *cc_name; BT_ASSERT(clock_class); @@ -684,8 +725,7 @@ int validate_clock_class(struct muxer_msg_iter *muxer_msg_iter, */ muxer_msg_iter->clock_class_expectation = MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID; - memcpy(muxer_msg_iter->expected_clock_class_uuid, - cc_uuid, BABELTRACE_UUID_LEN); + bt_uuid_copy(muxer_msg_iter->expected_clock_class_uuid, cc_uuid); } else { /* * Expect non-absolute clock classes @@ -701,7 +741,7 @@ int validate_clock_class(struct muxer_msg_iter *muxer_msg_iter, switch (muxer_msg_iter->clock_class_expectation) { case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE: if (!bt_clock_class_origin_is_unix_epoch(clock_class)) { - BT_LOGE("Expecting an absolute clock class, " + BT_COMP_LOGE("Expecting an absolute clock class, " "but got a non-absolute one: " "clock-class-addr=%p, clock-class-name=\"%s\"", clock_class, cc_name); @@ -710,7 +750,7 @@ int validate_clock_class(struct muxer_msg_iter *muxer_msg_iter, break; case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID: if (bt_clock_class_origin_is_unix_epoch(clock_class)) { - BT_LOGE("Expecting a non-absolute clock class with no UUID, " + BT_COMP_LOGE("Expecting a non-absolute clock class with no UUID, " "but got an absolute one: " "clock-class-addr=%p, clock-class-name=\"%s\"", clock_class, cc_name); @@ -718,33 +758,17 @@ int validate_clock_class(struct muxer_msg_iter *muxer_msg_iter, } if (cc_uuid) { - BT_LOGE("Expecting a non-absolute clock class with no UUID, " + BT_COMP_LOGE("Expecting a non-absolute clock class with no UUID, " "but got one with a UUID: " "clock-class-addr=%p, clock-class-name=\"%s\", " - "uuid=\"%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x\"", - clock_class, cc_name, - (unsigned int) cc_uuid[0], - (unsigned int) cc_uuid[1], - (unsigned int) cc_uuid[2], - (unsigned int) cc_uuid[3], - (unsigned int) cc_uuid[4], - (unsigned int) cc_uuid[5], - (unsigned int) cc_uuid[6], - (unsigned int) cc_uuid[7], - (unsigned int) cc_uuid[8], - (unsigned int) cc_uuid[9], - (unsigned int) cc_uuid[10], - (unsigned int) cc_uuid[11], - (unsigned int) cc_uuid[12], - (unsigned int) cc_uuid[13], - (unsigned int) cc_uuid[14], - (unsigned int) cc_uuid[15]); + "uuid=\"" BT_UUID_FMT "\"", + clock_class, cc_name, BT_UUID_FMT_VALUES(cc_uuid)); goto error; } break; case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID: if (bt_clock_class_origin_is_unix_epoch(clock_class)) { - BT_LOGE("Expecting a non-absolute clock class with a specific UUID, " + BT_COMP_LOGE("Expecting a non-absolute clock class with a specific UUID, " "but got an absolute one: " "clock-class-addr=%p, clock-class-name=\"%s\"", clock_class, cc_name); @@ -752,64 +776,33 @@ int validate_clock_class(struct muxer_msg_iter *muxer_msg_iter, } if (!cc_uuid) { - BT_LOGE("Expecting a non-absolute clock class with a specific UUID, " + BT_COMP_LOGE("Expecting a non-absolute clock class with a specific UUID, " "but got one with no UUID: " "clock-class-addr=%p, clock-class-name=\"%s\"", clock_class, cc_name); goto error; } - if (memcmp(muxer_msg_iter->expected_clock_class_uuid, - cc_uuid, BABELTRACE_UUID_LEN) != 0) { - BT_LOGE("Expecting a non-absolute clock class with a specific UUID, " + if (bt_uuid_compare(muxer_msg_iter->expected_clock_class_uuid, cc_uuid) != 0) { + BT_COMP_LOGE("Expecting a non-absolute clock class with a specific UUID, " "but got one with different UUID: " "clock-class-addr=%p, clock-class-name=\"%s\", " - "expected-uuid=\"%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x\", " - "uuid=\"%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x\"", + "expected-uuid=\"" BT_UUID_FMT "\", " + "uuid=\"" BT_UUID_FMT "\"", clock_class, cc_name, - (unsigned int) muxer_msg_iter->expected_clock_class_uuid[0], - (unsigned int) muxer_msg_iter->expected_clock_class_uuid[1], - (unsigned int) muxer_msg_iter->expected_clock_class_uuid[2], - (unsigned int) muxer_msg_iter->expected_clock_class_uuid[3], - (unsigned int) muxer_msg_iter->expected_clock_class_uuid[4], - (unsigned int) muxer_msg_iter->expected_clock_class_uuid[5], - (unsigned int) muxer_msg_iter->expected_clock_class_uuid[6], - (unsigned int) muxer_msg_iter->expected_clock_class_uuid[7], - (unsigned int) muxer_msg_iter->expected_clock_class_uuid[8], - (unsigned int) muxer_msg_iter->expected_clock_class_uuid[9], - (unsigned int) muxer_msg_iter->expected_clock_class_uuid[10], - (unsigned int) muxer_msg_iter->expected_clock_class_uuid[11], - (unsigned int) muxer_msg_iter->expected_clock_class_uuid[12], - (unsigned int) muxer_msg_iter->expected_clock_class_uuid[13], - (unsigned int) muxer_msg_iter->expected_clock_class_uuid[14], - (unsigned int) muxer_msg_iter->expected_clock_class_uuid[15], - (unsigned int) cc_uuid[0], - (unsigned int) cc_uuid[1], - (unsigned int) cc_uuid[2], - (unsigned int) cc_uuid[3], - (unsigned int) cc_uuid[4], - (unsigned int) cc_uuid[5], - (unsigned int) cc_uuid[6], - (unsigned int) cc_uuid[7], - (unsigned int) cc_uuid[8], - (unsigned int) cc_uuid[9], - (unsigned int) cc_uuid[10], - (unsigned int) cc_uuid[11], - (unsigned int) cc_uuid[12], - (unsigned int) cc_uuid[13], - (unsigned int) cc_uuid[14], - (unsigned int) cc_uuid[15]); + BT_UUID_FMT_VALUES(muxer_msg_iter->expected_clock_class_uuid), + BT_UUID_FMT_VALUES(cc_uuid)); goto error; } break; case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE: - BT_LOGE("Expecting no clock class, but got one: " + BT_COMP_LOGE("Expecting no clock class, but got one: " "clock-class-addr=%p, clock-class-name=\"%s\"", clock_class, cc_name); goto error; default: /* Unexpected */ - BT_LOGF("Unexpected clock class expectation: " + BT_COMP_LOGF("Unexpected clock class expectation: " "expectation-code=%d", muxer_msg_iter->clock_class_expectation); abort(); @@ -841,8 +834,9 @@ int validate_new_stream_clock_class(struct muxer_msg_iter *muxer_msg_iter, /* Expect no clock class */ muxer_msg_iter->clock_class_expectation = MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE; - } else { - BT_LOGE("Expecting stream class with a default clock class: " + } else if (muxer_msg_iter->clock_class_expectation != + MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE) { + BT_COMP_LOGE("Expecting stream class without a default clock class: " "stream-class-addr=%p, stream-class-name=\"%s\", " "stream-class-id=%" PRIu64, stream_class, bt_stream_class_get_name(stream_class), @@ -876,7 +870,7 @@ end: * the youngest, and sets *ts_ns to its time. */ static -bt_self_message_iterator_status +bt_component_class_message_iterator_next_method_status muxer_msg_iter_youngest_upstream_msg_iter( struct muxer_comp *muxer_comp, struct muxer_msg_iter *muxer_msg_iter, @@ -886,8 +880,8 @@ muxer_msg_iter_youngest_upstream_msg_iter( size_t i; int ret; int64_t youngest_ts_ns = INT64_MAX; - bt_self_message_iterator_status status = - BT_SELF_MESSAGE_ITERATOR_STATUS_OK; + bt_component_class_message_iterator_next_method_status status = + BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; BT_ASSERT(muxer_comp); BT_ASSERT(muxer_msg_iter); @@ -905,7 +899,7 @@ muxer_msg_iter_youngest_upstream_msg_iter( if (!cur_muxer_upstream_msg_iter->msg_iter) { /* This upstream message iterator is ended */ - BT_LOGV("Skipping ended upstream message iterator: " + BT_COMP_LOGT("Skipping ended upstream message iterator: " "muxer-upstream-msg-iter-wrap-addr=%p", cur_muxer_upstream_msg_iter); continue; @@ -915,7 +909,7 @@ muxer_msg_iter_youngest_upstream_msg_iter( msg = g_queue_peek_head(cur_muxer_upstream_msg_iter->msgs); BT_ASSERT(msg); - if (unlikely(bt_message_get_type(msg) == + if (G_UNLIKELY(bt_message_get_type(msg) == BT_MESSAGE_TYPE_STREAM_BEGINNING)) { ret = validate_new_stream_clock_class( muxer_msg_iter, muxer_comp, @@ -926,10 +920,10 @@ muxer_msg_iter_youngest_upstream_msg_iter( * validate_new_stream_clock_class() logs * errors. */ - status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; goto end; } - } else if (unlikely(bt_message_get_type(msg) == + } else if (G_UNLIKELY(bt_message_get_type(msg) == BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY)) { const bt_clock_snapshot *cs; @@ -939,7 +933,7 @@ muxer_msg_iter_youngest_upstream_msg_iter( bt_clock_snapshot_borrow_clock_class_const(cs)); if (ret) { /* validate_clock_class() logs errors */ - status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; goto end; } } @@ -949,20 +943,59 @@ muxer_msg_iter_youngest_upstream_msg_iter( if (ret) { /* get_msg_ts_ns() logs errors */ *muxer_upstream_msg_iter = NULL; - status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; goto end; } - if (msg_ts_ns <= youngest_ts_ns) { + /* + * Update the current message iterator if it has not been set + * yet, or if its current message has a timestamp smaller than + * the previously selected youngest message. + */ + if (G_UNLIKELY(*muxer_upstream_msg_iter == NULL) || + msg_ts_ns < youngest_ts_ns) { *muxer_upstream_msg_iter = cur_muxer_upstream_msg_iter; youngest_ts_ns = msg_ts_ns; *ts_ns = youngest_ts_ns; + } else if (msg_ts_ns == youngest_ts_ns) { + /* + * The currently selected message to be sent downstream + * next has the exact same timestamp that of the + * current candidate message. We must break the tie + * in a predictable manner. + */ + const bt_message *selected_msg = g_queue_peek_head( + (*muxer_upstream_msg_iter)->msgs); + BT_COMP_LOGD_STR("Two of the next message candidates have the same timestamps, pick one deterministically."); + + /* + * Order the messages in an arbitrary but determinitic + * way. + */ + ret = common_muxing_compare_messages(msg, selected_msg); + if (ret < 0) { + /* + * The `msg` should go first. Update the next + * iterator and the current timestamp. + */ + *muxer_upstream_msg_iter = + cur_muxer_upstream_msg_iter; + youngest_ts_ns = msg_ts_ns; + *ts_ns = youngest_ts_ns; + } else if (ret == 0) { + /* Unable to pick which one should go first. */ + BT_COMP_LOGW("Cannot deterministically pick next upstream message iterator because they have identical next messages: " + "muxer-upstream-msg-iter-wrap-addr=%p" + "cur-muxer-upstream-msg-iter-wrap-addr=%p", + *muxer_upstream_msg_iter, + cur_muxer_upstream_msg_iter); + } } } if (!*muxer_upstream_msg_iter) { - status = BT_SELF_MESSAGE_ITERATOR_STATUS_END; + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END; *ts_ns = INT64_MIN; } @@ -971,20 +1004,23 @@ end: } static -bt_self_message_iterator_status validate_muxer_upstream_msg_iter( +bt_component_class_message_iterator_next_method_status +validate_muxer_upstream_msg_iter( struct muxer_upstream_msg_iter *muxer_upstream_msg_iter, bool *is_ended) { - bt_self_message_iterator_status status = - BT_SELF_MESSAGE_ITERATOR_STATUS_OK; + struct muxer_comp *muxer_comp = + muxer_upstream_msg_iter->muxer_comp; + bt_component_class_message_iterator_next_method_status status = + BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; - BT_LOGV("Validating muxer's upstream message iterator wrapper: " + BT_COMP_LOGD("Validating muxer's upstream message iterator wrapper: " "muxer-upstream-msg-iter-wrap-addr=%p", muxer_upstream_msg_iter); if (muxer_upstream_msg_iter->msgs->length > 0 || !muxer_upstream_msg_iter->msg_iter) { - BT_LOGV("Already valid or not considered: " + BT_COMP_LOGD("Already valid or not considered: " "queue-len=%u, upstream-msg-iter-addr=%p", muxer_upstream_msg_iter->msgs->length, muxer_upstream_msg_iter->msg_iter); @@ -1000,14 +1036,16 @@ end: } static -bt_self_message_iterator_status validate_muxer_upstream_msg_iters( +bt_component_class_message_iterator_next_method_status +validate_muxer_upstream_msg_iters( struct muxer_msg_iter *muxer_msg_iter) { - bt_self_message_iterator_status status = - BT_SELF_MESSAGE_ITERATOR_STATUS_OK; + struct muxer_comp *muxer_comp = muxer_msg_iter->muxer_comp; + bt_component_class_message_iterator_next_method_status status = + BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; size_t i; - BT_LOGV("Validating muxer's upstream message iterator wrappers: " + BT_COMP_LOGD("Validating muxer's upstream message iterator wrappers: " "muxer-msg-iter-addr=%p", muxer_msg_iter); for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len; @@ -1020,15 +1058,15 @@ bt_self_message_iterator_status validate_muxer_upstream_msg_iters( status = validate_muxer_upstream_msg_iter( muxer_upstream_msg_iter, &is_ended); - if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) { + if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) { if (status < 0) { - BT_LOGE("Cannot validate muxer's upstream message iterator wrapper: " + BT_COMP_LOGE("Cannot validate muxer's upstream message iterator wrapper: " "muxer-msg-iter-addr=%p, " "muxer-upstream-msg-iter-wrap-addr=%p", muxer_msg_iter, muxer_upstream_msg_iter); } else { - BT_LOGV("Cannot validate muxer's upstream message iterator wrapper: " + BT_COMP_LOGD("Cannot validate muxer's upstream message iterator wrapper: " "muxer-msg-iter-addr=%p, " "muxer-upstream-msg-iter-wrap-addr=%p", muxer_msg_iter, @@ -1042,8 +1080,8 @@ bt_self_message_iterator_status validate_muxer_upstream_msg_iters( * Move this muxer upstream message iterator to the * array of ended iterators if it's ended. */ - if (unlikely(is_ended)) { - BT_LOGV("Muxer's upstream message iterator wrapper: ended or canceled: " + if (G_UNLIKELY(is_ended)) { + BT_COMP_LOGD("Muxer's upstream message iterator wrapper: ended or canceled: " "muxer-msg-iter-addr=%p, " "muxer-upstream-msg-iter-wrap-addr=%p", muxer_msg_iter, muxer_upstream_msg_iter); @@ -1068,17 +1106,17 @@ end: } static inline -bt_self_message_iterator_status muxer_msg_iter_do_next_one( +bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next_one( struct muxer_comp *muxer_comp, struct muxer_msg_iter *muxer_msg_iter, const bt_message **msg) { - bt_self_message_iterator_status status; + bt_component_class_message_iterator_next_method_status status; struct muxer_upstream_msg_iter *muxer_upstream_msg_iter = NULL; int64_t next_return_ts; status = validate_muxer_upstream_msg_iters(muxer_msg_iter); - if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) { + if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) { /* validate_muxer_upstream_msg_iters() logs details */ goto end; } @@ -1092,36 +1130,36 @@ bt_self_message_iterator_status muxer_msg_iter_do_next_one( status = muxer_msg_iter_youngest_upstream_msg_iter(muxer_comp, muxer_msg_iter, &muxer_upstream_msg_iter, &next_return_ts); - if (status < 0 || status == BT_SELF_MESSAGE_ITERATOR_STATUS_END) { + if (status < 0 || status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END) { if (status < 0) { - BT_LOGE("Cannot find the youngest upstream message iterator wrapper: " + BT_COMP_LOGE("Cannot find the youngest upstream message iterator wrapper: " "status=%s", - bt_common_self_message_iterator_status_string(status)); + bt_common_func_status_string(status)); } else { - BT_LOGV("Cannot find the youngest upstream message iterator wrapper: " + BT_COMP_LOGD("Cannot find the youngest upstream message iterator wrapper: " "status=%s", - bt_common_self_message_iterator_status_string(status)); + bt_common_func_status_string(status)); } goto end; } if (next_return_ts < muxer_msg_iter->last_returned_ts_ns) { - BT_LOGE("Youngest upstream message iterator wrapper's timestamp is less than muxer's message iterator's last returned timestamp: " + BT_COMP_LOGE("Youngest upstream message iterator wrapper's timestamp is less than muxer's message iterator's last returned timestamp: " "muxer-msg-iter-addr=%p, ts=%" PRId64 ", " "last-returned-ts=%" PRId64, muxer_msg_iter, next_return_ts, muxer_msg_iter->last_returned_ts_ns); - status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; goto end; } - BT_LOGV("Found youngest upstream message iterator wrapper: " + BT_COMP_LOGD("Found youngest upstream message iterator wrapper: " "muxer-msg-iter-addr=%p, " "muxer-upstream-msg-iter-wrap-addr=%p, " "ts=%" PRId64, muxer_msg_iter, muxer_upstream_msg_iter, next_return_ts); - BT_ASSERT(status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK); + BT_ASSERT(status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK); BT_ASSERT(muxer_upstream_msg_iter); /* @@ -1137,20 +1175,20 @@ end: } static -bt_self_message_iterator_status muxer_msg_iter_do_next( +bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next( struct muxer_comp *muxer_comp, struct muxer_msg_iter *muxer_msg_iter, bt_message_array_const msgs, uint64_t capacity, uint64_t *count) { - bt_self_message_iterator_status status = - BT_SELF_MESSAGE_ITERATOR_STATUS_OK; + bt_component_class_message_iterator_next_method_status status = + BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; uint64_t i = 0; - while (i < capacity && status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK) { + while (i < capacity && status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) { status = muxer_msg_iter_do_next_one(muxer_comp, muxer_msg_iter, &msgs[i]); - if (status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK) { + if (status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) { i++; } } @@ -1169,7 +1207,7 @@ bt_self_message_iterator_status muxer_msg_iter_do_next( * message, in which case we'll return it. */ *count = i; - status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK; + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; } return status; @@ -1178,21 +1216,24 @@ bt_self_message_iterator_status muxer_msg_iter_do_next( static void destroy_muxer_msg_iter(struct muxer_msg_iter *muxer_msg_iter) { + struct muxer_comp *muxer_comp; + if (!muxer_msg_iter) { return; } - BT_LOGD("Destroying muxer component's message iterator: " + muxer_comp = muxer_msg_iter->muxer_comp; + BT_COMP_LOGD("Destroying muxer component's message iterator: " "muxer-msg-iter-addr=%p", muxer_msg_iter); if (muxer_msg_iter->active_muxer_upstream_msg_iters) { - BT_LOGD_STR("Destroying muxer's active upstream message iterator wrappers."); + BT_COMP_LOGD_STR("Destroying muxer's active upstream message iterator wrappers."); g_ptr_array_free( muxer_msg_iter->active_muxer_upstream_msg_iters, TRUE); } if (muxer_msg_iter->ended_muxer_upstream_msg_iters) { - BT_LOGD_STR("Destroying muxer's ended upstream message iterator wrappers."); + BT_COMP_LOGD_STR("Destroying muxer's ended upstream message iterator wrappers."); g_ptr_array_free( muxer_msg_iter->ended_muxer_upstream_msg_iters, TRUE); } @@ -1201,20 +1242,22 @@ void destroy_muxer_msg_iter(struct muxer_msg_iter *muxer_msg_iter) } static -int muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp, +bt_component_class_message_iterator_init_method_status +muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp, struct muxer_msg_iter *muxer_msg_iter) { int64_t count; int64_t i; - int ret = 0; + bt_component_class_message_iterator_init_method_status status; count = bt_component_filter_get_input_port_count( bt_self_component_filter_as_component_filter( - muxer_comp->self_comp)); + muxer_comp->self_comp_flt)); if (count < 0) { - BT_LOGD("No input port to initialize for muxer component's message iterator: " + BT_COMP_LOGD("No input port to initialize for muxer component's message iterator: " "muxer-comp-addr=%p, muxer-msg-iter-addr=%p", muxer_comp, muxer_msg_iter); + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_OK; goto end; } @@ -1222,8 +1265,11 @@ int muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp, bt_self_component_port_input_message_iterator *upstream_msg_iter; bt_self_component_port_input *self_port = bt_self_component_filter_borrow_input_port_by_index( - muxer_comp->self_comp, i); + muxer_comp->self_comp_flt, i); const bt_port *port; + bt_self_component_port_input_message_iterator_create_from_message_iterator_status + msg_iter_status; + int int_status; BT_ASSERT(self_port); port = bt_self_component_port_as_port( @@ -1236,44 +1282,45 @@ int muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp, continue; } - upstream_msg_iter = create_msg_iter_on_input_port(self_port); - if (!upstream_msg_iter) { + msg_iter_status = create_msg_iter_on_input_port(muxer_comp, + muxer_msg_iter, self_port, &upstream_msg_iter); + if (msg_iter_status != BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_CREATE_FROM_MESSAGE_ITERATOR_STATUS_OK) { /* create_msg_iter_on_input_port() logs errors */ - BT_ASSERT(!upstream_msg_iter); - ret = -1; + status = (int) msg_iter_status; goto end; } - ret = muxer_msg_iter_add_upstream_msg_iter(muxer_msg_iter, + int_status = muxer_msg_iter_add_upstream_msg_iter(muxer_msg_iter, upstream_msg_iter); bt_self_component_port_input_message_iterator_put_ref( upstream_msg_iter); - if (ret) { + if (int_status) { + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_ERROR; /* muxer_msg_iter_add_upstream_msg_iter() logs errors */ goto end; } } + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_OK; + end: - return ret; + return status; } BT_HIDDEN -bt_self_message_iterator_status muxer_msg_iter_init( +bt_component_class_message_iterator_init_method_status muxer_msg_iter_init( bt_self_message_iterator *self_msg_iter, bt_self_component_filter *self_comp, bt_self_component_port_output *port) { struct muxer_comp *muxer_comp = NULL; struct muxer_msg_iter *muxer_msg_iter = NULL; - bt_self_message_iterator_status status = - BT_SELF_MESSAGE_ITERATOR_STATUS_OK; - int ret; + bt_component_class_message_iterator_init_method_status status; muxer_comp = bt_self_component_get_data( bt_self_component_filter_as_self_component(self_comp)); BT_ASSERT(muxer_comp); - BT_LOGD("Initializing muxer component's message iterator: " + BT_COMP_LOGD("Initializing muxer component's message iterator: " "comp-addr=%p, muxer-comp-addr=%p, msg-iter-addr=%p", self_comp, muxer_comp, self_msg_iter); @@ -1283,25 +1330,30 @@ bt_self_message_iterator_status muxer_msg_iter_init( * creates a muxer message iterator while creating * another muxer message iterator (same component). */ - BT_LOGE("Recursive initialization of muxer component's message iterator: " + BT_COMP_LOGE("Recursive initialization of muxer component's message iterator: " "comp-addr=%p, muxer-comp-addr=%p, msg-iter-addr=%p", self_comp, muxer_comp, self_msg_iter); + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_ERROR; goto error; } muxer_comp->initializing_muxer_msg_iter = true; muxer_msg_iter = g_new0(struct muxer_msg_iter, 1); if (!muxer_msg_iter) { - BT_LOGE_STR("Failed to allocate one muxer component's message iterator."); + BT_COMP_LOGE_STR("Failed to allocate one muxer component's message iterator."); + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_MEMORY_ERROR; goto error; } + muxer_msg_iter->muxer_comp = muxer_comp; + muxer_msg_iter->self_msg_iter = self_msg_iter; muxer_msg_iter->last_returned_ts_ns = INT64_MIN; muxer_msg_iter->active_muxer_upstream_msg_iters = g_ptr_array_new_with_free_func( (GDestroyNotify) destroy_muxer_upstream_msg_iter); if (!muxer_msg_iter->active_muxer_upstream_msg_iters) { - BT_LOGE_STR("Failed to allocate a GPtrArray."); + BT_COMP_LOGE_STR("Failed to allocate a GPtrArray."); + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_MEMORY_ERROR; goto error; } @@ -1309,23 +1361,24 @@ bt_self_message_iterator_status muxer_msg_iter_init( g_ptr_array_new_with_free_func( (GDestroyNotify) destroy_muxer_upstream_msg_iter); if (!muxer_msg_iter->ended_muxer_upstream_msg_iters) { - BT_LOGE_STR("Failed to allocate a GPtrArray."); + BT_COMP_LOGE_STR("Failed to allocate a GPtrArray."); + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_MEMORY_ERROR; goto error; } - ret = muxer_msg_iter_init_upstream_iterators(muxer_comp, + status = muxer_msg_iter_init_upstream_iterators(muxer_comp, muxer_msg_iter); - if (ret) { - BT_LOGE("Cannot initialize connected input ports for muxer component's message iterator: " + if (status) { + BT_COMP_LOGE("Cannot initialize connected input ports for muxer component's message iterator: " "comp-addr=%p, muxer-comp-addr=%p, " "muxer-msg-iter-addr=%p, msg-iter-addr=%p, ret=%d", self_comp, muxer_comp, muxer_msg_iter, - self_msg_iter, ret); + self_msg_iter, status); goto error; } bt_self_message_iterator_set_data(self_msg_iter, muxer_msg_iter); - BT_LOGD("Initialized muxer component's message iterator: " + BT_COMP_LOGD("Initialized muxer component's message iterator: " "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, " "msg-iter-addr=%p", self_comp, muxer_comp, muxer_msg_iter, self_msg_iter); @@ -1334,7 +1387,6 @@ bt_self_message_iterator_status muxer_msg_iter_init( error: destroy_muxer_msg_iter(muxer_msg_iter); bt_self_message_iterator_set_data(self_msg_iter, NULL); - status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; end: muxer_comp->initializing_muxer_msg_iter = false; @@ -1353,7 +1405,7 @@ void muxer_msg_iter_finalize(bt_self_message_iterator *self_msg_iter) self_msg_iter); BT_ASSERT(self_comp); muxer_comp = bt_self_component_get_data(self_comp); - BT_LOGD("Finalizing muxer component's message iterator: " + BT_COMP_LOGD("Finalizing muxer component's message iterator: " "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, " "msg-iter-addr=%p", self_comp, muxer_comp, muxer_msg_iter, self_msg_iter); @@ -1364,12 +1416,12 @@ void muxer_msg_iter_finalize(bt_self_message_iterator *self_msg_iter) } BT_HIDDEN -bt_self_message_iterator_status muxer_msg_iter_next( +bt_component_class_message_iterator_next_method_status muxer_msg_iter_next( bt_self_message_iterator *self_msg_iter, bt_message_array_const msgs, uint64_t capacity, uint64_t *count) { - bt_self_message_iterator_status status; + bt_component_class_message_iterator_next_method_status status; struct muxer_msg_iter *muxer_msg_iter = bt_self_message_iterator_get_data(self_msg_iter); bt_self_component *self_comp = NULL; @@ -1381,7 +1433,7 @@ bt_self_message_iterator_status muxer_msg_iter_next( BT_ASSERT(self_comp); muxer_comp = bt_self_component_get_data(self_comp); BT_ASSERT(muxer_comp); - BT_LOGV("Muxer component's message iterator's \"next\" method called: " + BT_COMP_LOGT("Muxer component's message iterator's \"next\" method called: " "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, " "msg-iter-addr=%p", self_comp, muxer_comp, muxer_msg_iter, self_msg_iter); @@ -1389,37 +1441,45 @@ bt_self_message_iterator_status muxer_msg_iter_next( status = muxer_msg_iter_do_next(muxer_comp, muxer_msg_iter, msgs, capacity, count); if (status < 0) { - BT_LOGE("Cannot get next message: " + BT_COMP_LOGE("Cannot get next message: " "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, " "msg-iter-addr=%p, status=%s", self_comp, muxer_comp, muxer_msg_iter, self_msg_iter, - bt_common_self_message_iterator_status_string(status)); + bt_common_func_status_string(status)); } else { - BT_LOGV("Returning from muxer component's message iterator's \"next\" method: " + BT_COMP_LOGT("Returning from muxer component's message iterator's \"next\" method: " "status=%s", - bt_common_self_message_iterator_status_string(status)); + bt_common_func_status_string(status)); } return status; } BT_HIDDEN -bt_self_component_status muxer_input_port_connected( +bt_component_class_port_connected_method_status muxer_input_port_connected( bt_self_component_filter *self_comp, bt_self_component_port_input *self_port, const bt_port_output *other_port) { - bt_self_component_status status; + bt_component_class_port_connected_method_status status = + BT_COMPONENT_CLASS_PORT_CONNECTED_METHOD_STATUS_OK; + bt_self_component_add_port_status add_port_status; + struct muxer_comp *muxer_comp = bt_self_component_get_data( + bt_self_component_filter_as_self_component(self_comp)); - status = add_available_input_port(self_comp); - if (status) { - /* - * Only way to report an error later since this - * method does not return anything. - */ - BT_LOGE("Cannot add one muxer component's input port: " + add_port_status = add_available_input_port(self_comp); + if (add_port_status) { + BT_COMP_LOGE("Cannot add one muxer component's input port: " "status=%s", - bt_self_component_status_string(status)); + bt_common_func_status_string(status)); + + if (add_port_status == + BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR) { + status = BT_COMPONENT_CLASS_PORT_CONNECTED_METHOD_STATUS_MEMORY_ERROR; + } else { + status = BT_COMPONENT_CLASS_PORT_CONNECTED_METHOD_STATUS_ERROR; + } + goto end; } @@ -1428,58 +1488,68 @@ end: } static inline -bt_bool muxer_upstream_msg_iters_can_all_seek_beginning( - GPtrArray *muxer_upstream_msg_iters) +bt_component_class_message_iterator_can_seek_beginning_method_status +muxer_upstream_msg_iters_can_all_seek_beginning( + GPtrArray *muxer_upstream_msg_iters, bt_bool *can_seek) { + bt_component_class_message_iterator_can_seek_beginning_method_status status; uint64_t i; - bt_bool ret = BT_TRUE; for (i = 0; i < muxer_upstream_msg_iters->len; i++) { struct muxer_upstream_msg_iter *upstream_msg_iter = muxer_upstream_msg_iters->pdata[i]; + status = (int) bt_self_component_port_input_message_iterator_can_seek_beginning( + upstream_msg_iter->msg_iter, can_seek); + if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_CAN_SEEK_BEGINNING_METHOD_STATUS_OK) { + goto end; + } - if (!bt_self_component_port_input_message_iterator_can_seek_beginning( - upstream_msg_iter->msg_iter)) { - ret = BT_FALSE; + if (!*can_seek) { goto end; } } + *can_seek = BT_TRUE; + end: - return ret; + return status; } BT_HIDDEN -bt_bool muxer_msg_iter_can_seek_beginning( - bt_self_message_iterator *self_msg_iter) +bt_component_class_message_iterator_can_seek_beginning_method_status +muxer_msg_iter_can_seek_beginning( + bt_self_message_iterator *self_msg_iter, bt_bool *can_seek) { struct muxer_msg_iter *muxer_msg_iter = bt_self_message_iterator_get_data(self_msg_iter); - bt_bool ret = BT_TRUE; + bt_component_class_message_iterator_can_seek_beginning_method_status status; - if (!muxer_upstream_msg_iters_can_all_seek_beginning( - muxer_msg_iter->active_muxer_upstream_msg_iters)) { - ret = BT_FALSE; + status = muxer_upstream_msg_iters_can_all_seek_beginning( + muxer_msg_iter->active_muxer_upstream_msg_iters, can_seek); + if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_CAN_SEEK_BEGINNING_METHOD_STATUS_OK) { goto end; } - if (!muxer_upstream_msg_iters_can_all_seek_beginning( - muxer_msg_iter->ended_muxer_upstream_msg_iters)) { - ret = BT_FALSE; + if (!*can_seek) { goto end; } + status = muxer_upstream_msg_iters_can_all_seek_beginning( + muxer_msg_iter->ended_muxer_upstream_msg_iters, can_seek); + end: - return ret; + return status; } BT_HIDDEN -bt_self_message_iterator_status muxer_msg_iter_seek_beginning( +bt_component_class_message_iterator_seek_beginning_method_status muxer_msg_iter_seek_beginning( bt_self_message_iterator *self_msg_iter) { struct muxer_msg_iter *muxer_msg_iter = bt_self_message_iterator_get_data(self_msg_iter); - bt_message_iterator_status status = BT_MESSAGE_ITERATOR_STATUS_OK; + bt_component_class_message_iterator_seek_beginning_method_status status = + BT_COMPONENT_CLASS_MESSAGE_ITERATOR_SEEK_BEGINNING_METHOD_STATUS_OK; + bt_message_iterator_seek_beginning_status seek_beg_status; uint64_t i; /* Seek all ended upstream iterators first */ @@ -1488,9 +1558,10 @@ bt_self_message_iterator_status muxer_msg_iter_seek_beginning( struct muxer_upstream_msg_iter *upstream_msg_iter = muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i]; - status = bt_self_component_port_input_message_iterator_seek_beginning( + seek_beg_status = bt_self_component_port_input_message_iterator_seek_beginning( upstream_msg_iter->msg_iter); - if (status != BT_MESSAGE_ITERATOR_STATUS_OK) { + if (seek_beg_status != BT_MESSAGE_ITERATOR_SEEK_BEGINNING_STATUS_OK) { + status = (int) seek_beg_status; goto end; } @@ -1503,9 +1574,10 @@ bt_self_message_iterator_status muxer_msg_iter_seek_beginning( struct muxer_upstream_msg_iter *upstream_msg_iter = muxer_msg_iter->active_muxer_upstream_msg_iters->pdata[i]; - status = bt_self_component_port_input_message_iterator_seek_beginning( + seek_beg_status = bt_self_component_port_input_message_iterator_seek_beginning( upstream_msg_iter->msg_iter); - if (status != BT_MESSAGE_ITERATOR_STATUS_OK) { + if (seek_beg_status != BT_MESSAGE_ITERATOR_SEEK_BEGINNING_STATUS_OK) { + status = (int) seek_beg_status; goto end; } @@ -1523,12 +1595,18 @@ bt_self_message_iterator_status muxer_msg_iter_seek_beginning( muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i] = NULL; } - g_ptr_array_remove_range(muxer_msg_iter->ended_muxer_upstream_msg_iters, - 0, muxer_msg_iter->ended_muxer_upstream_msg_iters->len); + /* + * GLib < 2.48.0 asserts when g_ptr_array_remove_range() is + * called on an empty array. + */ + if (muxer_msg_iter->ended_muxer_upstream_msg_iters->len > 0) { + g_ptr_array_remove_range(muxer_msg_iter->ended_muxer_upstream_msg_iters, + 0, muxer_msg_iter->ended_muxer_upstream_msg_iters->len); + } muxer_msg_iter->last_returned_ts_ns = INT64_MIN; muxer_msg_iter->clock_class_expectation = MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY; end: - return (bt_self_message_iterator_status) status; + return status; }