From 30799132cd92de929a90ae6e366bfe5032cfd241 Mon Sep 17 00:00:00 2001 From: Simon Marchi Date: Sun, 22 May 2022 15:46:36 -0400 Subject: [PATCH] flt.utils.muxer: replace queue with array While profiling, I noticed "a lot" of time spent in g_queue* functions, in the context of the muxer. Using a dummy output and an LTTng kernel trace as input, 2.7% of the time was spent in g_queue_pop_head and 1.92% in g_queue_push_tail. Under those, most of the time was spent in memory allocation functions. For something fast path like the muxer's message queues, I think we would benefit on reducing the number of allocation/deallocations. To improve that, replace the queue with a GPtrArray. Incoming messages are put in this array, and the index of the next message to be returned is kept in a separate field. When a message from that queue is returned, return the message at that index. Write NULL at that index (the ownership of the message is transferred from the queue) and increment the next message index. The queue is considered empty when the next message index is equal to the array length. At this point, all entries in the array are expected to be NULL, and more messages need to be obtained from the upstream message iterator. The array is resized (g_ptr_array_set_size) to the size of the new message batch. In practice, the message count is always the same, so the size of the array won't change, and g_ptr_array_set_size just does trivial work. Performance results I get locally: Before: $ time ./src/cli/babeltrace2 /home/simark/lttng-traces/auto-20180226-075238 -o dummy ./src/cli/babeltrace2 /home/simark/lttng-traces/auto-20180226-075238 -o dummy 3.77s user 0.02s system 99% cpu 3.791 total $ time ./src/cli/babeltrace2 /home/simark/lttng-traces/auto-20180226-075238 -o dummy ./src/cli/babeltrace2 /home/simark/lttng-traces/auto-20180226-075238 -o dummy 3.78s user 0.03s system 99% cpu 3.822 total After: $ time ./src/cli/babeltrace2 /home/simark/lttng-traces/auto-20180226-075238 -o dummy ./src/cli/babeltrace2 /home/simark/lttng-traces/auto-20180226-075238 -o dummy 3.52s user 0.06s system 99% cpu 3.577 total $ time ./src/cli/babeltrace2 /home/simark/lttng-traces/auto-20180226-075238 -o dummy ./src/cli/babeltrace2 /home/simark/lttng-traces/auto-20180226-075238 -o dummy 3.52s user 0.04s system 99% cpu 3.563 total This is with Babeltrace configured with: --enable-python-bindings --enable-python-plugins --disable-man-pages 'CFLAGS=-gdwarf-5 -g3 -O2' 'CXXFLAGS=-gdwarf-5 -g3 -O2' --prefix=/tmp/babeltrace 'CC=ccache clang' 'CXX=ccache clang++' Change-Id: I0ce20994981be0479f0529880b4bdbca53a03fd6 Reviewed-on: https://review.lttng.org/c/babeltrace/+/8107 CI-Build: Simon Marchi Tested-by: jenkins Reviewed-by: Philippe Proulx --- src/plugins/utils/muxer/muxer.c | 58 ++++++++++++++++++++------------- 1 file changed, 36 insertions(+), 22 deletions(-) diff --git a/src/plugins/utils/muxer/muxer.c b/src/plugins/utils/muxer/muxer.c index f340f38d..fa049cf0 100644 --- a/src/plugins/utils/muxer/muxer.c +++ b/src/plugins/utils/muxer/muxer.c @@ -43,7 +43,10 @@ struct muxer_upstream_msg_iter { bt_message_iterator *msg_iter; /* Contains `const bt_message *`, owned by this */ - GQueue *msgs; + GPtrArray *msgs; + + /* Index of the next message in `msgs` to return */ + guint next_msg; }; enum muxer_msg_iter_clock_class_expectation { @@ -104,11 +107,7 @@ struct muxer_msg_iter { static void empty_message_queue(struct muxer_upstream_msg_iter *upstream_msg_iter) { - const bt_message *msg; - - while ((msg = g_queue_pop_head(upstream_msg_iter->msgs))) { - bt_message_put_ref(msg); - } + g_ptr_array_set_size(upstream_msg_iter->msgs, 0); } static @@ -123,16 +122,17 @@ void destroy_muxer_upstream_msg_iter( muxer_comp = muxer_upstream_msg_iter->muxer_comp; BT_COMP_LOGD("Destroying muxer's upstream message iterator wrapper: " - "addr=%p, msg-iter-addr=%p, queue-len=%u", + "addr=%p, msg-iter-addr=%p, queue-len=%u, next-msg=%u", muxer_upstream_msg_iter, muxer_upstream_msg_iter->msg_iter, - muxer_upstream_msg_iter->msgs->length); + muxer_upstream_msg_iter->msgs->len, + muxer_upstream_msg_iter->next_msg); + bt_message_iterator_put_ref( muxer_upstream_msg_iter->msg_iter); if (muxer_upstream_msg_iter->msgs) { - empty_message_queue(muxer_upstream_msg_iter); - g_queue_free(muxer_upstream_msg_iter->msgs); + g_ptr_array_free(muxer_upstream_msg_iter->msgs, TRUE); } g_free(muxer_upstream_msg_iter); @@ -156,10 +156,11 @@ int muxer_msg_iter_add_upstream_msg_iter(struct muxer_msg_iter *muxer_msg_iter, muxer_upstream_msg_iter->muxer_comp = muxer_comp; muxer_upstream_msg_iter->msg_iter = self_msg_iter; bt_message_iterator_get_ref(muxer_upstream_msg_iter->msg_iter); - muxer_upstream_msg_iter->msgs = g_queue_new(); + muxer_upstream_msg_iter->msgs = + g_ptr_array_new_with_free_func((GDestroyNotify) bt_message_put_ref); if (!muxer_upstream_msg_iter->msgs) { BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, - "Failed to allocate a GQueue."); + "Failed to allocate a GPtrArray."); goto error; } @@ -411,6 +412,9 @@ bt_message_iterator_class_next_method_status muxer_upstream_msg_iter_next( BT_COMP_LOGD_STR("Validated upstream message iterator wrapper."); BT_ASSERT_DBG(count > 0); + g_ptr_array_set_size(muxer_upstream_msg_iter->msgs, count); + muxer_upstream_msg_iter->next_msg = 0; + /* Move messages to our queue */ for (i = 0; i < count; i++) { /* @@ -418,8 +422,8 @@ bt_message_iterator_class_next_method_status muxer_upstream_msg_iter_next( * (muxer_msg_iter_do_next_one()) consumes * from the head first. */ - g_queue_push_tail(muxer_upstream_msg_iter->msgs, - (void *) msgs[i]); + g_ptr_array_index(muxer_upstream_msg_iter->msgs, i) + = (gpointer *) msgs[i]; } status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK; break; @@ -833,8 +837,10 @@ muxer_msg_iter_youngest_upstream_msg_iter( continue; } - BT_ASSERT_DBG(cur_muxer_upstream_msg_iter->msgs->length > 0); - msg = g_queue_peek_head(cur_muxer_upstream_msg_iter->msgs); + BT_ASSERT_DBG(cur_muxer_upstream_msg_iter->next_msg < + cur_muxer_upstream_msg_iter->msgs->len); + msg = g_ptr_array_index(cur_muxer_upstream_msg_iter->msgs, + cur_muxer_upstream_msg_iter->next_msg); BT_ASSERT_DBG(msg); if (G_UNLIKELY(bt_message_get_type(msg) == @@ -893,8 +899,11 @@ muxer_msg_iter_youngest_upstream_msg_iter( * current candidate message. We must break the tie * in a predictable manner. */ - const bt_message *selected_msg = g_queue_peek_head( - (*muxer_upstream_msg_iter)->msgs); + BT_ASSERT_DBG((*muxer_upstream_msg_iter)->next_msg < + (*muxer_upstream_msg_iter)->msgs->len); + const bt_message *selected_msg = + g_ptr_array_index((*muxer_upstream_msg_iter)->msgs, + (*muxer_upstream_msg_iter)->next_msg); BT_COMP_LOGD_STR("Two of the next message candidates have the same timestamps, pick one deterministically."); /* @@ -944,11 +953,12 @@ validate_muxer_upstream_msg_iter( "muxer-upstream-msg-iter-wrap-addr=%p", muxer_upstream_msg_iter); - if (muxer_upstream_msg_iter->msgs->length > 0 || + if (muxer_upstream_msg_iter->next_msg < muxer_upstream_msg_iter->msgs->len || !muxer_upstream_msg_iter->msg_iter) { BT_COMP_LOGD("Already valid or not considered: " - "queue-len=%u, upstream-msg-iter-addr=%p", - muxer_upstream_msg_iter->msgs->length, + "queue-len=%u, next-msg=%u, upstream-msg-iter-addr=%p", + muxer_upstream_msg_iter->msgs->len, + muxer_upstream_msg_iter->next_msg, muxer_upstream_msg_iter->msg_iter); status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK; goto end; @@ -1099,7 +1109,11 @@ bt_message_iterator_class_next_method_status muxer_msg_iter_do_next_one( * Consume from the queue's head: other side * (muxer_upstream_msg_iter_next()) writes to the tail. */ - *msg = g_queue_pop_head(muxer_upstream_msg_iter->msgs); + *msg = g_ptr_array_index(muxer_upstream_msg_iter->msgs, + muxer_upstream_msg_iter->next_msg); + g_ptr_array_index(muxer_upstream_msg_iter->msgs, + muxer_upstream_msg_iter->next_msg) = NULL; + ++muxer_upstream_msg_iter->next_msg; BT_ASSERT_DBG(*msg); muxer_msg_iter->last_returned_ts_ns = next_return_ts; -- 2.34.1