X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=plugins%2Futils%2Fmuxer%2Fmuxer.c;h=47d790b3d9a14e59387e7d611c984077680d608e;hb=6c20f4a0c496cfac67ceabe876edc796929ffb64;hp=cc6dc4602d649092eb19a75890346c52da23c3ca;hpb=4cdfc5e86b64137d96c31495cbdea99801714c2b;p=babeltrace.git diff --git a/plugins/utils/muxer/muxer.c b/plugins/utils/muxer/muxer.c index cc6dc460..47d790b3 100644 --- a/plugins/utils/muxer/muxer.c +++ b/plugins/utils/muxer/muxer.c @@ -39,16 +39,11 @@ #include #include +#include "muxer.h" + #define ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME "assume-absolute-clock-classes" struct muxer_comp { - /* - * Array of struct - * bt_self_message_iterator * - * (weak refs) - */ - GPtrArray *muxer_msg_iters; - /* Weak ref */ bt_self_component_filter *self_comp; @@ -68,6 +63,7 @@ struct muxer_upstream_msg_iter { enum muxer_msg_iter_clock_class_expectation { MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY = 0, + MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE, MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE, MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID, MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID, @@ -82,18 +78,16 @@ struct muxer_msg_iter { * another data structure is faster than this for our typical * use cases. */ - GPtrArray *muxer_upstream_msg_iters; + GPtrArray *active_muxer_upstream_msg_iters; /* - * List of "recently" connected input ports (weak) to - * handle by this muxer message iterator. - * muxer_port_connected() adds entries to this list, and the - * entries are removed when a message iterator is created - * on the port's connection and put into - * muxer_upstream_msg_iters above by - * muxer_msg_iter_handle_newly_connected_ports(). + * Array of struct muxer_upstream_msg_iter * (owned by this). + * + * We move ended message iterators from + * `active_muxer_upstream_msg_iters` to this array so as to be + * able to restore them when seeking. */ - GList *newly_connected_self_ports; + GPtrArray *ended_muxer_upstream_msg_iters; /* Last time returned in a message */ int64_t last_returned_ts_ns; @@ -109,6 +103,16 @@ struct muxer_msg_iter { unsigned char expected_clock_class_uuid[BABELTRACE_UUID_LEN]; }; +static +void empty_message_queue(struct muxer_upstream_msg_iter *upstream_msg_iter) +{ + const bt_message *msg; + + while ((msg = g_queue_pop_head(upstream_msg_iter->msgs))) { + bt_message_put_ref(msg); + } +} + static void destroy_muxer_upstream_msg_iter( struct muxer_upstream_msg_iter *muxer_upstream_msg_iter) @@ -122,16 +126,11 @@ void destroy_muxer_upstream_msg_iter( muxer_upstream_msg_iter, muxer_upstream_msg_iter->msg_iter, muxer_upstream_msg_iter->msgs->length); - bt_self_component_port_input_message_iterator_put_ref(muxer_upstream_msg_iter->msg_iter); + bt_self_component_port_input_message_iterator_put_ref( + muxer_upstream_msg_iter->msg_iter); if (muxer_upstream_msg_iter->msgs) { - const bt_message *msg; - - while ((msg = g_queue_pop_head( - muxer_upstream_msg_iter->msgs))) { - bt_message_put_ref(msg); - } - + empty_message_queue(muxer_upstream_msg_iter); g_queue_free(muxer_upstream_msg_iter->msgs); } @@ -148,7 +147,7 @@ struct muxer_upstream_msg_iter *muxer_msg_iter_add_upstream_msg_iter( if (!muxer_upstream_msg_iter) { BT_LOGE_STR("Failed to allocate one muxer's upstream message iterator wrapper."); - goto end; + goto error; } muxer_upstream_msg_iter->msg_iter = self_msg_iter; @@ -156,22 +155,28 @@ struct muxer_upstream_msg_iter *muxer_msg_iter_add_upstream_msg_iter( muxer_upstream_msg_iter->msgs = g_queue_new(); if (!muxer_upstream_msg_iter->msgs) { BT_LOGE_STR("Failed to allocate a GQueue."); - goto end; + goto error; } - g_ptr_array_add(muxer_msg_iter->muxer_upstream_msg_iters, + g_ptr_array_add(muxer_msg_iter->active_muxer_upstream_msg_iters, muxer_upstream_msg_iter); BT_LOGD("Added muxer's upstream message iterator wrapper: " "addr=%p, muxer-msg-iter-addr=%p, msg-iter-addr=%p", muxer_upstream_msg_iter, muxer_msg_iter, self_msg_iter); + goto end; + +error: + g_free(muxer_upstream_msg_iter); + muxer_upstream_msg_iter = NULL; + end: return muxer_upstream_msg_iter; } static -bt_self_component_status ensure_available_input_port( +bt_self_component_status add_available_input_port( bt_self_component_filter *self_comp) { struct muxer_comp *muxer_comp = bt_self_component_get_data( @@ -180,11 +185,6 @@ bt_self_component_status ensure_available_input_port( GString *port_name = NULL; BT_ASSERT(muxer_comp); - - if (muxer_comp->available_input_ports >= 1) { - goto end; - } - port_name = g_string_new("in"); if (!port_name) { BT_LOGE_STR("Failed to allocate a GString."); @@ -208,6 +208,7 @@ bt_self_component_status ensure_available_input_port( BT_LOGD("Added one input port to muxer component: " "port-name=\"%s\", comp-addr=%p", port_name->str, self_comp); + end: if (port_name) { g_string_free(port_name, TRUE); @@ -231,15 +232,6 @@ void destroy_muxer_comp(struct muxer_comp *muxer_comp) return; } - BT_LOGD("Destroying muxer component: muxer-comp-addr=%p, " - "muxer-msg-iter-count=%u", muxer_comp, - muxer_comp->muxer_msg_iters ? - muxer_comp->muxer_msg_iters->len : 0); - - if (muxer_comp->muxer_msg_iters) { - g_ptr_array_free(muxer_comp->muxer_msg_iters, TRUE); - } - g_free(muxer_comp); } @@ -328,7 +320,7 @@ end: BT_HIDDEN bt_self_component_status muxer_init( bt_self_component_filter *self_comp, - bt_value *params, void *init_data) + const bt_value *params, void *init_data) { int ret; bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK; @@ -350,17 +342,11 @@ bt_self_component_status muxer_init( goto error; } - muxer_comp->muxer_msg_iters = g_ptr_array_new(); - if (!muxer_comp->muxer_msg_iters) { - BT_LOGE_STR("Failed to allocate a GPtrArray."); - goto error; - } - muxer_comp->self_comp = self_comp; bt_self_component_set_data( bt_self_component_filter_as_self_component(self_comp), muxer_comp); - status = ensure_available_input_port(self_comp); + status = add_available_input_port(self_comp); if (status != BT_SELF_COMPONENT_STATUS_OK) { BT_LOGE("Cannot ensure that at least one muxer component's input port is available: " "muxer-comp-addr=%p, status=%s", @@ -411,8 +397,7 @@ void muxer_finalize(bt_self_component_filter *self_comp) static bt_self_component_port_input_message_iterator * -create_msg_iter_on_input_port( - bt_self_component_port_input *self_port, int *ret) +create_msg_iter_on_input_port(bt_self_component_port_input *self_port) { const bt_port *port = bt_self_component_port_as_port( bt_self_component_port_input_as_self_component_port( @@ -420,8 +405,6 @@ create_msg_iter_on_input_port( bt_self_component_port_input_message_iterator *msg_iter = NULL; - BT_ASSERT(ret); - *ret = 0; BT_ASSERT(port); BT_ASSERT(bt_port_is_connected(port)); @@ -434,7 +417,6 @@ create_msg_iter_on_input_port( BT_LOGE("Cannot create upstream message iterator on input port: " "port-addr=%p, port-name=\"%s\"", port, bt_port_get_name(port)); - *ret = -1; goto end; } @@ -447,10 +429,12 @@ end: } static -bt_message_iterator_status muxer_upstream_msg_iter_next( - struct muxer_upstream_msg_iter *muxer_upstream_msg_iter) +bt_self_message_iterator_status muxer_upstream_msg_iter_next( + struct muxer_upstream_msg_iter *muxer_upstream_msg_iter, + bool *is_ended) { - bt_message_iterator_status status; + bt_self_message_iterator_status status; + bt_message_iterator_status input_port_iter_status; bt_message_array_const msgs; uint64_t i; uint64_t count; @@ -459,12 +443,12 @@ bt_message_iterator_status muxer_upstream_msg_iter_next( "muxer-upstream-msg-iter-wrap-addr=%p, msg-iter-addr=%p", muxer_upstream_msg_iter, muxer_upstream_msg_iter->msg_iter); - status = bt_self_component_port_input_message_iterator_next( + input_port_iter_status = bt_self_component_port_input_message_iterator_next( muxer_upstream_msg_iter->msg_iter, &msgs, &count); BT_LOGV("Upstream message iterator's \"next\" method returned: " - "status=%s", bt_message_iterator_status_string(status)); + "status=%s", bt_message_iterator_status_string(input_port_iter_status)); - switch (status) { + switch (input_port_iter_status) { case BT_MESSAGE_ITERATOR_STATUS_OK: /* * Message iterator's current message is @@ -483,6 +467,7 @@ bt_message_iterator_status muxer_upstream_msg_iter_next( g_queue_push_tail(muxer_upstream_msg_iter->msgs, (void *) msgs[i]); } + status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK; break; case BT_MESSAGE_ITERATOR_STATUS_AGAIN: /* @@ -490,145 +475,106 @@ bt_message_iterator_status muxer_upstream_msg_iter_next( * valid anymore. Return * BT_MESSAGE_ITERATOR_STATUS_AGAIN immediately. */ + status = BT_SELF_MESSAGE_ITERATOR_STATUS_AGAIN; break; case BT_MESSAGE_ITERATOR_STATUS_END: /* Fall-through. */ - case BT_MESSAGE_ITERATOR_STATUS_CANCELED: /* * Message iterator reached the end: release it. It * won't be considered again to find the youngest * message. */ - BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(muxer_upstream_msg_iter->msg_iter); - status = BT_MESSAGE_ITERATOR_STATUS_OK; + *is_ended = true; + status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK; break; default: /* Error or unsupported status code */ BT_LOGE("Error or unsupported status code: " - "status-code=%d", status); - status = BT_MESSAGE_ITERATOR_STATUS_ERROR; + "status-code=%d", input_port_iter_status); + status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; break; } return status; } -static -int muxer_msg_iter_handle_newly_connected_ports( - struct muxer_msg_iter *muxer_msg_iter) -{ - int ret = 0; - - BT_LOGV("Handling newly connected ports: " - "muxer-msg-iter-addr=%p", muxer_msg_iter); - - /* - * Here we create one upstream message iterator for each - * newly connected port. We do NOT perform an initial "next" on - * those new upstream message iterators: they are - * invalidated, to be validated later. The list of newly - * connected ports to handle here is updated by - * muxer_port_connected(). - */ - while (true) { - GList *node = muxer_msg_iter->newly_connected_self_ports; - bt_self_component_port_input *self_port; - const bt_port *port; - bt_self_component_port_input_message_iterator * - upstream_msg_iter = NULL; - struct muxer_upstream_msg_iter *muxer_upstream_msg_iter; - - if (!node) { - break; - } - - self_port = node->data; - port = bt_self_component_port_as_port( - bt_self_component_port_input_as_self_component_port( - (self_port))); - BT_ASSERT(port); - - if (!bt_port_is_connected(port)) { - /* - * Looks like this port is not connected - * anymore: we can't create an upstream - * message iterator on its (non-existing) - * connection in this case. - */ - goto remove_node; - } - - upstream_msg_iter = create_msg_iter_on_input_port( - self_port, &ret); - if (ret) { - /* create_msg_iter_on_input_port() logs errors */ - BT_ASSERT(!upstream_msg_iter); - goto error; - } - - muxer_upstream_msg_iter = - muxer_msg_iter_add_upstream_msg_iter( - muxer_msg_iter, upstream_msg_iter); - BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(upstream_msg_iter); - if (!muxer_upstream_msg_iter) { - /* - * muxer_msg_iter_add_upstream_msg_iter() - * logs errors. - */ - goto error; - } - -remove_node: - bt_self_component_port_input_message_iterator_put_ref(upstream_msg_iter); - muxer_msg_iter->newly_connected_self_ports = - g_list_delete_link( - muxer_msg_iter->newly_connected_self_ports, - node); - } - - goto end; - -error: - if (ret >= 0) { - ret = -1; - } - -end: - return ret; -} - static int get_msg_ts_ns(struct muxer_comp *muxer_comp, struct muxer_msg_iter *muxer_msg_iter, const bt_message *msg, int64_t last_returned_ts_ns, int64_t *ts_ns) { - const bt_clock_class *clock_class = NULL; const bt_clock_snapshot *clock_snapshot = NULL; - const bt_event *event = NULL; int ret = 0; - const unsigned char *cc_uuid; - const char *cc_name; bt_clock_snapshot_state cs_state = BT_CLOCK_SNAPSHOT_STATE_KNOWN; + bt_message_stream_activity_clock_snapshot_state sa_cs_state; BT_ASSERT(msg); BT_ASSERT(ts_ns); - BT_LOGV("Getting message's timestamp: " "muxer-msg-iter-addr=%p, msg-addr=%p, " "last-returned-ts=%" PRId64, muxer_msg_iter, msg, last_returned_ts_ns); + if (unlikely(muxer_msg_iter->clock_class_expectation == + MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE)) { + *ts_ns = last_returned_ts_ns; + goto end; + } + switch (bt_message_get_type(msg)) { case BT_MESSAGE_TYPE_EVENT: - event = bt_message_event_borrow_event_const(msg); - BT_ASSERT(event); - cs_state = bt_event_borrow_default_clock_snapshot_const(event, - &clock_snapshot); + BT_ASSERT(bt_message_event_borrow_stream_class_default_clock_class_const( + msg)); + cs_state = bt_message_event_borrow_default_clock_snapshot_const( + msg, &clock_snapshot); + break; + case BT_MESSAGE_TYPE_PACKET_BEGINNING: + BT_ASSERT(bt_message_packet_beginning_borrow_stream_class_default_clock_class_const( + msg)); + cs_state = bt_message_packet_beginning_borrow_default_clock_snapshot_const( + msg, &clock_snapshot); + break; + case BT_MESSAGE_TYPE_PACKET_END: + BT_ASSERT(bt_message_packet_end_borrow_stream_class_default_clock_class_const( + msg)); + cs_state = bt_message_packet_end_borrow_default_clock_snapshot_const( + msg, &clock_snapshot); + break; + case BT_MESSAGE_TYPE_DISCARDED_EVENTS: + BT_ASSERT(bt_message_discarded_events_borrow_stream_class_default_clock_class_const( + msg)); + cs_state = bt_message_discarded_events_borrow_default_beginning_clock_snapshot_const( + msg, &clock_snapshot); break; + case BT_MESSAGE_TYPE_DISCARDED_PACKETS: + BT_ASSERT(bt_message_discarded_packets_borrow_stream_class_default_clock_class_const( + msg)); + cs_state = bt_message_discarded_packets_borrow_default_beginning_clock_snapshot_const( + msg, &clock_snapshot); + break; + case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING: + BT_ASSERT(bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const( + msg)); + sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const( + msg, &clock_snapshot); + if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) { + goto no_clock_snapshot; + } + + break; + case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END: + BT_ASSERT(bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const( + msg)); + sa_cs_state = bt_message_stream_activity_end_borrow_default_clock_snapshot_const( + msg, &clock_snapshot); + if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) { + goto no_clock_snapshot; + } - case BT_MESSAGE_TYPE_INACTIVITY: + break; + case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY: cs_state = - bt_message_inactivity_borrow_default_clock_snapshot_const( + bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const( msg, &clock_snapshot); break; default: @@ -638,25 +584,46 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp, goto end; } - if (cs_state != BT_CLOCK_SNAPSHOT_STATE_KNOWN) { - BT_LOGE_STR("Unsupported unknown clock snapshot."); - ret = -1; - goto end; + BT_ASSERT(cs_state == BT_CLOCK_SNAPSHOT_STATE_KNOWN); + ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns); + if (ret) { + BT_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: " + "clock-snapshot-addr=%p", clock_snapshot); + goto error; } - /* - * If the clock snapshot is missing, then we consider that this - * message has no time. In this case it's always the - * youngest. - */ - if (!clock_snapshot) { - BT_LOGV_STR("Message's default clock snapshot is missing: " - "using the last returned timestamp."); - *ts_ns = last_returned_ts_ns; - goto end; + goto end; + +no_clock_snapshot: + BT_LOGV_STR("Message's default clock snapshot is missing: " + "using the last returned timestamp."); + *ts_ns = last_returned_ts_ns; + goto end; + +error: + ret = -1; + +end: + if (ret == 0) { + BT_LOGV("Found message's timestamp: " + "muxer-msg-iter-addr=%p, msg-addr=%p, " + "last-returned-ts=%" PRId64 ", ts=%" PRId64, + muxer_msg_iter, msg, last_returned_ts_ns, + *ts_ns); } - clock_class = bt_clock_snapshot_borrow_clock_class_const(clock_snapshot); + return ret; +} + +static inline +int validate_clock_class(struct muxer_msg_iter *muxer_msg_iter, + struct muxer_comp *muxer_comp, + const bt_clock_class *clock_class) +{ + int ret = 0; + const unsigned char *cc_uuid; + const char *cc_name; + BT_ASSERT(clock_class); cc_uuid = bt_clock_class_get_uuid(clock_class); cc_name = bt_clock_class_get_name(clock_class); @@ -670,7 +637,7 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp, * the iterator without a true * `assume-absolute-clock-classes` parameter. */ - if (bt_clock_class_is_absolute(clock_class)) { + if (bt_clock_class_origin_is_unix_epoch(clock_class)) { /* Expect absolute clock classes */ muxer_msg_iter->clock_class_expectation = MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE; @@ -698,7 +665,7 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp, if (!muxer_comp->assume_absolute_clock_classes) { switch (muxer_msg_iter->clock_class_expectation) { case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE: - if (!bt_clock_class_is_absolute(clock_class)) { + if (!bt_clock_class_origin_is_unix_epoch(clock_class)) { BT_LOGE("Expecting an absolute clock class, " "but got a non-absolute one: " "clock-class-addr=%p, clock-class-name=\"%s\"", @@ -707,7 +674,7 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp, } break; case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID: - if (bt_clock_class_is_absolute(clock_class)) { + if (bt_clock_class_origin_is_unix_epoch(clock_class)) { BT_LOGE("Expecting a non-absolute clock class with no UUID, " "but got an absolute one: " "clock-class-addr=%p, clock-class-name=\"%s\"", @@ -741,7 +708,7 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp, } break; case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID: - if (bt_clock_class_is_absolute(clock_class)) { + if (bt_clock_class_origin_is_unix_epoch(clock_class)) { BT_LOGE("Expecting a non-absolute clock class with a specific UUID, " "but got an absolute one: " "clock-class-addr=%p, clock-class-name=\"%s\"", @@ -800,6 +767,11 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp, goto error; } break; + case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE: + BT_LOGE("Expecting no clock class, but got one: " + "clock-class-addr=%p, clock-class-name=\"%s\"", + clock_class, cc_name); + goto error; default: /* Unexpected */ BT_LOGF("Unexpected clock class expectation: " @@ -809,27 +781,56 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp, } } - ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns); - if (ret) { - BT_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: " - "clock-snapshot-addr=%p", clock_snapshot); - goto error; - } - goto end; error: ret = -1; end: - if (ret == 0) { - BT_LOGV("Found message's timestamp: " - "muxer-msg-iter-addr=%p, msg-addr=%p, " - "last-returned-ts=%" PRId64 ", ts=%" PRId64, - muxer_msg_iter, msg, last_returned_ts_ns, - *ts_ns); + return ret; +} + +static inline +int validate_new_stream_clock_class(struct muxer_msg_iter *muxer_msg_iter, + struct muxer_comp *muxer_comp, const bt_stream *stream) +{ + int ret = 0; + const bt_stream_class *stream_class = + bt_stream_borrow_class_const(stream); + const bt_clock_class *clock_class = + bt_stream_class_borrow_default_clock_class_const(stream_class); + + if (!clock_class) { + if (muxer_msg_iter->clock_class_expectation == + MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY) { + /* Expect no clock class */ + muxer_msg_iter->clock_class_expectation = + MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE; + } else { + BT_LOGE("Expecting stream class with a default clock class: " + "stream-class-addr=%p, stream-class-name=\"%s\", " + "stream-class-id=%" PRIu64, + stream_class, bt_stream_class_get_name(stream_class), + bt_stream_class_get_id(stream_class)); + ret = -1; + } + + goto end; + } + + if (!bt_stream_class_default_clock_is_always_known(stream_class)) { + BT_LOGE("Stream's default clock is not always known: " + "stream-class-addr=%p, stream-class-name=\"%s\", " + "stream-class-id=%" PRIu64, + stream_class, bt_stream_class_get_name(stream_class), + bt_stream_class_get_id(stream_class)); + ret = -1; + goto end; } + ret = validate_clock_class(muxer_msg_iter, muxer_comp, clock_class); + +end: return ret; } @@ -843,7 +844,6 @@ end: * This function does NOT: * * * Update any upstream message iterator. - * * Check for newly connected ports. * * Check the upstream message iterators to retry. * * On sucess, this function sets *muxer_upstream_msg_iter to the @@ -851,7 +851,7 @@ end: * the youngest, and sets *ts_ns to its time. */ static -bt_message_iterator_status +bt_self_message_iterator_status muxer_msg_iter_youngest_upstream_msg_iter( struct muxer_comp *muxer_comp, struct muxer_msg_iter *muxer_msg_iter, @@ -861,18 +861,21 @@ muxer_msg_iter_youngest_upstream_msg_iter( size_t i; int ret; int64_t youngest_ts_ns = INT64_MAX; - bt_message_iterator_status status = - BT_MESSAGE_ITERATOR_STATUS_OK; + bt_self_message_iterator_status status = + BT_SELF_MESSAGE_ITERATOR_STATUS_OK; BT_ASSERT(muxer_comp); BT_ASSERT(muxer_msg_iter); BT_ASSERT(muxer_upstream_msg_iter); *muxer_upstream_msg_iter = NULL; - for (i = 0; i < muxer_msg_iter->muxer_upstream_msg_iters->len; i++) { + for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len; + i++) { const bt_message *msg; struct muxer_upstream_msg_iter *cur_muxer_upstream_msg_iter = - g_ptr_array_index(muxer_msg_iter->muxer_upstream_msg_iters, i); + g_ptr_array_index( + muxer_msg_iter->active_muxer_upstream_msg_iters, + i); int64_t msg_ts_ns; if (!cur_muxer_upstream_msg_iter->msg_iter) { @@ -886,12 +889,53 @@ muxer_msg_iter_youngest_upstream_msg_iter( BT_ASSERT(cur_muxer_upstream_msg_iter->msgs->length > 0); msg = g_queue_peek_head(cur_muxer_upstream_msg_iter->msgs); BT_ASSERT(msg); + + if (unlikely(bt_message_get_type(msg) == + BT_MESSAGE_TYPE_STREAM_BEGINNING)) { + ret = validate_new_stream_clock_class( + muxer_msg_iter, muxer_comp, + bt_message_stream_beginning_borrow_stream_const( + msg)); + if (ret) { + /* + * validate_new_stream_clock_class() logs + * errors. + */ + status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; + goto end; + } + } else if (unlikely(bt_message_get_type(msg) == + BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY)) { + const bt_clock_snapshot *cs; + bt_clock_snapshot_state cs_state; + + cs_state = bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const( + msg, &cs); + + if (cs_state != BT_CLOCK_SNAPSHOT_STATE_KNOWN) { + BT_LOGE("Message iterator inactivity message's " + "default clock snapshot is unknown: " + "msg-addr=%p", + msg); + status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; + goto end; + } + + ret = validate_clock_class(muxer_msg_iter, muxer_comp, + bt_clock_snapshot_borrow_clock_class_const(cs)); + if (ret) { + /* validate_clock_class() logs errors */ + status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; + goto end; + } + } + ret = get_msg_ts_ns(muxer_comp, muxer_msg_iter, msg, muxer_msg_iter->last_returned_ts_ns, &msg_ts_ns); if (ret) { /* get_msg_ts_ns() logs errors */ *muxer_upstream_msg_iter = NULL; - status = BT_MESSAGE_ITERATOR_STATUS_ERROR; + status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; goto end; } @@ -904,7 +948,7 @@ muxer_msg_iter_youngest_upstream_msg_iter( } if (!*muxer_upstream_msg_iter) { - status = BT_MESSAGE_ITERATOR_STATUS_END; + status = BT_SELF_MESSAGE_ITERATOR_STATUS_END; *ts_ns = INT64_MIN; } @@ -913,11 +957,12 @@ end: } static -bt_message_iterator_status validate_muxer_upstream_msg_iter( - struct muxer_upstream_msg_iter *muxer_upstream_msg_iter) +bt_self_message_iterator_status validate_muxer_upstream_msg_iter( + struct muxer_upstream_msg_iter *muxer_upstream_msg_iter, + bool *is_ended) { - bt_message_iterator_status status = - BT_MESSAGE_ITERATOR_STATUS_OK; + bt_self_message_iterator_status status = + BT_SELF_MESSAGE_ITERATOR_STATUS_OK; BT_LOGV("Validating muxer's upstream message iterator wrapper: " "muxer-upstream-msg-iter-wrap-addr=%p", @@ -933,32 +978,35 @@ bt_message_iterator_status validate_muxer_upstream_msg_iter( } /* muxer_upstream_msg_iter_next() logs details/errors */ - status = muxer_upstream_msg_iter_next(muxer_upstream_msg_iter); + status = muxer_upstream_msg_iter_next(muxer_upstream_msg_iter, + is_ended); end: return status; } static -bt_message_iterator_status validate_muxer_upstream_msg_iters( +bt_self_message_iterator_status validate_muxer_upstream_msg_iters( struct muxer_msg_iter *muxer_msg_iter) { - bt_message_iterator_status status = - BT_MESSAGE_ITERATOR_STATUS_OK; + bt_self_message_iterator_status status = + BT_SELF_MESSAGE_ITERATOR_STATUS_OK; size_t i; BT_LOGV("Validating muxer's upstream message iterator wrappers: " "muxer-msg-iter-addr=%p", muxer_msg_iter); - for (i = 0; i < muxer_msg_iter->muxer_upstream_msg_iters->len; i++) { + for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len; + i++) { + bool is_ended = false; struct muxer_upstream_msg_iter *muxer_upstream_msg_iter = g_ptr_array_index( - muxer_msg_iter->muxer_upstream_msg_iters, + muxer_msg_iter->active_muxer_upstream_msg_iters, i); status = validate_muxer_upstream_msg_iter( - muxer_upstream_msg_iter); - if (status != BT_MESSAGE_ITERATOR_STATUS_OK) { + muxer_upstream_msg_iter, &is_ended); + if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) { if (status < 0) { BT_LOGE("Cannot validate muxer's upstream message iterator wrapper: " "muxer-msg-iter-addr=%p, " @@ -977,20 +1025,25 @@ bt_message_iterator_status validate_muxer_upstream_msg_iters( } /* - * Remove this muxer upstream message iterator - * if it's ended or canceled. + * Move this muxer upstream message iterator to the + * array of ended iterators if it's ended. */ - if (!muxer_upstream_msg_iter->msg_iter) { + if (unlikely(is_ended)) { + BT_LOGV("Muxer's upstream message iterator wrapper: ended or canceled: " + "muxer-msg-iter-addr=%p, " + "muxer-upstream-msg-iter-wrap-addr=%p", + muxer_msg_iter, muxer_upstream_msg_iter); + g_ptr_array_add( + muxer_msg_iter->ended_muxer_upstream_msg_iters, + muxer_upstream_msg_iter); + muxer_msg_iter->active_muxer_upstream_msg_iters->pdata[i] = NULL; + /* * Use g_ptr_array_remove_fast() because the * order of those elements is not important. */ - BT_LOGV("Removing muxer's upstream message iterator wrapper: ended or canceled: " - "muxer-msg-iter-addr=%p, " - "muxer-upstream-msg-iter-wrap-addr=%p", - muxer_msg_iter, muxer_upstream_msg_iter); g_ptr_array_remove_index_fast( - muxer_msg_iter->muxer_upstream_msg_iters, + muxer_msg_iter->active_muxer_upstream_msg_iters, i); i--; } @@ -1001,52 +1054,21 @@ end: } static inline -bt_message_iterator_status muxer_msg_iter_do_next_one( +bt_self_message_iterator_status muxer_msg_iter_do_next_one( struct muxer_comp *muxer_comp, struct muxer_msg_iter *muxer_msg_iter, const bt_message **msg) { - bt_message_iterator_status status = - BT_MESSAGE_ITERATOR_STATUS_OK; + bt_self_message_iterator_status status; struct muxer_upstream_msg_iter *muxer_upstream_msg_iter = NULL; int64_t next_return_ts; - while (true) { - int ret = muxer_msg_iter_handle_newly_connected_ports( - muxer_msg_iter); - - if (ret) { - BT_LOGE("Cannot handle newly connected input ports for muxer's message iterator: " - "muxer-comp-addr=%p, muxer-msg-iter-addr=%p, " - "ret=%d", - muxer_comp, muxer_msg_iter, ret); - status = BT_MESSAGE_ITERATOR_STATUS_ERROR; - goto end; - } - - status = validate_muxer_upstream_msg_iters(muxer_msg_iter); - if (status != BT_MESSAGE_ITERATOR_STATUS_OK) { - /* validate_muxer_upstream_msg_iters() logs details */ - goto end; - } - - /* - * At this point, we know that all the existing upstream - * message iterators are valid. However the - * operations to validate them (during - * validate_muxer_upstream_msg_iters()) may have - * connected new ports. If no ports were connected - * during this operation, exit the loop. - */ - if (!muxer_msg_iter->newly_connected_self_ports) { - BT_LOGV("Not breaking this loop: muxer's message iterator still has newly connected input ports to handle: " - "muxer-comp-addr=%p", muxer_comp); - break; - } + status = validate_muxer_upstream_msg_iters(muxer_msg_iter); + if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) { + /* validate_muxer_upstream_msg_iters() logs details */ + goto end; } - BT_ASSERT(!muxer_msg_iter->newly_connected_self_ports); - /* * At this point we know that all the existing upstream * message iterators are valid. We can find the one, @@ -1056,16 +1078,15 @@ bt_message_iterator_status muxer_msg_iter_do_next_one( status = muxer_msg_iter_youngest_upstream_msg_iter(muxer_comp, muxer_msg_iter, &muxer_upstream_msg_iter, &next_return_ts); - if (status < 0 || status == BT_MESSAGE_ITERATOR_STATUS_END || - status == BT_MESSAGE_ITERATOR_STATUS_CANCELED) { + if (status < 0 || status == BT_SELF_MESSAGE_ITERATOR_STATUS_END) { if (status < 0) { BT_LOGE("Cannot find the youngest upstream message iterator wrapper: " "status=%s", - bt_message_iterator_status_string(status)); + bt_common_self_message_iterator_status_string(status)); } else { BT_LOGV("Cannot find the youngest upstream message iterator wrapper: " "status=%s", - bt_message_iterator_status_string(status)); + bt_common_self_message_iterator_status_string(status)); } goto end; @@ -1077,7 +1098,7 @@ bt_message_iterator_status muxer_msg_iter_do_next_one( "last-returned-ts=%" PRId64, muxer_msg_iter, next_return_ts, muxer_msg_iter->last_returned_ts_ns); - status = BT_MESSAGE_ITERATOR_STATUS_ERROR; + status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; goto end; } @@ -1086,7 +1107,7 @@ bt_message_iterator_status muxer_msg_iter_do_next_one( "muxer-upstream-msg-iter-wrap-addr=%p, " "ts=%" PRId64, muxer_msg_iter, muxer_upstream_msg_iter, next_return_ts); - BT_ASSERT(status == BT_MESSAGE_ITERATOR_STATUS_OK); + BT_ASSERT(status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK); BT_ASSERT(muxer_upstream_msg_iter); /* @@ -1102,20 +1123,20 @@ end: } static -bt_message_iterator_status muxer_msg_iter_do_next( +bt_self_message_iterator_status muxer_msg_iter_do_next( struct muxer_comp *muxer_comp, struct muxer_msg_iter *muxer_msg_iter, bt_message_array_const msgs, uint64_t capacity, uint64_t *count) { - bt_message_iterator_status status = - BT_MESSAGE_ITERATOR_STATUS_OK; + bt_self_message_iterator_status status = + BT_SELF_MESSAGE_ITERATOR_STATUS_OK; uint64_t i = 0; - while (i < capacity && status == BT_MESSAGE_ITERATOR_STATUS_OK) { + while (i < capacity && status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK) { status = muxer_msg_iter_do_next_one(muxer_comp, muxer_msg_iter, &msgs[i]); - if (status == BT_MESSAGE_ITERATOR_STATUS_OK) { + if (status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK) { i++; } } @@ -1134,7 +1155,7 @@ bt_message_iterator_status muxer_msg_iter_do_next( * message, in which case we'll return it. */ *count = i; - status = BT_MESSAGE_ITERATOR_STATUS_OK; + status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK; } return status; @@ -1150,29 +1171,29 @@ void destroy_muxer_msg_iter(struct muxer_msg_iter *muxer_msg_iter) BT_LOGD("Destroying muxer component's message iterator: " "muxer-msg-iter-addr=%p", muxer_msg_iter); - if (muxer_msg_iter->muxer_upstream_msg_iters) { - BT_LOGD_STR("Destroying muxer's upstream message iterator wrappers."); + if (muxer_msg_iter->active_muxer_upstream_msg_iters) { + BT_LOGD_STR("Destroying muxer's active upstream message iterator wrappers."); g_ptr_array_free( - muxer_msg_iter->muxer_upstream_msg_iters, TRUE); + muxer_msg_iter->active_muxer_upstream_msg_iters, TRUE); + } + + if (muxer_msg_iter->ended_muxer_upstream_msg_iters) { + BT_LOGD_STR("Destroying muxer's ended upstream message iterator wrappers."); + g_ptr_array_free( + muxer_msg_iter->ended_muxer_upstream_msg_iters, TRUE); } - g_list_free(muxer_msg_iter->newly_connected_self_ports); g_free(muxer_msg_iter); } static -int muxer_msg_iter_init_newly_connected_ports(struct muxer_comp *muxer_comp, +int muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp, struct muxer_msg_iter *muxer_msg_iter) { int64_t count; int64_t i; int ret = 0; - /* - * Add the connected input ports to this muxer message - * iterator's list of newly connected ports. They will be - * handled by muxer_msg_iter_handle_newly_connected_ports(). - */ count = bt_component_filter_get_input_port_count( bt_self_component_filter_as_component_filter( muxer_comp->self_comp)); @@ -1184,6 +1205,8 @@ int muxer_msg_iter_init_newly_connected_ports(struct muxer_comp *muxer_comp, } for (i = 0; i < count; i++) { + bt_self_component_port_input_message_iterator *upstream_msg_iter; + struct muxer_upstream_msg_iter *muxer_upstream_msg_iter; bt_self_component_port_input *self_port = bt_self_component_filter_borrow_input_port_by_index( muxer_comp->self_comp, i); @@ -1196,29 +1219,28 @@ int muxer_msg_iter_init_newly_connected_ports(struct muxer_comp *muxer_comp, BT_ASSERT(port); if (!bt_port_is_connected(port)) { - BT_LOGD("Skipping input port: not connected: " - "muxer-comp-addr=%p, port-addr=%p, port-name\"%s\"", - muxer_comp, port, bt_port_get_name(port)); + /* Skip non-connected port */ continue; } - muxer_msg_iter->newly_connected_self_ports = - g_list_append( - muxer_msg_iter->newly_connected_self_ports, - self_port); - if (!muxer_msg_iter->newly_connected_self_ports) { - BT_LOGE("Cannot append port to muxer's message iterator list of newly connected input ports: " - "port-addr=%p, port-name=\"%s\", " - "muxer-msg-iter-addr=%p", port, - bt_port_get_name(port), muxer_msg_iter); + upstream_msg_iter = create_msg_iter_on_input_port(self_port); + if (!upstream_msg_iter) { + /* create_msg_iter_on_input_port() logs errors */ + BT_ASSERT(!upstream_msg_iter); ret = -1; goto end; } - BT_LOGD("Appended port to muxer's message iterator list of newly connected input ports: " - "port-addr=%p, port-name=\"%s\", " - "muxer-msg-iter-addr=%p", port, - bt_port_get_name(port), muxer_msg_iter); + muxer_upstream_msg_iter = + muxer_msg_iter_add_upstream_msg_iter( + muxer_msg_iter, upstream_msg_iter); + bt_self_component_port_input_message_iterator_put_ref( + upstream_msg_iter); + if (!muxer_upstream_msg_iter) { + /* muxer_msg_iter_add_upstream_msg_iter() logs errors */ + ret = -1; + goto end; + } } end: @@ -1234,7 +1256,7 @@ bt_self_message_iterator_status muxer_msg_iter_init( struct muxer_comp *muxer_comp = NULL; struct muxer_msg_iter *muxer_msg_iter = NULL; bt_self_message_iterator_status status = - BT_MESSAGE_ITERATOR_STATUS_OK; + BT_SELF_MESSAGE_ITERATOR_STATUS_OK; int ret; muxer_comp = bt_self_component_get_data( @@ -1264,27 +1286,26 @@ bt_self_message_iterator_status muxer_msg_iter_init( } muxer_msg_iter->last_returned_ts_ns = INT64_MIN; - muxer_msg_iter->muxer_upstream_msg_iters = + muxer_msg_iter->active_muxer_upstream_msg_iters = g_ptr_array_new_with_free_func( (GDestroyNotify) destroy_muxer_upstream_msg_iter); - if (!muxer_msg_iter->muxer_upstream_msg_iters) { + if (!muxer_msg_iter->active_muxer_upstream_msg_iters) { BT_LOGE_STR("Failed to allocate a GPtrArray."); goto error; } - /* - * Add the muxer message iterator to the component's array - * of muxer message iterators here because - * muxer_msg_iter_init_newly_connected_ports() can cause - * muxer_port_connected() to be called, which adds the newly - * connected port to each muxer message iterator's list of - * newly connected ports. - */ - g_ptr_array_add(muxer_comp->muxer_msg_iters, muxer_msg_iter); - ret = muxer_msg_iter_init_newly_connected_ports(muxer_comp, + muxer_msg_iter->ended_muxer_upstream_msg_iters = + g_ptr_array_new_with_free_func( + (GDestroyNotify) destroy_muxer_upstream_msg_iter); + if (!muxer_msg_iter->ended_muxer_upstream_msg_iters) { + BT_LOGE_STR("Failed to allocate a GPtrArray."); + goto error; + } + + ret = muxer_msg_iter_init_upstream_iterators(muxer_comp, muxer_msg_iter); if (ret) { - BT_LOGE("Cannot initialize newly connected input ports for muxer component's message iterator: " + BT_LOGE("Cannot initialize connected input ports for muxer component's message iterator: " "comp-addr=%p, muxer-comp-addr=%p, " "muxer-msg-iter-addr=%p, msg-iter-addr=%p, ret=%d", self_comp, muxer_comp, muxer_msg_iter, @@ -1292,8 +1313,7 @@ bt_self_message_iterator_status muxer_msg_iter_init( goto error; } - bt_self_message_iterator_set_data(self_msg_iter, - muxer_msg_iter); + bt_self_message_iterator_set_data(self_msg_iter, muxer_msg_iter); BT_LOGD("Initialized muxer component's message iterator: " "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, " "msg-iter-addr=%p", @@ -1301,16 +1321,9 @@ bt_self_message_iterator_status muxer_msg_iter_init( goto end; error: - if (g_ptr_array_index(muxer_comp->muxer_msg_iters, - muxer_comp->muxer_msg_iters->len - 1) == muxer_msg_iter) { - g_ptr_array_remove_index(muxer_comp->muxer_msg_iters, - muxer_comp->muxer_msg_iters->len - 1); - } - destroy_muxer_msg_iter(muxer_msg_iter); - bt_self_message_iterator_set_data(self_msg_iter, - NULL); - status = BT_MESSAGE_ITERATOR_STATUS_ERROR; + bt_self_message_iterator_set_data(self_msg_iter, NULL); + status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; end: muxer_comp->initializing_muxer_msg_iter = false; @@ -1318,8 +1331,7 @@ end: } BT_HIDDEN -void muxer_msg_iter_finalize( - bt_self_message_iterator *self_msg_iter) +void muxer_msg_iter_finalize(bt_self_message_iterator *self_msg_iter) { struct muxer_msg_iter *muxer_msg_iter = bt_self_message_iterator_get_data(self_msg_iter); @@ -1335,20 +1347,18 @@ void muxer_msg_iter_finalize( "msg-iter-addr=%p", self_comp, muxer_comp, muxer_msg_iter, self_msg_iter); - if (muxer_comp) { - (void) g_ptr_array_remove_fast(muxer_comp->muxer_msg_iters, - muxer_msg_iter); + if (muxer_msg_iter) { destroy_muxer_msg_iter(muxer_msg_iter); } } BT_HIDDEN -bt_message_iterator_status muxer_msg_iter_next( +bt_self_message_iterator_status muxer_msg_iter_next( bt_self_message_iterator *self_msg_iter, bt_message_array_const msgs, uint64_t capacity, uint64_t *count) { - bt_message_iterator_status status; + bt_self_message_iterator_status status; struct muxer_msg_iter *muxer_msg_iter = bt_self_message_iterator_get_data(self_msg_iter); bt_self_component *self_comp = NULL; @@ -1372,11 +1382,11 @@ bt_message_iterator_status muxer_msg_iter_next( "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, " "msg-iter-addr=%p, status=%s", self_comp, muxer_comp, muxer_msg_iter, self_msg_iter, - bt_message_iterator_status_string(status)); + bt_common_self_message_iterator_status_string(status)); } else { BT_LOGV("Returning from muxer component's message iterator's \"next\" method: " "status=%s", - bt_message_iterator_status_string(status)); + bt_common_self_message_iterator_status_string(status)); } return status; @@ -1388,98 +1398,126 @@ bt_self_component_status muxer_input_port_connected( bt_self_component_port_input *self_port, const bt_port_output *other_port) { - bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK; - const bt_port *port = bt_self_component_port_as_port( - bt_self_component_port_input_as_self_component_port( - self_port)); - struct muxer_comp *muxer_comp = - bt_self_component_get_data( - bt_self_component_filter_as_self_component( - self_comp)); - size_t i; - int ret; - - BT_ASSERT(port); - BT_ASSERT(muxer_comp); - BT_LOGD("Port connected: " - "comp-addr=%p, muxer-comp-addr=%p, " - "port-addr=%p, port-name=\"%s\", " - "other-port-addr=%p, other-port-name=\"%s\"", - self_comp, muxer_comp, self_port, bt_port_get_name(port), - other_port, - bt_port_get_name(bt_port_output_as_port_const(other_port))); - - for (i = 0; i < muxer_comp->muxer_msg_iters->len; i++) { - struct muxer_msg_iter *muxer_msg_iter = - g_ptr_array_index(muxer_comp->muxer_msg_iters, i); + bt_self_component_status status; + status = add_available_input_port(self_comp); + if (status) { /* - * Add this port to the list of newly connected ports - * for this muxer message iterator. We append at - * the end of this list while - * muxer_msg_iter_handle_newly_connected_ports() - * removes the nodes from the beginning. + * Only way to report an error later since this + * method does not return anything. */ - muxer_msg_iter->newly_connected_self_ports = - g_list_append( - muxer_msg_iter->newly_connected_self_ports, - self_port); - if (!muxer_msg_iter->newly_connected_self_ports) { - BT_LOGE("Cannot append port to muxer's message iterator list of newly connected input ports: " - "port-addr=%p, port-name=\"%s\", " - "muxer-msg-iter-addr=%p", self_port, - bt_port_get_name(port), muxer_msg_iter); - status = BT_SELF_COMPONENT_STATUS_ERROR; + BT_LOGE("Cannot add one muxer component's input port: " + "status=%s", + bt_self_component_status_string(status)); + goto end; + } + +end: + return status; +} + +static inline +bt_bool muxer_upstream_msg_iters_can_all_seek_beginning( + GPtrArray *muxer_upstream_msg_iters) +{ + uint64_t i; + bt_bool ret = BT_TRUE; + + for (i = 0; i < muxer_upstream_msg_iters->len; i++) { + struct muxer_upstream_msg_iter *upstream_msg_iter = + muxer_upstream_msg_iters->pdata[i]; + + if (!bt_self_component_port_input_message_iterator_can_seek_beginning( + upstream_msg_iter->msg_iter)) { + ret = BT_FALSE; goto end; } + } + +end: + return ret; +} + +BT_HIDDEN +bt_bool muxer_msg_iter_can_seek_beginning( + bt_self_message_iterator *self_msg_iter) +{ + struct muxer_msg_iter *muxer_msg_iter = + bt_self_message_iterator_get_data(self_msg_iter); + bt_bool ret = BT_TRUE; - BT_LOGD("Appended port to muxer's message iterator list of newly connected input ports: " - "port-addr=%p, port-name=\"%s\", " - "muxer-msg-iter-addr=%p", self_port, - bt_port_get_name(port), muxer_msg_iter); + if (!muxer_upstream_msg_iters_can_all_seek_beginning( + muxer_msg_iter->active_muxer_upstream_msg_iters)) { + ret = BT_FALSE; + goto end; } - /* One less available input port */ - muxer_comp->available_input_ports--; - ret = ensure_available_input_port(self_comp); - if (ret) { - /* - * Only way to report an error later since this - * method does not return anything. - */ - BT_LOGE("Cannot ensure that at least one muxer component's input port is available: " - "muxer-comp-addr=%p, status=%s", - muxer_comp, bt_self_component_status_string(ret)); - status = BT_SELF_COMPONENT_STATUS_ERROR; + if (!muxer_upstream_msg_iters_can_all_seek_beginning( + muxer_msg_iter->ended_muxer_upstream_msg_iters)) { + ret = BT_FALSE; goto end; } end: - return status; + return ret; } BT_HIDDEN -void muxer_input_port_disconnected( - bt_self_component_filter *self_component, - bt_self_component_port_input *self_port) +bt_self_message_iterator_status muxer_msg_iter_seek_beginning( + bt_self_message_iterator *self_msg_iter) { - struct muxer_comp *muxer_comp = - bt_self_component_get_data( - bt_self_component_filter_as_self_component( - self_component)); - const bt_port *port = - bt_self_component_port_as_port( - bt_self_component_port_input_as_self_component_port( - self_port)); + struct muxer_msg_iter *muxer_msg_iter = + bt_self_message_iterator_get_data(self_msg_iter); + bt_message_iterator_status status = BT_MESSAGE_ITERATOR_STATUS_OK; + uint64_t i; - BT_ASSERT(port); - BT_ASSERT(muxer_comp); + /* Seek all ended upstream iterators first */ + for (i = 0; i < muxer_msg_iter->ended_muxer_upstream_msg_iters->len; + i++) { + struct muxer_upstream_msg_iter *upstream_msg_iter = + muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i]; - /* One more available input port */ - muxer_comp->available_input_ports++; - BT_LOGD("Leaving disconnected input port available for future connections: " - "comp-addr=%p, muxer-comp-addr=%p, port-addr=%p, " - "port-name=\"%s\", avail-input-port-count=%zu", - self_component, muxer_comp, port, bt_port_get_name(port), - muxer_comp->available_input_ports); + status = bt_self_component_port_input_message_iterator_seek_beginning( + upstream_msg_iter->msg_iter); + if (status != BT_MESSAGE_ITERATOR_STATUS_OK) { + goto end; + } + + empty_message_queue(upstream_msg_iter); + } + + /* Seek all previously active upstream iterators */ + for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len; + i++) { + struct muxer_upstream_msg_iter *upstream_msg_iter = + muxer_msg_iter->active_muxer_upstream_msg_iters->pdata[i]; + + status = bt_self_component_port_input_message_iterator_seek_beginning( + upstream_msg_iter->msg_iter); + if (status != BT_MESSAGE_ITERATOR_STATUS_OK) { + goto end; + } + + empty_message_queue(upstream_msg_iter); + } + + /* Make them all active */ + for (i = 0; i < muxer_msg_iter->ended_muxer_upstream_msg_iters->len; + i++) { + struct muxer_upstream_msg_iter *upstream_msg_iter = + muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i]; + + g_ptr_array_add(muxer_msg_iter->active_muxer_upstream_msg_iters, + upstream_msg_iter); + muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i] = NULL; + } + + g_ptr_array_remove_range(muxer_msg_iter->ended_muxer_upstream_msg_iters, + 0, muxer_msg_iter->ended_muxer_upstream_msg_iters->len); + muxer_msg_iter->last_returned_ts_ns = INT64_MIN; + muxer_msg_iter->clock_class_expectation = + MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY; + +end: + return (bt_self_message_iterator_status) status; }