flt.utils.muxer: do not release upstream iterators when they're ended
authorPhilippe Proulx <eeppeliteloop@gmail.com>
Fri, 1 Mar 2019 21:29:35 +0000 (16:29 -0500)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Fri, 3 May 2019 22:19:38 +0000 (18:19 -0400)
This patch makes an `flt.utils.muxer` message iterator NOT release (put
the reference of) one of its upstream message iterator when it's ended
(when bt_self_component_port_input_message_iterator_next() returns
`BT_MESSAGE_ITERATOR_STATUS_END`). This is needed to make it possible
for an `flt.utils.muxer` message iterator to seek its beginning without
creating new upstream message iterators.

Instead, when an upstream message iterator ends, its internal wrapper is
moved to an "ended" array of upstream message iterators. Only "active"
upstream message iterators are considered when actually muxing. When
seeking the beginning, we make all upstream iterators seek (active and
ended), and then put back the ended upstream message iterators into the
active upstream message iterator array.

Signed-off-by: Philippe Proulx <eeppeliteloop@gmail.com>
plugins/utils/muxer/muxer.c

index 05c844b0729109e929a8f4ef30e96f2cf5e801a0..ad285dfe4ad1b14c6de76a5662c93c92be69c1d4 100644 (file)
@@ -77,7 +77,16 @@ struct muxer_msg_iter {
         * another data structure is faster than this for our typical
         * use cases.
         */
-       GPtrArray *muxer_upstream_msg_iters;
+       GPtrArray *active_muxer_upstream_msg_iters;
+
+       /*
+        * Array of struct muxer_upstream_msg_iter * (owned by this).
+        *
+        * We move ended message iterators from
+        * `active_muxer_upstream_msg_iters` to this array so as to be
+        * able to restore them when seeking.
+        */
+       GPtrArray *ended_muxer_upstream_msg_iters;
 
        /* Last time returned in a message */
        int64_t last_returned_ts_ns;
@@ -93,6 +102,16 @@ struct muxer_msg_iter {
        unsigned char expected_clock_class_uuid[BABELTRACE_UUID_LEN];
 };
 
+static
+void empty_message_queue(struct muxer_upstream_msg_iter *upstream_msg_iter)
+{
+       const bt_message *msg;
+
+       while ((msg = g_queue_pop_head(upstream_msg_iter->msgs))) {
+               bt_message_put_ref(msg);
+       }
+}
+
 static
 void destroy_muxer_upstream_msg_iter(
                struct muxer_upstream_msg_iter *muxer_upstream_msg_iter)
@@ -106,16 +125,11 @@ void destroy_muxer_upstream_msg_iter(
                muxer_upstream_msg_iter,
                muxer_upstream_msg_iter->msg_iter,
                muxer_upstream_msg_iter->msgs->length);
-       bt_self_component_port_input_message_iterator_put_ref(muxer_upstream_msg_iter->msg_iter);
+       bt_self_component_port_input_message_iterator_put_ref(
+               muxer_upstream_msg_iter->msg_iter);
 
        if (muxer_upstream_msg_iter->msgs) {
-               const bt_message *msg;
-
-               while ((msg = g_queue_pop_head(
-                               muxer_upstream_msg_iter->msgs))) {
-                       bt_message_put_ref(msg);
-               }
-
+               empty_message_queue(muxer_upstream_msg_iter);
                g_queue_free(muxer_upstream_msg_iter->msgs);
        }
 
@@ -143,7 +157,7 @@ struct muxer_upstream_msg_iter *muxer_msg_iter_add_upstream_msg_iter(
                goto end;
        }
 
-       g_ptr_array_add(muxer_msg_iter->muxer_upstream_msg_iters,
+       g_ptr_array_add(muxer_msg_iter->active_muxer_upstream_msg_iters,
                muxer_upstream_msg_iter);
        BT_LOGD("Added muxer's upstream message iterator wrapper: "
                "addr=%p, muxer-msg-iter-addr=%p, msg-iter-addr=%p",
@@ -409,7 +423,8 @@ end:
 
 static
 bt_self_message_iterator_status muxer_upstream_msg_iter_next(
-               struct muxer_upstream_msg_iter *muxer_upstream_msg_iter)
+               struct muxer_upstream_msg_iter *muxer_upstream_msg_iter,
+               bool *is_ended)
 {
        bt_self_message_iterator_status status;
        bt_message_iterator_status input_port_iter_status;
@@ -461,7 +476,7 @@ bt_self_message_iterator_status muxer_upstream_msg_iter_next(
                 * won't be considered again to find the youngest
                 * message.
                 */
-               BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(muxer_upstream_msg_iter->msg_iter);
+               *is_ended = true;
                status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
                break;
        default:
@@ -813,10 +828,13 @@ muxer_msg_iter_youngest_upstream_msg_iter(
        BT_ASSERT(muxer_upstream_msg_iter);
        *muxer_upstream_msg_iter = NULL;
 
-       for (i = 0; i < muxer_msg_iter->muxer_upstream_msg_iters->len; i++) {
+       for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len;
+                       i++) {
                const bt_message *msg;
                struct muxer_upstream_msg_iter *cur_muxer_upstream_msg_iter =
-                       g_ptr_array_index(muxer_msg_iter->muxer_upstream_msg_iters, i);
+                       g_ptr_array_index(
+                               muxer_msg_iter->active_muxer_upstream_msg_iters,
+                               i);
                int64_t msg_ts_ns;
 
                if (!cur_muxer_upstream_msg_iter->msg_iter) {
@@ -858,7 +876,8 @@ end:
 
 static
 bt_self_message_iterator_status validate_muxer_upstream_msg_iter(
-       struct muxer_upstream_msg_iter *muxer_upstream_msg_iter)
+       struct muxer_upstream_msg_iter *muxer_upstream_msg_iter,
+       bool *is_ended)
 {
        bt_self_message_iterator_status status =
                BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
@@ -877,7 +896,8 @@ bt_self_message_iterator_status validate_muxer_upstream_msg_iter(
        }
 
        /* muxer_upstream_msg_iter_next() logs details/errors */
-       status = muxer_upstream_msg_iter_next(muxer_upstream_msg_iter);
+       status = muxer_upstream_msg_iter_next(muxer_upstream_msg_iter,
+               is_ended);
 
 end:
        return status;
@@ -890,18 +910,20 @@ bt_self_message_iterator_status validate_muxer_upstream_msg_iters(
        bt_self_message_iterator_status status =
                BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
        size_t i;
+       bool is_ended = false;
 
        BT_LOGV("Validating muxer's upstream message iterator wrappers: "
                "muxer-msg-iter-addr=%p", muxer_msg_iter);
 
-       for (i = 0; i < muxer_msg_iter->muxer_upstream_msg_iters->len; i++) {
+       for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len;
+                       i++) {
                struct muxer_upstream_msg_iter *muxer_upstream_msg_iter =
                        g_ptr_array_index(
-                               muxer_msg_iter->muxer_upstream_msg_iters,
+                               muxer_msg_iter->active_muxer_upstream_msg_iters,
                                i);
 
                status = validate_muxer_upstream_msg_iter(
-                       muxer_upstream_msg_iter);
+                       muxer_upstream_msg_iter, &is_ended);
                if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
                        if (status < 0) {
                                BT_LOGE("Cannot validate muxer's upstream message iterator wrapper: "
@@ -921,20 +943,25 @@ bt_self_message_iterator_status validate_muxer_upstream_msg_iters(
                }
 
                /*
-                * Remove this muxer upstream message iterator
-                * if it's ended or canceled.
+                * Move this muxer upstream message iterator to the
+                * array of ended iterators if it's ended.
                 */
-               if (!muxer_upstream_msg_iter->msg_iter) {
+               if (unlikely(is_ended)) {
+                       BT_LOGV("Muxer's upstream message iterator wrapper: ended or canceled: "
+                               "muxer-msg-iter-addr=%p, "
+                               "muxer-upstream-msg-iter-wrap-addr=%p",
+                               muxer_msg_iter, muxer_upstream_msg_iter);
+                       g_ptr_array_add(
+                               muxer_msg_iter->ended_muxer_upstream_msg_iters,
+                               muxer_upstream_msg_iter);
+                       muxer_msg_iter->active_muxer_upstream_msg_iters->pdata[i] = NULL;
+
                        /*
                         * Use g_ptr_array_remove_fast() because the
                         * order of those elements is not important.
                         */
-                       BT_LOGV("Removing muxer's upstream message iterator wrapper: ended or canceled: "
-                               "muxer-msg-iter-addr=%p, "
-                               "muxer-upstream-msg-iter-wrap-addr=%p",
-                               muxer_msg_iter, muxer_upstream_msg_iter);
                        g_ptr_array_remove_index_fast(
-                               muxer_msg_iter->muxer_upstream_msg_iters,
+                               muxer_msg_iter->active_muxer_upstream_msg_iters,
                                i);
                        i--;
                }
@@ -1062,10 +1089,16 @@ void destroy_muxer_msg_iter(struct muxer_msg_iter *muxer_msg_iter)
        BT_LOGD("Destroying muxer component's message iterator: "
                "muxer-msg-iter-addr=%p", muxer_msg_iter);
 
-       if (muxer_msg_iter->muxer_upstream_msg_iters) {
-               BT_LOGD_STR("Destroying muxer's upstream message iterator wrappers.");
+       if (muxer_msg_iter->active_muxer_upstream_msg_iters) {
+               BT_LOGD_STR("Destroying muxer's active upstream message iterator wrappers.");
+               g_ptr_array_free(
+                       muxer_msg_iter->active_muxer_upstream_msg_iters, TRUE);
+       }
+
+       if (muxer_msg_iter->ended_muxer_upstream_msg_iters) {
+               BT_LOGD_STR("Destroying muxer's ended upstream message iterator wrappers.");
                g_ptr_array_free(
-                       muxer_msg_iter->muxer_upstream_msg_iters, TRUE);
+                       muxer_msg_iter->ended_muxer_upstream_msg_iters, TRUE);
        }
 
        g_free(muxer_msg_iter);
@@ -1171,10 +1204,18 @@ bt_self_message_iterator_status muxer_msg_iter_init(
        }
 
        muxer_msg_iter->last_returned_ts_ns = INT64_MIN;
-       muxer_msg_iter->muxer_upstream_msg_iters =
+       muxer_msg_iter->active_muxer_upstream_msg_iters =
                g_ptr_array_new_with_free_func(
                        (GDestroyNotify) destroy_muxer_upstream_msg_iter);
-       if (!muxer_msg_iter->muxer_upstream_msg_iters) {
+       if (!muxer_msg_iter->active_muxer_upstream_msg_iters) {
+               BT_LOGE_STR("Failed to allocate a GPtrArray.");
+               goto error;
+       }
+
+       muxer_msg_iter->ended_muxer_upstream_msg_iters =
+               g_ptr_array_new_with_free_func(
+                       (GDestroyNotify) destroy_muxer_upstream_msg_iter);
+       if (!muxer_msg_iter->ended_muxer_upstream_msg_iters) {
                BT_LOGE_STR("Failed to allocate a GPtrArray.");
                goto error;
        }
@@ -1208,8 +1249,7 @@ end:
 }
 
 BT_HIDDEN
-void muxer_msg_iter_finalize(
-               bt_self_message_iterator *self_msg_iter)
+void muxer_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
 {
        struct muxer_msg_iter *muxer_msg_iter =
                bt_self_message_iterator_get_data(self_msg_iter);
@@ -1294,18 +1334,16 @@ end:
        return status;
 }
 
-BT_HIDDEN
-bt_bool muxer_msg_iter_can_seek_beginning(
-               bt_self_message_iterator *self_msg_iter)
+static inline
+bt_bool muxer_upstream_msg_iters_can_all_seek_beginning(
+               GPtrArray *muxer_upstream_msg_iters)
 {
-       struct muxer_msg_iter *muxer_msg_iter =
-               bt_self_message_iterator_get_data(self_msg_iter);
        uint64_t i;
        bt_bool ret = BT_TRUE;
 
-       for (i = 0; i < muxer_msg_iter->muxer_upstream_msg_iters->len; i++) {
+       for (i = 0; i < muxer_upstream_msg_iters->len; i++) {
                struct muxer_upstream_msg_iter *upstream_msg_iter =
-                       muxer_msg_iter->muxer_upstream_msg_iters->pdata[i];
+                       muxer_upstream_msg_iters->pdata[i];
 
                if (!bt_self_component_port_input_message_iterator_can_seek_beginning(
                                upstream_msg_iter->msg_iter)) {
@@ -1318,6 +1356,30 @@ end:
        return ret;
 }
 
+BT_HIDDEN
+bt_bool muxer_msg_iter_can_seek_beginning(
+               bt_self_message_iterator *self_msg_iter)
+{
+       struct muxer_msg_iter *muxer_msg_iter =
+               bt_self_message_iterator_get_data(self_msg_iter);
+       bt_bool ret = BT_TRUE;
+
+       if (!muxer_upstream_msg_iters_can_all_seek_beginning(
+                       muxer_msg_iter->active_muxer_upstream_msg_iters)) {
+               ret = BT_FALSE;
+               goto end;
+       }
+
+       if (!muxer_upstream_msg_iters_can_all_seek_beginning(
+                       muxer_msg_iter->ended_muxer_upstream_msg_iters)) {
+               ret = BT_FALSE;
+               goto end;
+       }
+
+end:
+       return ret;
+}
+
 BT_HIDDEN
 bt_self_message_iterator_status muxer_msg_iter_seek_beginning(
                bt_self_message_iterator *self_msg_iter)
@@ -1327,18 +1389,52 @@ bt_self_message_iterator_status muxer_msg_iter_seek_beginning(
        int status;
        uint64_t i;
 
-       for (i = 0; i < muxer_msg_iter->muxer_upstream_msg_iters->len; i++) {
+       /* Seek all ended upstream iterators first */
+       for (i = 0; i < muxer_msg_iter->ended_muxer_upstream_msg_iters->len;
+                       i++) {
                struct muxer_upstream_msg_iter *upstream_msg_iter =
-                       muxer_msg_iter->muxer_upstream_msg_iters->pdata[i];
+                       muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i];
 
                status = bt_self_component_port_input_message_iterator_seek_beginning(
                        upstream_msg_iter->msg_iter);
                if (status != BT_MESSAGE_ITERATOR_STATUS_OK) {
                        goto end;
                }
+
+               empty_message_queue(upstream_msg_iter);
+       }
+
+       /* Seek all previously active upstream iterators */
+       for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len;
+                       i++) {
+               struct muxer_upstream_msg_iter *upstream_msg_iter =
+                       muxer_msg_iter->active_muxer_upstream_msg_iters->pdata[i];
+
+               status = bt_self_component_port_input_message_iterator_seek_beginning(
+                       upstream_msg_iter->msg_iter);
+               if (status != BT_MESSAGE_ITERATOR_STATUS_OK) {
+                       goto end;
+               }
+
+               empty_message_queue(upstream_msg_iter);
+       }
+
+       /* Make them all active */
+       for (i = 0; i < muxer_msg_iter->ended_muxer_upstream_msg_iters->len;
+                       i++) {
+               struct muxer_upstream_msg_iter *upstream_msg_iter =
+                       muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i];
+
+               g_ptr_array_add(muxer_msg_iter->active_muxer_upstream_msg_iters,
+                       upstream_msg_iter);
+               muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i] = NULL;
        }
 
+       g_ptr_array_remove_range(muxer_msg_iter->ended_muxer_upstream_msg_iters,
+               0, muxer_msg_iter->ended_muxer_upstream_msg_iters->len);
        muxer_msg_iter->last_returned_ts_ns = INT64_MIN;
+       muxer_msg_iter->clock_class_expectation =
+               MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY;
 
 end:
        return status;
This page took 0.029323 seconds and 4 git commands to generate.