X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=plugins%2Futils%2Fmuxer%2Fmuxer.c;h=47d790b3d9a14e59387e7d611c984077680d608e;hb=6c20f4a0c496cfac67ceabe876edc796929ffb64;hp=aa5cbb23645c1bef5522e5c2cfc0da25b478962d;hpb=5badd463e184894a3bfd5b8db257efc6f92c6374;p=babeltrace.git

diff --git a/plugins/utils/muxer/muxer.c b/plugins/utils/muxer/muxer.c
index aa5cbb23..47d790b3 100644
--- a/plugins/utils/muxer/muxer.c
+++ b/plugins/utils/muxer/muxer.c
@@ -63,6 +63,7 @@ struct muxer_upstream_msg_iter {

 enum muxer_msg_iter_clock_class_expectation {
 	MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY = 0,
+	MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE,
 	MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE,
 	MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID,
 	MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID,
@@ -77,7 +78,16 @@ struct muxer_msg_iter {
 	 * another data structure is faster than this for our typical
 	 * use cases.
 	 */
-	GPtrArray *muxer_upstream_msg_iters;
+	GPtrArray *active_muxer_upstream_msg_iters;
+
+	/*
+	 * Array of struct muxer_upstream_msg_iter * (owned by this).
+	 *
+	 * We move ended message iterators from
+	 * `active_muxer_upstream_msg_iters` to this array so as to be
+	 * able to restore them when seeking.
+	 */
+	GPtrArray *ended_muxer_upstream_msg_iters;

 	/* Last time returned in a message */
 	int64_t last_returned_ts_ns;
@@ -93,6 +103,16 @@ struct muxer_msg_iter {
 	unsigned char expected_clock_class_uuid[BABELTRACE_UUID_LEN];
 };

+static
+void empty_message_queue(struct muxer_upstream_msg_iter *upstream_msg_iter)
+{
+	const bt_message *msg;
+
+	while ((msg = g_queue_pop_head(upstream_msg_iter->msgs))) {
+		bt_message_put_ref(msg);
+	}
+}
+
 static
 void destroy_muxer_upstream_msg_iter(
 		struct muxer_upstream_msg_iter *muxer_upstream_msg_iter)
@@ -106,16 +126,11 @@ void destroy_muxer_upstream_msg_iter(
 		muxer_upstream_msg_iter,
 		muxer_upstream_msg_iter->msg_iter,
 		muxer_upstream_msg_iter->msgs->length);
-	bt_self_component_port_input_message_iterator_put_ref(muxer_upstream_msg_iter->msg_iter);
+	bt_self_component_port_input_message_iterator_put_ref(
+		muxer_upstream_msg_iter->msg_iter);

 	if (muxer_upstream_msg_iter->msgs) {
-		const bt_message *msg;
-
-		while ((msg = g_queue_pop_head(
-				muxer_upstream_msg_iter->msgs))) {
-			bt_message_put_ref(msg);
-		}
-
+		empty_message_queue(muxer_upstream_msg_iter);
 		g_queue_free(muxer_upstream_msg_iter->msgs);
 	}

@@ -132,7 +147,7 @@ struct muxer_upstream_msg_iter *muxer_msg_iter_add_upstream_msg_iter(

 	if (!muxer_upstream_msg_iter) {
 		BT_LOGE_STR("Failed to allocate one muxer's upstream message iterator wrapper.");
-		goto end;
+		goto error;
 	}

 	muxer_upstream_msg_iter->msg_iter = self_msg_iter;
@@ -140,16 +155,22 @@ struct muxer_upstream_msg_iter *muxer_msg_iter_add_upstream_msg_iter(
 	muxer_upstream_msg_iter->msgs = g_queue_new();
 	if (!muxer_upstream_msg_iter->msgs) {
 		BT_LOGE_STR("Failed to allocate a GQueue.");
-		goto end;
+		goto error;
 	}

-	g_ptr_array_add(muxer_msg_iter->muxer_upstream_msg_iters,
+	g_ptr_array_add(muxer_msg_iter->active_muxer_upstream_msg_iters,
 		muxer_upstream_msg_iter);
 	BT_LOGD("Added muxer's upstream message iterator wrapper: "
 		"addr=%p, muxer-msg-iter-addr=%p, msg-iter-addr=%p",
 		muxer_upstream_msg_iter, muxer_msg_iter, self_msg_iter);
+	goto end;
+
+error:
+	g_free(muxer_upstream_msg_iter);
+	muxer_upstream_msg_iter = NULL;
+
 end:
 	return muxer_upstream_msg_iter;
 }
@@ -409,7 +430,8 @@ end:

 static
 bt_self_message_iterator_status muxer_upstream_msg_iter_next(
-		struct muxer_upstream_msg_iter *muxer_upstream_msg_iter)
+		struct muxer_upstream_msg_iter *muxer_upstream_msg_iter,
+		bool *is_ended)
 {
 	bt_self_message_iterator_status status;
 	bt_message_iterator_status input_port_iter_status;
@@ -461,7 +483,7 @@ bt_self_message_iterator_status muxer_upstream_msg_iter_next(
 		 * won't be considered again to find the youngest
 		 * message.
 		 */
-		BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(muxer_upstream_msg_iter->msg_iter);
+		*is_ended = true;
 		status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
 		break;
 	default:
@@ -481,81 +503,58 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp,
 		const bt_message *msg, int64_t last_returned_ts_ns,
 		int64_t *ts_ns)
 {
-	const bt_clock_class *clock_class = NULL;
 	const bt_clock_snapshot *clock_snapshot = NULL;
 	int ret = 0;
-	const unsigned char *cc_uuid;
-	const char *cc_name;
 	bt_clock_snapshot_state cs_state = BT_CLOCK_SNAPSHOT_STATE_KNOWN;
 	bt_message_stream_activity_clock_snapshot_state sa_cs_state;

 	BT_ASSERT(msg);
 	BT_ASSERT(ts_ns);
-
 	BT_LOGV("Getting message's timestamp: "
 		"muxer-msg-iter-addr=%p, msg-addr=%p, "
 		"last-returned-ts=%" PRId64,
 		muxer_msg_iter, msg, last_returned_ts_ns);
+	if (unlikely(muxer_msg_iter->clock_class_expectation ==
+			MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE)) {
+		*ts_ns = last_returned_ts_ns;
+		goto end;
+	}
+
 	switch (bt_message_get_type(msg)) {
 	case BT_MESSAGE_TYPE_EVENT:
-		clock_class =
-			bt_message_event_borrow_stream_class_default_clock_class_const(
-				msg);
-		if (!clock_class) {
-			goto no_clock_snapshot;
-		}
-
+		BT_ASSERT(bt_message_event_borrow_stream_class_default_clock_class_const(
+			msg));
 		cs_state = bt_message_event_borrow_default_clock_snapshot_const(
 			msg, &clock_snapshot);
 		break;
 	case BT_MESSAGE_TYPE_PACKET_BEGINNING:
-		bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
-			msg);
-		if (!clock_class) {
-			goto no_clock_snapshot;
-		}
-
+		BT_ASSERT(bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
+			msg));
 		cs_state = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
 			msg, &clock_snapshot);
 		break;
 	case BT_MESSAGE_TYPE_PACKET_END:
-		bt_message_packet_end_borrow_stream_class_default_clock_class_const(
-			msg);
-		if (!clock_class) {
-			goto no_clock_snapshot;
-		}
-
+		BT_ASSERT(bt_message_packet_end_borrow_stream_class_default_clock_class_const(
+			msg));
 		cs_state = bt_message_packet_end_borrow_default_clock_snapshot_const(
 			msg, &clock_snapshot);
 		break;
 	case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
-		bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
-			msg);
-		if (!clock_class) {
-			goto no_clock_snapshot;
-		}
-
+		BT_ASSERT(bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
+			msg));
 		cs_state = bt_message_discarded_events_borrow_default_beginning_clock_snapshot_const(
 			msg, &clock_snapshot);
 		break;
 	case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
-		bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
-			msg);
-		if (!clock_class) {
-			goto no_clock_snapshot;
-		}
-
+		BT_ASSERT(bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
+			msg));
 		cs_state = bt_message_discarded_packets_borrow_default_beginning_clock_snapshot_const(
 			msg, &clock_snapshot);
 		break;
 	case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
-		bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const(
-			msg);
-		if (!clock_class) {
-			goto no_clock_snapshot;
-		}
-
+		BT_ASSERT(bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const(
+			msg));
 		sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const(
 			msg, &clock_snapshot);
 		if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
@@ -564,12 +563,8 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp,

 		break;
 	case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
-		bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const(
-			msg);
-		if (!clock_class) {
-			goto no_clock_snapshot;
-		}
-
+		BT_ASSERT(bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const(
+			msg));
 		sa_cs_state = bt_message_stream_activity_end_borrow_default_clock_snapshot_const(
 			msg, &clock_snapshot);
 		if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
@@ -577,9 +572,9 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp,
 		}

 		break;
-	case BT_MESSAGE_TYPE_INACTIVITY:
+	case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
 		cs_state =
-			bt_message_inactivity_borrow_default_clock_snapshot_const(
+			bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
 				msg, &clock_snapshot);
 		break;
 	default:
@@ -589,13 +584,46 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp,
 		goto end;
 	}

-	if (cs_state != BT_CLOCK_SNAPSHOT_STATE_KNOWN) {
-		BT_LOGE_STR("Unsupported unknown clock snapshot.");
-		ret = -1;
-		goto end;
+	BT_ASSERT(cs_state == BT_CLOCK_SNAPSHOT_STATE_KNOWN);
+	ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
+	if (ret) {
+		BT_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: "
+			"clock-snapshot-addr=%p", clock_snapshot);
+		goto error;
+	}
+
+	goto end;
+
+no_clock_snapshot:
+	BT_LOGV_STR("Message's default clock snapshot is missing: "
+		"using the last returned timestamp.");
+	*ts_ns = last_returned_ts_ns;
+	goto end;
+
+error:
+	ret = -1;
+
+end:
+	if (ret == 0) {
+		BT_LOGV("Found message's timestamp: "
+			"muxer-msg-iter-addr=%p, msg-addr=%p, "
+			"last-returned-ts=%" PRId64 ", ts=%" PRId64,
+			muxer_msg_iter, msg, last_returned_ts_ns,
+			*ts_ns);
 	}

-	clock_class = bt_clock_snapshot_borrow_clock_class_const(clock_snapshot);
+	return ret;
+}
+
+static inline
+int validate_clock_class(struct muxer_msg_iter *muxer_msg_iter,
+		struct muxer_comp *muxer_comp,
+		const bt_clock_class *clock_class)
+{
+	int ret = 0;
+	const unsigned char *cc_uuid;
+	const char *cc_name;
+
 	BT_ASSERT(clock_class);
 	cc_uuid = bt_clock_class_get_uuid(clock_class);
 	cc_name = bt_clock_class_get_name(clock_class);
@@ -739,6 +767,11 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp,
 			goto error;
 		}
 		break;
+	case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE:
+		BT_LOGE("Expecting no clock class, but got one: "
+			"clock-class-addr=%p, clock-class-name=\"%s\"",
+			clock_class, cc_name);
+		goto error;
 	default:
 		/* Unexpected */
 		BT_LOGF("Unexpected clock class expectation: "
@@ -748,33 +781,56 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp,
 		}
 	}

-	ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
-	if (ret) {
-		BT_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: "
-			"clock-snapshot-addr=%p", clock_snapshot);
-		goto error;
-	}
-
-	goto end;
-
-no_clock_snapshot:
-	BT_LOGV_STR("Message's default clock snapshot is missing: "
-		"using the last returned timestamp.");
-	*ts_ns = last_returned_ts_ns;
 	goto end;

 error:
 	ret = -1;

 end:
-	if (ret == 0) {
-		BT_LOGV("Found message's timestamp: "
-			"muxer-msg-iter-addr=%p, msg-addr=%p, "
-			"last-returned-ts=%" PRId64 ", ts=%" PRId64,
-			muxer_msg_iter, msg, last_returned_ts_ns,
-			*ts_ns);
+	return ret;
+}
+
+static inline
+int validate_new_stream_clock_class(struct muxer_msg_iter *muxer_msg_iter,
+		struct muxer_comp *muxer_comp, const bt_stream *stream)
+{
+	int ret = 0;
+	const bt_stream_class *stream_class =
+		bt_stream_borrow_class_const(stream);
+	const bt_clock_class *clock_class =
+		bt_stream_class_borrow_default_clock_class_const(stream_class);
+
+	if (!clock_class) {
+		if (muxer_msg_iter->clock_class_expectation ==
+				MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY) {
+			/* Expect no clock class */
+			muxer_msg_iter->clock_class_expectation =
+				MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE;
+		} else {
+			BT_LOGE("Expecting stream class with a default clock class: "
+				"stream-class-addr=%p, stream-class-name=\"%s\", "
+				"stream-class-id=%" PRIu64,
+				stream_class, bt_stream_class_get_name(stream_class),
+				bt_stream_class_get_id(stream_class));
+			ret = -1;
+		}
+
+		goto end;
+	}
+
+	if (!bt_stream_class_default_clock_is_always_known(stream_class)) {
+		BT_LOGE("Stream's default clock is not always known: "
+			"stream-class-addr=%p, stream-class-name=\"%s\", "
+			"stream-class-id=%" PRIu64,
+			stream_class, bt_stream_class_get_name(stream_class),
+			bt_stream_class_get_id(stream_class));
+		ret = -1;
+		goto end;
 	}

+	ret = validate_clock_class(muxer_msg_iter, muxer_comp, clock_class);
+
+end:
 	return ret;
 }

@@ -813,10 +869,13 @@ muxer_msg_iter_youngest_upstream_msg_iter(
 	BT_ASSERT(muxer_upstream_msg_iter);
 	*muxer_upstream_msg_iter = NULL;

-	for (i = 0; i < muxer_msg_iter->muxer_upstream_msg_iters->len; i++) {
+	for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len;
+			i++) {
 		const bt_message *msg;
 		struct muxer_upstream_msg_iter *cur_muxer_upstream_msg_iter =
-			g_ptr_array_index(muxer_msg_iter->muxer_upstream_msg_iters, i);
+			g_ptr_array_index(
+				muxer_msg_iter->active_muxer_upstream_msg_iters,
+				i);
 		int64_t msg_ts_ns;

 		if (!cur_muxer_upstream_msg_iter->msg_iter) {
@@ -830,6 +889,47 @@ muxer_msg_iter_youngest_upstream_msg_iter(
 		BT_ASSERT(cur_muxer_upstream_msg_iter->msgs->length > 0);
 		msg = g_queue_peek_head(cur_muxer_upstream_msg_iter->msgs);
 		BT_ASSERT(msg);
+
+		if (unlikely(bt_message_get_type(msg) ==
+				BT_MESSAGE_TYPE_STREAM_BEGINNING)) {
+			ret = validate_new_stream_clock_class(
+				muxer_msg_iter, muxer_comp,
+				bt_message_stream_beginning_borrow_stream_const(
+					msg));
+			if (ret) {
+				/*
+				 * validate_new_stream_clock_class() logs
+				 * errors.
+				 */
+				status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+				goto end;
+			}
+		} else if (unlikely(bt_message_get_type(msg) ==
+				BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY)) {
+			const bt_clock_snapshot *cs;
+			bt_clock_snapshot_state cs_state;
+
+			cs_state = bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
+				msg, &cs);
+
+			if (cs_state != BT_CLOCK_SNAPSHOT_STATE_KNOWN) {
+				BT_LOGE("Message iterator inactivity message's "
+					"default clock snapshot is unknown: "
+					"msg-addr=%p",
+					msg);
+				status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+				goto end;
+			}
+
+			ret = validate_clock_class(muxer_msg_iter, muxer_comp,
+				bt_clock_snapshot_borrow_clock_class_const(cs));
+			if (ret) {
+				/* validate_clock_class() logs errors */
+				status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+				goto end;
+			}
+		}
+
 		ret = get_msg_ts_ns(muxer_comp, muxer_msg_iter, msg,
 			muxer_msg_iter->last_returned_ts_ns, &msg_ts_ns);
 		if (ret) {
@@ -858,7 +958,8 @@ end:

 static
 bt_self_message_iterator_status validate_muxer_upstream_msg_iter(
-		struct muxer_upstream_msg_iter *muxer_upstream_msg_iter)
+		struct muxer_upstream_msg_iter *muxer_upstream_msg_iter,
+		bool *is_ended)
 {
 	bt_self_message_iterator_status status =
 		BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
@@ -877,7 +978,8 @@ bt_self_message_iterator_status validate_muxer_upstream_msg_iter(
 	}

 	/* muxer_upstream_msg_iter_next() logs details/errors */
-	status = muxer_upstream_msg_iter_next(muxer_upstream_msg_iter);
+	status = muxer_upstream_msg_iter_next(muxer_upstream_msg_iter,
+		is_ended);

 end:
 	return status;
@@ -894,14 +996,16 @@ bt_self_message_iterator_status validate_muxer_upstream_msg_iters(
 	BT_LOGV("Validating muxer's upstream message iterator wrappers: "
 		"muxer-msg-iter-addr=%p", muxer_msg_iter);

-	for (i = 0; i < muxer_msg_iter->muxer_upstream_msg_iters->len; i++) {
+	for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len;
+			i++) {
+		bool is_ended = false;
 		struct muxer_upstream_msg_iter *muxer_upstream_msg_iter =
 			g_ptr_array_index(
-				muxer_msg_iter->muxer_upstream_msg_iters,
+				muxer_msg_iter->active_muxer_upstream_msg_iters,
 				i);

 		status = validate_muxer_upstream_msg_iter(
-			muxer_upstream_msg_iter);
+			muxer_upstream_msg_iter, &is_ended);
 		if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
 			if (status < 0) {
 				BT_LOGE("Cannot validate muxer's upstream message iterator wrapper: "
@@ -921,20 +1025,25 @@ bt_self_message_iterator_status validate_muxer_upstream_msg_iters(
 		}

 		/*
-		 * Remove this muxer upstream message iterator
-		 * if it's ended or canceled.
+		 * Move this muxer upstream message iterator to the
+		 * array of ended iterators if it's ended.
 		 */
-		if (!muxer_upstream_msg_iter->msg_iter) {
+		if (unlikely(is_ended)) {
+			BT_LOGV("Muxer's upstream message iterator wrapper: ended or canceled: "
+				"muxer-msg-iter-addr=%p, "
+				"muxer-upstream-msg-iter-wrap-addr=%p",
+				muxer_msg_iter, muxer_upstream_msg_iter);
+			g_ptr_array_add(
+				muxer_msg_iter->ended_muxer_upstream_msg_iters,
+				muxer_upstream_msg_iter);
+			muxer_msg_iter->active_muxer_upstream_msg_iters->pdata[i] = NULL;
+
 			/*
 			 * Use g_ptr_array_remove_fast() because the
 			 * order of those elements is not important.
 			 */
-			BT_LOGV("Removing muxer's upstream message iterator wrapper: ended or canceled: "
-				"muxer-msg-iter-addr=%p, "
-				"muxer-upstream-msg-iter-wrap-addr=%p",
-				muxer_msg_iter, muxer_upstream_msg_iter);
 			g_ptr_array_remove_index_fast(
-				muxer_msg_iter->muxer_upstream_msg_iters,
+				muxer_msg_iter->active_muxer_upstream_msg_iters,
 				i);
 			i--;
 		}
@@ -973,11 +1082,11 @@ bt_self_message_iterator_status muxer_msg_iter_do_next_one(
 		if (status < 0) {
 			BT_LOGE("Cannot find the youngest upstream message iterator wrapper: "
 				"status=%s",
-				bt_self_message_iterator_status_string(status));
+				bt_common_self_message_iterator_status_string(status));
 		} else {
 			BT_LOGV("Cannot find the youngest upstream message iterator wrapper: "
 				"status=%s",
-				bt_self_message_iterator_status_string(status));
+				bt_common_self_message_iterator_status_string(status));
 		}

 		goto end;
@@ -1062,10 +1171,16 @@ void destroy_muxer_msg_iter(struct muxer_msg_iter *muxer_msg_iter)
 	BT_LOGD("Destroying muxer component's message iterator: "
 		"muxer-msg-iter-addr=%p", muxer_msg_iter);

-	if (muxer_msg_iter->muxer_upstream_msg_iters) {
-		BT_LOGD_STR("Destroying muxer's upstream message iterator wrappers.");
+	if (muxer_msg_iter->active_muxer_upstream_msg_iters) {
+		BT_LOGD_STR("Destroying muxer's active upstream message iterator wrappers.");
 		g_ptr_array_free(
-			muxer_msg_iter->muxer_upstream_msg_iters, TRUE);
+			muxer_msg_iter->active_muxer_upstream_msg_iters, TRUE);
+	}
+
+	if (muxer_msg_iter->ended_muxer_upstream_msg_iters) {
+		BT_LOGD_STR("Destroying muxer's ended upstream message iterator wrappers.");
+		g_ptr_array_free(
+			muxer_msg_iter->ended_muxer_upstream_msg_iters, TRUE);
 	}

 	g_free(muxer_msg_iter);
@@ -1171,10 +1286,18 @@ bt_self_message_iterator_status muxer_msg_iter_init(
 	}

 	muxer_msg_iter->last_returned_ts_ns = INT64_MIN;
-	muxer_msg_iter->muxer_upstream_msg_iters =
+	muxer_msg_iter->active_muxer_upstream_msg_iters =
+		g_ptr_array_new_with_free_func(
+			(GDestroyNotify) destroy_muxer_upstream_msg_iter);
+	if (!muxer_msg_iter->active_muxer_upstream_msg_iters) {
+		BT_LOGE_STR("Failed to allocate a GPtrArray.");
+		goto error;
+	}
+
+	muxer_msg_iter->ended_muxer_upstream_msg_iters =
 		g_ptr_array_new_with_free_func(
 			(GDestroyNotify) destroy_muxer_upstream_msg_iter);
-	if (!muxer_msg_iter->muxer_upstream_msg_iters) {
+	if (!muxer_msg_iter->ended_muxer_upstream_msg_iters) {
 		BT_LOGE_STR("Failed to allocate a GPtrArray.");
 		goto error;
 	}
@@ -1208,8 +1331,7 @@ end:
 }

 BT_HIDDEN
-void muxer_msg_iter_finalize(
-		bt_self_message_iterator *self_msg_iter)
+void muxer_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
 {
 	struct muxer_msg_iter *muxer_msg_iter =
 		bt_self_message_iterator_get_data(self_msg_iter);
@@ -1260,11 +1382,11 @@ bt_self_message_iterator_status muxer_msg_iter_next(
 			"comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, "
 			"msg-iter-addr=%p, status=%s",
 			self_comp, muxer_comp, muxer_msg_iter, self_msg_iter,
-			bt_self_message_iterator_status_string(status));
+			bt_common_self_message_iterator_status_string(status));
 	} else {
 		BT_LOGV("Returning from muxer component's message iterator's \"next\" method: "
 			"status=%s",
-			bt_self_message_iterator_status_string(status));
+			bt_common_self_message_iterator_status_string(status));
 	}

 	return status;
@@ -1294,18 +1416,16 @@ end:
 	return status;
 }

-BT_HIDDEN
-bt_bool muxer_msg_iter_can_seek_beginning(
-		bt_self_message_iterator *self_msg_iter)
+static inline
+bt_bool muxer_upstream_msg_iters_can_all_seek_beginning(
+		GPtrArray *muxer_upstream_msg_iters)
 {
-	struct muxer_msg_iter *muxer_msg_iter =
-		bt_self_message_iterator_get_data(self_msg_iter);
 	uint64_t i;
 	bt_bool ret = BT_TRUE;

-	for (i = 0; i < muxer_msg_iter->muxer_upstream_msg_iters->len; i++) {
+	for (i = 0; i < muxer_upstream_msg_iters->len; i++) {
 		struct muxer_upstream_msg_iter *upstream_msg_iter =
-			muxer_msg_iter->muxer_upstream_msg_iters->pdata[i];
+			muxer_upstream_msg_iters->pdata[i];

 		if (!bt_self_component_port_input_message_iterator_can_seek_beginning(
 				upstream_msg_iter->msg_iter)) {
@@ -1318,28 +1438,86 @@ end:
 	return ret;
 }

+BT_HIDDEN
+bt_bool muxer_msg_iter_can_seek_beginning(
+		bt_self_message_iterator *self_msg_iter)
+{
+	struct muxer_msg_iter *muxer_msg_iter =
+		bt_self_message_iterator_get_data(self_msg_iter);
+	bt_bool ret = BT_TRUE;
+
+	if (!muxer_upstream_msg_iters_can_all_seek_beginning(
+			muxer_msg_iter->active_muxer_upstream_msg_iters)) {
+		ret = BT_FALSE;
+		goto end;
+	}
+
+	if (!muxer_upstream_msg_iters_can_all_seek_beginning(
+			muxer_msg_iter->ended_muxer_upstream_msg_iters)) {
+		ret = BT_FALSE;
+		goto end;
+	}
+
+end:
+	return ret;
+}
+
 BT_HIDDEN
 bt_self_message_iterator_status muxer_msg_iter_seek_beginning(
 		bt_self_message_iterator *self_msg_iter)
 {
 	struct muxer_msg_iter *muxer_msg_iter =
 		bt_self_message_iterator_get_data(self_msg_iter);
-	int status;
+	bt_message_iterator_status status = BT_MESSAGE_ITERATOR_STATUS_OK;
 	uint64_t i;

-	for (i = 0; i < muxer_msg_iter->muxer_upstream_msg_iters->len; i++) {
+	/* Seek all ended upstream iterators first */
+	for (i = 0; i < muxer_msg_iter->ended_muxer_upstream_msg_iters->len;
+			i++) {
+		struct muxer_upstream_msg_iter *upstream_msg_iter =
+			muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i];
+
+		status = bt_self_component_port_input_message_iterator_seek_beginning(
+			upstream_msg_iter->msg_iter);
+		if (status != BT_MESSAGE_ITERATOR_STATUS_OK) {
+			goto end;
+		}
+
+		empty_message_queue(upstream_msg_iter);
+	}
+
+	/* Seek all previously active upstream iterators */
+	for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len;
+			i++) {
 		struct muxer_upstream_msg_iter *upstream_msg_iter =
-			muxer_msg_iter->muxer_upstream_msg_iters->pdata[i];
+			muxer_msg_iter->active_muxer_upstream_msg_iters->pdata[i];

 		status = bt_self_component_port_input_message_iterator_seek_beginning(
 			upstream_msg_iter->msg_iter);
 		if (status != BT_MESSAGE_ITERATOR_STATUS_OK) {
 			goto end;
 		}
+
+		empty_message_queue(upstream_msg_iter);
 	}

+	/* Make them all active */
+	for (i = 0; i < muxer_msg_iter->ended_muxer_upstream_msg_iters->len;
+			i++) {
+		struct muxer_upstream_msg_iter *upstream_msg_iter =
+			muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i];
+
+		g_ptr_array_add(muxer_msg_iter->active_muxer_upstream_msg_iters,
+			upstream_msg_iter);
+		muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i] = NULL;
+	}
+
+	g_ptr_array_remove_range(muxer_msg_iter->ended_muxer_upstream_msg_iters,
+		0, muxer_msg_iter->ended_muxer_upstream_msg_iters->len);
 	muxer_msg_iter->last_returned_ts_ns = INT64_MIN;
+	muxer_msg_iter->clock_class_expectation =
+		MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY;

 end:
-	return status;
+	return (bt_self_message_iterator_status) status;
 }