flt.utils.muxer: replace queue with array
authorSimon Marchi <simon.marchi@efficios.com>
Sun, 22 May 2022 19:46:36 +0000 (15:46 -0400)
committerPhilippe Proulx <eeppeliteloop@gmail.com>
Wed, 15 Jun 2022 18:15:50 +0000 (14:15 -0400)
While profiling, I noticed "a lot" of time spent in g_queue* functions,
in the context of the muxer.  Using a dummy output and an LTTng kernel
trace as input, 2.7% of the time was spent in g_queue_pop_head and
1.92% in g_queue_push_tail.  Under those, most of the time was spent in
memory allocation functions.

For something fast path like the muxer's message queues, I think we
would benefit on reducing the number of allocation/deallocations.

To improve that, replace the queue with a GPtrArray.  Incoming messages
are put in this array, and the index of the next message to be returned
is kept in a separate field.  When a message from that queue is
returned, return the message at that index.  Write NULL at that index
(the ownership of the message is transferred from the queue) and
increment the next message index.  The queue is considered empty when
the next message index is equal to the array length.  At this point, all
entries in the array are expected to be NULL, and more messages need to
be obtained from the upstream message iterator.  The array is resized
(g_ptr_array_set_size) to the size of the new message batch.  In
practice, the message count is always the same, so the size of the array
won't change, and g_ptr_array_set_size just does trivial work.

Performance results I get locally:

Before:

$ time ./src/cli/babeltrace2 /home/simark/lttng-traces/auto-20180226-075238 -o dummy
./src/cli/babeltrace2 /home/simark/lttng-traces/auto-20180226-075238 -o dummy  3.77s user 0.02s system 99% cpu 3.791 total
$ time ./src/cli/babeltrace2 /home/simark/lttng-traces/auto-20180226-075238 -o dummy
./src/cli/babeltrace2 /home/simark/lttng-traces/auto-20180226-075238 -o dummy  3.78s user 0.03s system 99% cpu 3.822 total

After:

$ time ./src/cli/babeltrace2 /home/simark/lttng-traces/auto-20180226-075238 -o dummy
./src/cli/babeltrace2 /home/simark/lttng-traces/auto-20180226-075238 -o dummy  3.52s user 0.06s system 99% cpu 3.577 total
$ time ./src/cli/babeltrace2 /home/simark/lttng-traces/auto-20180226-075238 -o dummy
./src/cli/babeltrace2 /home/simark/lttng-traces/auto-20180226-075238 -o dummy  3.52s user 0.04s system 99% cpu 3.563 total

This is with Babeltrace configured with:

    --enable-python-bindings --enable-python-plugins --disable-man-pages 'CFLAGS=-gdwarf-5 -g3 -O2' 'CXXFLAGS=-gdwarf-5 -g3 -O2' --prefix=/tmp/babeltrace 'CC=ccache clang' 'CXX=ccache clang++'

Change-Id: I0ce20994981be0479f0529880b4bdbca53a03fd6
Reviewed-on: https://review.lttng.org/c/babeltrace/+/8107
CI-Build: Simon Marchi <simon.marchi@efficios.com>
Tested-by: jenkins <jenkins@lttng.org>
Reviewed-by: Philippe Proulx <eeppeliteloop@gmail.com>
src/plugins/utils/muxer/muxer.c

index f340f38d227a77592b7fe30474a0be0a0b73a928..fa049cf0dd939b65272dc1d02dc1ab1f82bfdb42 100644 (file)
@@ -43,7 +43,10 @@ struct muxer_upstream_msg_iter {
        bt_message_iterator *msg_iter;
 
        /* Contains `const bt_message *`, owned by this */
        bt_message_iterator *msg_iter;
 
        /* Contains `const bt_message *`, owned by this */
-       GQueue *msgs;
+       GPtrArray *msgs;
+
+       /* Index of the next message in `msgs` to return */
+       guint next_msg;
 };
 
 enum muxer_msg_iter_clock_class_expectation {
 };
 
 enum muxer_msg_iter_clock_class_expectation {
@@ -104,11 +107,7 @@ struct muxer_msg_iter {
 static
 void empty_message_queue(struct muxer_upstream_msg_iter *upstream_msg_iter)
 {
 static
 void empty_message_queue(struct muxer_upstream_msg_iter *upstream_msg_iter)
 {
-       const bt_message *msg;
-
-       while ((msg = g_queue_pop_head(upstream_msg_iter->msgs))) {
-               bt_message_put_ref(msg);
-       }
+       g_ptr_array_set_size(upstream_msg_iter->msgs, 0);
 }
 
 static
 }
 
 static
@@ -123,16 +122,17 @@ void destroy_muxer_upstream_msg_iter(
 
        muxer_comp = muxer_upstream_msg_iter->muxer_comp;
        BT_COMP_LOGD("Destroying muxer's upstream message iterator wrapper: "
 
        muxer_comp = muxer_upstream_msg_iter->muxer_comp;
        BT_COMP_LOGD("Destroying muxer's upstream message iterator wrapper: "
-               "addr=%p, msg-iter-addr=%p, queue-len=%u",
+               "addr=%p, msg-iter-addr=%p, queue-len=%u, next-msg=%u",
                muxer_upstream_msg_iter,
                muxer_upstream_msg_iter->msg_iter,
                muxer_upstream_msg_iter,
                muxer_upstream_msg_iter->msg_iter,
-               muxer_upstream_msg_iter->msgs->length);
+               muxer_upstream_msg_iter->msgs->len,
+               muxer_upstream_msg_iter->next_msg);
+
        bt_message_iterator_put_ref(
                muxer_upstream_msg_iter->msg_iter);
 
        if (muxer_upstream_msg_iter->msgs) {
        bt_message_iterator_put_ref(
                muxer_upstream_msg_iter->msg_iter);
 
        if (muxer_upstream_msg_iter->msgs) {
-               empty_message_queue(muxer_upstream_msg_iter);
-               g_queue_free(muxer_upstream_msg_iter->msgs);
+               g_ptr_array_free(muxer_upstream_msg_iter->msgs, TRUE);
        }
 
        g_free(muxer_upstream_msg_iter);
        }
 
        g_free(muxer_upstream_msg_iter);
@@ -156,10 +156,11 @@ int muxer_msg_iter_add_upstream_msg_iter(struct muxer_msg_iter *muxer_msg_iter,
        muxer_upstream_msg_iter->muxer_comp = muxer_comp;
        muxer_upstream_msg_iter->msg_iter = self_msg_iter;
        bt_message_iterator_get_ref(muxer_upstream_msg_iter->msg_iter);
        muxer_upstream_msg_iter->muxer_comp = muxer_comp;
        muxer_upstream_msg_iter->msg_iter = self_msg_iter;
        bt_message_iterator_get_ref(muxer_upstream_msg_iter->msg_iter);
-       muxer_upstream_msg_iter->msgs = g_queue_new();
+       muxer_upstream_msg_iter->msgs =
+               g_ptr_array_new_with_free_func((GDestroyNotify) bt_message_put_ref);
        if (!muxer_upstream_msg_iter->msgs) {
                BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
        if (!muxer_upstream_msg_iter->msgs) {
                BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
-                       "Failed to allocate a GQueue.");
+                       "Failed to allocate a GPtrArray.");
                goto error;
        }
 
                goto error;
        }
 
@@ -411,6 +412,9 @@ bt_message_iterator_class_next_method_status muxer_upstream_msg_iter_next(
                BT_COMP_LOGD_STR("Validated upstream message iterator wrapper.");
                BT_ASSERT_DBG(count > 0);
 
                BT_COMP_LOGD_STR("Validated upstream message iterator wrapper.");
                BT_ASSERT_DBG(count > 0);
 
+               g_ptr_array_set_size(muxer_upstream_msg_iter->msgs, count);
+               muxer_upstream_msg_iter->next_msg = 0;
+
                /* Move messages to our queue */
                for (i = 0; i < count; i++) {
                        /*
                /* Move messages to our queue */
                for (i = 0; i < count; i++) {
                        /*
@@ -418,8 +422,8 @@ bt_message_iterator_class_next_method_status muxer_upstream_msg_iter_next(
                         * (muxer_msg_iter_do_next_one()) consumes
                         * from the head first.
                         */
                         * (muxer_msg_iter_do_next_one()) consumes
                         * from the head first.
                         */
-                       g_queue_push_tail(muxer_upstream_msg_iter->msgs,
-                               (void *) msgs[i]);
+                       g_ptr_array_index(muxer_upstream_msg_iter->msgs, i)
+                               = (gpointer *) msgs[i];
                }
                status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
                break;
                }
                status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
                break;
@@ -833,8 +837,10 @@ muxer_msg_iter_youngest_upstream_msg_iter(
                        continue;
                }
 
                        continue;
                }
 
-               BT_ASSERT_DBG(cur_muxer_upstream_msg_iter->msgs->length > 0);
-               msg = g_queue_peek_head(cur_muxer_upstream_msg_iter->msgs);
+               BT_ASSERT_DBG(cur_muxer_upstream_msg_iter->next_msg <
+                       cur_muxer_upstream_msg_iter->msgs->len);
+               msg = g_ptr_array_index(cur_muxer_upstream_msg_iter->msgs,
+                       cur_muxer_upstream_msg_iter->next_msg);
                BT_ASSERT_DBG(msg);
 
                if (G_UNLIKELY(bt_message_get_type(msg) ==
                BT_ASSERT_DBG(msg);
 
                if (G_UNLIKELY(bt_message_get_type(msg) ==
@@ -893,8 +899,11 @@ muxer_msg_iter_youngest_upstream_msg_iter(
                         * current candidate message. We must break the tie
                         * in a predictable manner.
                         */
                         * current candidate message. We must break the tie
                         * in a predictable manner.
                         */
-                       const bt_message *selected_msg = g_queue_peek_head(
-                               (*muxer_upstream_msg_iter)->msgs);
+                       BT_ASSERT_DBG((*muxer_upstream_msg_iter)->next_msg <
+                               (*muxer_upstream_msg_iter)->msgs->len);
+                       const bt_message *selected_msg =
+                               g_ptr_array_index((*muxer_upstream_msg_iter)->msgs,
+                                       (*muxer_upstream_msg_iter)->next_msg);
                        BT_COMP_LOGD_STR("Two of the next message candidates have the same timestamps, pick one deterministically.");
 
                        /*
                        BT_COMP_LOGD_STR("Two of the next message candidates have the same timestamps, pick one deterministically.");
 
                        /*
@@ -944,11 +953,12 @@ validate_muxer_upstream_msg_iter(
                "muxer-upstream-msg-iter-wrap-addr=%p",
                muxer_upstream_msg_iter);
 
                "muxer-upstream-msg-iter-wrap-addr=%p",
                muxer_upstream_msg_iter);
 
-       if (muxer_upstream_msg_iter->msgs->length > 0 ||
+       if (muxer_upstream_msg_iter->next_msg < muxer_upstream_msg_iter->msgs->len ||
                        !muxer_upstream_msg_iter->msg_iter) {
                BT_COMP_LOGD("Already valid or not considered: "
                        !muxer_upstream_msg_iter->msg_iter) {
                BT_COMP_LOGD("Already valid or not considered: "
-                       "queue-len=%u, upstream-msg-iter-addr=%p",
-                       muxer_upstream_msg_iter->msgs->length,
+                       "queue-len=%u, next-msg=%u, upstream-msg-iter-addr=%p",
+                       muxer_upstream_msg_iter->msgs->len,
+                       muxer_upstream_msg_iter->next_msg,
                        muxer_upstream_msg_iter->msg_iter);
                status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
                goto end;
                        muxer_upstream_msg_iter->msg_iter);
                status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
                goto end;
@@ -1099,7 +1109,11 @@ bt_message_iterator_class_next_method_status muxer_msg_iter_do_next_one(
         * Consume from the queue's head: other side
         * (muxer_upstream_msg_iter_next()) writes to the tail.
         */
         * Consume from the queue's head: other side
         * (muxer_upstream_msg_iter_next()) writes to the tail.
         */
-       *msg = g_queue_pop_head(muxer_upstream_msg_iter->msgs);
+       *msg = g_ptr_array_index(muxer_upstream_msg_iter->msgs,
+               muxer_upstream_msg_iter->next_msg);
+       g_ptr_array_index(muxer_upstream_msg_iter->msgs,
+               muxer_upstream_msg_iter->next_msg) = NULL;
+       ++muxer_upstream_msg_iter->next_msg;
        BT_ASSERT_DBG(*msg);
        muxer_msg_iter->last_returned_ts_ns = next_return_ts;
 
        BT_ASSERT_DBG(*msg);
        muxer_msg_iter->last_returned_ts_ns = next_return_ts;
 
This page took 0.028214 seconds and 4 git commands to generate.