X-Git-Url: http://git.efficios.com/?p=babeltrace.git;a=blobdiff_plain;f=src%2Fplugins%2Futils%2Fmuxer%2Fmuxer.c;h=fa049cf0dd939b65272dc1d02dc1ab1f82bfdb42;hp=f340f38d227a77592b7fe30474a0be0a0b73a928;hb=30799132cd92de929a90ae6e366bfe5032cfd241;hpb=088b0bbe10be6143d390eff198c459cc03333650 diff --git a/src/plugins/utils/muxer/muxer.c b/src/plugins/utils/muxer/muxer.c index f340f38d..fa049cf0 100644 --- a/src/plugins/utils/muxer/muxer.c +++ b/src/plugins/utils/muxer/muxer.c @@ -43,7 +43,10 @@ struct muxer_upstream_msg_iter { bt_message_iterator *msg_iter; /* Contains `const bt_message *`, owned by this */ - GQueue *msgs; + GPtrArray *msgs; + + /* Index of the next message in `msgs` to return */ + guint next_msg; }; enum muxer_msg_iter_clock_class_expectation { @@ -104,11 +107,7 @@ struct muxer_msg_iter { static void empty_message_queue(struct muxer_upstream_msg_iter *upstream_msg_iter) { - const bt_message *msg; - - while ((msg = g_queue_pop_head(upstream_msg_iter->msgs))) { - bt_message_put_ref(msg); - } + g_ptr_array_set_size(upstream_msg_iter->msgs, 0); } static @@ -123,16 +122,17 @@ void destroy_muxer_upstream_msg_iter( muxer_comp = muxer_upstream_msg_iter->muxer_comp; BT_COMP_LOGD("Destroying muxer's upstream message iterator wrapper: " - "addr=%p, msg-iter-addr=%p, queue-len=%u", + "addr=%p, msg-iter-addr=%p, queue-len=%u, next-msg=%u", muxer_upstream_msg_iter, muxer_upstream_msg_iter->msg_iter, - muxer_upstream_msg_iter->msgs->length); + muxer_upstream_msg_iter->msgs->len, + muxer_upstream_msg_iter->next_msg); + bt_message_iterator_put_ref( muxer_upstream_msg_iter->msg_iter); if (muxer_upstream_msg_iter->msgs) { - empty_message_queue(muxer_upstream_msg_iter); - g_queue_free(muxer_upstream_msg_iter->msgs); + g_ptr_array_free(muxer_upstream_msg_iter->msgs, TRUE); } g_free(muxer_upstream_msg_iter); @@ -156,10 +156,11 @@ int muxer_msg_iter_add_upstream_msg_iter(struct muxer_msg_iter *muxer_msg_iter, muxer_upstream_msg_iter->muxer_comp = muxer_comp; muxer_upstream_msg_iter->msg_iter = self_msg_iter; bt_message_iterator_get_ref(muxer_upstream_msg_iter->msg_iter); - muxer_upstream_msg_iter->msgs = g_queue_new(); + muxer_upstream_msg_iter->msgs = + g_ptr_array_new_with_free_func((GDestroyNotify) bt_message_put_ref); if (!muxer_upstream_msg_iter->msgs) { BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, - "Failed to allocate a GQueue."); + "Failed to allocate a GPtrArray."); goto error; } @@ -411,6 +412,9 @@ bt_message_iterator_class_next_method_status muxer_upstream_msg_iter_next( BT_COMP_LOGD_STR("Validated upstream message iterator wrapper."); BT_ASSERT_DBG(count > 0); + g_ptr_array_set_size(muxer_upstream_msg_iter->msgs, count); + muxer_upstream_msg_iter->next_msg = 0; + /* Move messages to our queue */ for (i = 0; i < count; i++) { /* @@ -418,8 +422,8 @@ bt_message_iterator_class_next_method_status muxer_upstream_msg_iter_next( * (muxer_msg_iter_do_next_one()) consumes * from the head first. */ - g_queue_push_tail(muxer_upstream_msg_iter->msgs, - (void *) msgs[i]); + g_ptr_array_index(muxer_upstream_msg_iter->msgs, i) + = (gpointer *) msgs[i]; } status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK; break; @@ -833,8 +837,10 @@ muxer_msg_iter_youngest_upstream_msg_iter( continue; } - BT_ASSERT_DBG(cur_muxer_upstream_msg_iter->msgs->length > 0); - msg = g_queue_peek_head(cur_muxer_upstream_msg_iter->msgs); + BT_ASSERT_DBG(cur_muxer_upstream_msg_iter->next_msg < + cur_muxer_upstream_msg_iter->msgs->len); + msg = g_ptr_array_index(cur_muxer_upstream_msg_iter->msgs, + cur_muxer_upstream_msg_iter->next_msg); BT_ASSERT_DBG(msg); if (G_UNLIKELY(bt_message_get_type(msg) == @@ -893,8 +899,11 @@ muxer_msg_iter_youngest_upstream_msg_iter( * current candidate message. We must break the tie * in a predictable manner. */ - const bt_message *selected_msg = g_queue_peek_head( - (*muxer_upstream_msg_iter)->msgs); + BT_ASSERT_DBG((*muxer_upstream_msg_iter)->next_msg < + (*muxer_upstream_msg_iter)->msgs->len); + const bt_message *selected_msg = + g_ptr_array_index((*muxer_upstream_msg_iter)->msgs, + (*muxer_upstream_msg_iter)->next_msg); BT_COMP_LOGD_STR("Two of the next message candidates have the same timestamps, pick one deterministically."); /* @@ -944,11 +953,12 @@ validate_muxer_upstream_msg_iter( "muxer-upstream-msg-iter-wrap-addr=%p", muxer_upstream_msg_iter); - if (muxer_upstream_msg_iter->msgs->length > 0 || + if (muxer_upstream_msg_iter->next_msg < muxer_upstream_msg_iter->msgs->len || !muxer_upstream_msg_iter->msg_iter) { BT_COMP_LOGD("Already valid or not considered: " - "queue-len=%u, upstream-msg-iter-addr=%p", - muxer_upstream_msg_iter->msgs->length, + "queue-len=%u, next-msg=%u, upstream-msg-iter-addr=%p", + muxer_upstream_msg_iter->msgs->len, + muxer_upstream_msg_iter->next_msg, muxer_upstream_msg_iter->msg_iter); status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK; goto end; @@ -1099,7 +1109,11 @@ bt_message_iterator_class_next_method_status muxer_msg_iter_do_next_one( * Consume from the queue's head: other side * (muxer_upstream_msg_iter_next()) writes to the tail. */ - *msg = g_queue_pop_head(muxer_upstream_msg_iter->msgs); + *msg = g_ptr_array_index(muxer_upstream_msg_iter->msgs, + muxer_upstream_msg_iter->next_msg); + g_ptr_array_index(muxer_upstream_msg_iter->msgs, + muxer_upstream_msg_iter->next_msg) = NULL; + ++muxer_upstream_msg_iter->next_msg; BT_ASSERT_DBG(*msg); muxer_msg_iter->last_returned_ts_ns = next_return_ts;