flt.utils.muxer: do not release upstream iterators when they're ended
[babeltrace.git] / plugins / utils / muxer / muxer.c
index 05c844b0729109e929a8f4ef30e96f2cf5e801a0..ad285dfe4ad1b14c6de76a5662c93c92be69c1d4 100644 (file)
@@ -77,7 +77,16 @@ struct muxer_msg_iter {
         * another data structure is faster than this for our typical
         * use cases.
         */
-       GPtrArray *muxer_upstream_msg_iters;
+       GPtrArray *active_muxer_upstream_msg_iters;
+
+       /*
+        * Array of struct muxer_upstream_msg_iter * (owned by this).
+        *
+        * We move ended message iterators from
+        * `active_muxer_upstream_msg_iters` to this array so as to be
+        * able to restore them when seeking.
+        */
+       GPtrArray *ended_muxer_upstream_msg_iters;
 
        /* Last time returned in a message */
        int64_t last_returned_ts_ns;
@@ -93,6 +102,16 @@ struct muxer_msg_iter {
        unsigned char expected_clock_class_uuid[BABELTRACE_UUID_LEN];
 };
 
+static
+void empty_message_queue(struct muxer_upstream_msg_iter *upstream_msg_iter)
+{
+       const bt_message *msg;
+
+       while ((msg = g_queue_pop_head(upstream_msg_iter->msgs))) {
+               bt_message_put_ref(msg);
+       }
+}
+
 static
 void destroy_muxer_upstream_msg_iter(
                struct muxer_upstream_msg_iter *muxer_upstream_msg_iter)
@@ -106,16 +125,11 @@ void destroy_muxer_upstream_msg_iter(
                muxer_upstream_msg_iter,
                muxer_upstream_msg_iter->msg_iter,
                muxer_upstream_msg_iter->msgs->length);
-       bt_self_component_port_input_message_iterator_put_ref(muxer_upstream_msg_iter->msg_iter);
+       bt_self_component_port_input_message_iterator_put_ref(
+               muxer_upstream_msg_iter->msg_iter);
 
        if (muxer_upstream_msg_iter->msgs) {
-               const bt_message *msg;
-
-               while ((msg = g_queue_pop_head(
-                               muxer_upstream_msg_iter->msgs))) {
-                       bt_message_put_ref(msg);
-               }
-
+               empty_message_queue(muxer_upstream_msg_iter);
                g_queue_free(muxer_upstream_msg_iter->msgs);
        }
 
@@ -143,7 +157,7 @@ struct muxer_upstream_msg_iter *muxer_msg_iter_add_upstream_msg_iter(
                goto end;
        }
 
-       g_ptr_array_add(muxer_msg_iter->muxer_upstream_msg_iters,
+       g_ptr_array_add(muxer_msg_iter->active_muxer_upstream_msg_iters,
                muxer_upstream_msg_iter);
        BT_LOGD("Added muxer's upstream message iterator wrapper: "
                "addr=%p, muxer-msg-iter-addr=%p, msg-iter-addr=%p",
@@ -409,7 +423,8 @@ end:
 
 static
 bt_self_message_iterator_status muxer_upstream_msg_iter_next(
-               struct muxer_upstream_msg_iter *muxer_upstream_msg_iter)
+               struct muxer_upstream_msg_iter *muxer_upstream_msg_iter,
+               bool *is_ended)
 {
        bt_self_message_iterator_status status;
        bt_message_iterator_status input_port_iter_status;
@@ -461,7 +476,7 @@ bt_self_message_iterator_status muxer_upstream_msg_iter_next(
                 * won't be considered again to find the youngest
                 * message.
                 */
-               BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(muxer_upstream_msg_iter->msg_iter);
+               *is_ended = true;
                status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
                break;
        default:
@@ -813,10 +828,13 @@ muxer_msg_iter_youngest_upstream_msg_iter(
        BT_ASSERT(muxer_upstream_msg_iter);
        *muxer_upstream_msg_iter = NULL;
 
-       for (i = 0; i < muxer_msg_iter->muxer_upstream_msg_iters->len; i++) {
+       for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len;
+                       i++) {
                const bt_message *msg;
                struct muxer_upstream_msg_iter *cur_muxer_upstream_msg_iter =
-                       g_ptr_array_index(muxer_msg_iter->muxer_upstream_msg_iters, i);
+                       g_ptr_array_index(
+                               muxer_msg_iter->active_muxer_upstream_msg_iters,
+                               i);
                int64_t msg_ts_ns;
 
                if (!cur_muxer_upstream_msg_iter->msg_iter) {
@@ -858,7 +876,8 @@ end:
 
 static
 bt_self_message_iterator_status validate_muxer_upstream_msg_iter(
-       struct muxer_upstream_msg_iter *muxer_upstream_msg_iter)
+       struct muxer_upstream_msg_iter *muxer_upstream_msg_iter,
+       bool *is_ended)
 {
        bt_self_message_iterator_status status =
                BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
@@ -877,7 +896,8 @@ bt_self_message_iterator_status validate_muxer_upstream_msg_iter(
        }
 
        /* muxer_upstream_msg_iter_next() logs details/errors */
-       status = muxer_upstream_msg_iter_next(muxer_upstream_msg_iter);
+       status = muxer_upstream_msg_iter_next(muxer_upstream_msg_iter,
+               is_ended);
 
 end:
        return status;
@@ -890,18 +910,20 @@ bt_self_message_iterator_status validate_muxer_upstream_msg_iters(
        bt_self_message_iterator_status status =
                BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
        size_t i;
+       bool is_ended = false;
 
        BT_LOGV("Validating muxer's upstream message iterator wrappers: "
                "muxer-msg-iter-addr=%p", muxer_msg_iter);
 
-       for (i = 0; i < muxer_msg_iter->muxer_upstream_msg_iters->len; i++) {
+       for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len;
+                       i++) {
                struct muxer_upstream_msg_iter *muxer_upstream_msg_iter =
                        g_ptr_array_index(
-                               muxer_msg_iter->muxer_upstream_msg_iters,
+                               muxer_msg_iter->active_muxer_upstream_msg_iters,
                                i);
 
                status = validate_muxer_upstream_msg_iter(
-                       muxer_upstream_msg_iter);
+                       muxer_upstream_msg_iter, &is_ended);
                if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
                        if (status < 0) {
                                BT_LOGE("Cannot validate muxer's upstream message iterator wrapper: "
@@ -921,20 +943,25 @@ bt_self_message_iterator_status validate_muxer_upstream_msg_iters(
                }
 
                /*
-                * Remove this muxer upstream message iterator
-                * if it's ended or canceled.
+                * Move this muxer upstream message iterator to the
+                * array of ended iterators if it's ended.
                 */
-               if (!muxer_upstream_msg_iter->msg_iter) {
+               if (unlikely(is_ended)) {
+                       BT_LOGV("Muxer's upstream message iterator wrapper: ended or canceled: "
+                               "muxer-msg-iter-addr=%p, "
+                               "muxer-upstream-msg-iter-wrap-addr=%p",
+                               muxer_msg_iter, muxer_upstream_msg_iter);
+                       g_ptr_array_add(
+                               muxer_msg_iter->ended_muxer_upstream_msg_iters,
+                               muxer_upstream_msg_iter);
+                       muxer_msg_iter->active_muxer_upstream_msg_iters->pdata[i] = NULL;
+
                        /*
                         * Use g_ptr_array_remove_fast() because the
                         * order of those elements is not important.
                         */
-                       BT_LOGV("Removing muxer's upstream message iterator wrapper: ended or canceled: "
-                               "muxer-msg-iter-addr=%p, "
-                               "muxer-upstream-msg-iter-wrap-addr=%p",
-                               muxer_msg_iter, muxer_upstream_msg_iter);
                        g_ptr_array_remove_index_fast(
-                               muxer_msg_iter->muxer_upstream_msg_iters,
+                               muxer_msg_iter->active_muxer_upstream_msg_iters,
                                i);
                        i--;
                }
@@ -1062,10 +1089,16 @@ void destroy_muxer_msg_iter(struct muxer_msg_iter *muxer_msg_iter)
        BT_LOGD("Destroying muxer component's message iterator: "
                "muxer-msg-iter-addr=%p", muxer_msg_iter);
 
-       if (muxer_msg_iter->muxer_upstream_msg_iters) {
-               BT_LOGD_STR("Destroying muxer's upstream message iterator wrappers.");
+       if (muxer_msg_iter->active_muxer_upstream_msg_iters) {
+               BT_LOGD_STR("Destroying muxer's active upstream message iterator wrappers.");
+               g_ptr_array_free(
+                       muxer_msg_iter->active_muxer_upstream_msg_iters, TRUE);
+       }
+
+       if (muxer_msg_iter->ended_muxer_upstream_msg_iters) {
+               BT_LOGD_STR("Destroying muxer's ended upstream message iterator wrappers.");
                g_ptr_array_free(
-                       muxer_msg_iter->muxer_upstream_msg_iters, TRUE);
+                       muxer_msg_iter->ended_muxer_upstream_msg_iters, TRUE);
        }
 
        g_free(muxer_msg_iter);
@@ -1171,10 +1204,18 @@ bt_self_message_iterator_status muxer_msg_iter_init(
        }
 
        muxer_msg_iter->last_returned_ts_ns = INT64_MIN;
-       muxer_msg_iter->muxer_upstream_msg_iters =
+       muxer_msg_iter->active_muxer_upstream_msg_iters =
                g_ptr_array_new_with_free_func(
                        (GDestroyNotify) destroy_muxer_upstream_msg_iter);
-       if (!muxer_msg_iter->muxer_upstream_msg_iters) {
+       if (!muxer_msg_iter->active_muxer_upstream_msg_iters) {
+               BT_LOGE_STR("Failed to allocate a GPtrArray.");
+               goto error;
+       }
+
+       muxer_msg_iter->ended_muxer_upstream_msg_iters =
+               g_ptr_array_new_with_free_func(
+                       (GDestroyNotify) destroy_muxer_upstream_msg_iter);
+       if (!muxer_msg_iter->ended_muxer_upstream_msg_iters) {
                BT_LOGE_STR("Failed to allocate a GPtrArray.");
                goto error;
        }
@@ -1208,8 +1249,7 @@ end:
 }
 
 BT_HIDDEN
-void muxer_msg_iter_finalize(
-               bt_self_message_iterator *self_msg_iter)
+void muxer_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
 {
        struct muxer_msg_iter *muxer_msg_iter =
                bt_self_message_iterator_get_data(self_msg_iter);
@@ -1294,18 +1334,16 @@ end:
        return status;
 }
 
-BT_HIDDEN
-bt_bool muxer_msg_iter_can_seek_beginning(
-               bt_self_message_iterator *self_msg_iter)
+static inline
+bt_bool muxer_upstream_msg_iters_can_all_seek_beginning(
+               GPtrArray *muxer_upstream_msg_iters)
 {
-       struct muxer_msg_iter *muxer_msg_iter =
-               bt_self_message_iterator_get_data(self_msg_iter);
        uint64_t i;
        bt_bool ret = BT_TRUE;
 
-       for (i = 0; i < muxer_msg_iter->muxer_upstream_msg_iters->len; i++) {
+       for (i = 0; i < muxer_upstream_msg_iters->len; i++) {
                struct muxer_upstream_msg_iter *upstream_msg_iter =
-                       muxer_msg_iter->muxer_upstream_msg_iters->pdata[i];
+                       muxer_upstream_msg_iters->pdata[i];
 
                if (!bt_self_component_port_input_message_iterator_can_seek_beginning(
                                upstream_msg_iter->msg_iter)) {
@@ -1318,6 +1356,30 @@ end:
        return ret;
 }
 
+BT_HIDDEN
+bt_bool muxer_msg_iter_can_seek_beginning(
+               bt_self_message_iterator *self_msg_iter)
+{
+       struct muxer_msg_iter *muxer_msg_iter =
+               bt_self_message_iterator_get_data(self_msg_iter);
+       bt_bool ret = BT_TRUE;
+
+       if (!muxer_upstream_msg_iters_can_all_seek_beginning(
+                       muxer_msg_iter->active_muxer_upstream_msg_iters)) {
+               ret = BT_FALSE;
+               goto end;
+       }
+
+       if (!muxer_upstream_msg_iters_can_all_seek_beginning(
+                       muxer_msg_iter->ended_muxer_upstream_msg_iters)) {
+               ret = BT_FALSE;
+               goto end;
+       }
+
+end:
+       return ret;
+}
+
 BT_HIDDEN
 bt_self_message_iterator_status muxer_msg_iter_seek_beginning(
                bt_self_message_iterator *self_msg_iter)
@@ -1327,18 +1389,52 @@ bt_self_message_iterator_status muxer_msg_iter_seek_beginning(
        int status;
        uint64_t i;
 
-       for (i = 0; i < muxer_msg_iter->muxer_upstream_msg_iters->len; i++) {
+       /* Seek all ended upstream iterators first */
+       for (i = 0; i < muxer_msg_iter->ended_muxer_upstream_msg_iters->len;
+                       i++) {
                struct muxer_upstream_msg_iter *upstream_msg_iter =
-                       muxer_msg_iter->muxer_upstream_msg_iters->pdata[i];
+                       muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i];
 
                status = bt_self_component_port_input_message_iterator_seek_beginning(
                        upstream_msg_iter->msg_iter);
                if (status != BT_MESSAGE_ITERATOR_STATUS_OK) {
                        goto end;
                }
+
+               empty_message_queue(upstream_msg_iter);
+       }
+
+       /* Seek all previously active upstream iterators */
+       for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len;
+                       i++) {
+               struct muxer_upstream_msg_iter *upstream_msg_iter =
+                       muxer_msg_iter->active_muxer_upstream_msg_iters->pdata[i];
+
+               status = bt_self_component_port_input_message_iterator_seek_beginning(
+                       upstream_msg_iter->msg_iter);
+               if (status != BT_MESSAGE_ITERATOR_STATUS_OK) {
+                       goto end;
+               }
+
+               empty_message_queue(upstream_msg_iter);
+       }
+
+       /* Make them all active */
+       for (i = 0; i < muxer_msg_iter->ended_muxer_upstream_msg_iters->len;
+                       i++) {
+               struct muxer_upstream_msg_iter *upstream_msg_iter =
+                       muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i];
+
+               g_ptr_array_add(muxer_msg_iter->active_muxer_upstream_msg_iters,
+                       upstream_msg_iter);
+               muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i] = NULL;
        }
 
+       g_ptr_array_remove_range(muxer_msg_iter->ended_muxer_upstream_msg_iters,
+               0, muxer_msg_iter->ended_muxer_upstream_msg_iters->len);
        muxer_msg_iter->last_returned_ts_ns = INT64_MIN;
+       muxer_msg_iter->clock_class_expectation =
+               MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY;
 
 end:
        return status;
This page took 0.027722 seconds and 4 git commands to generate.