lib: add sink component class's "graph is configured" method
[babeltrace.git] / plugins / utils / muxer / muxer.c
index a6a71b7a8ce3283faf27a21ca8e34ef90199a969..aa5cbb23645c1bef5522e5c2cfc0da25b478962d 100644 (file)
 #define ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME       "assume-absolute-clock-classes"
 
 struct muxer_comp {
-       /*
-        * Array of struct
-        * bt_self_message_iterator *
-        * (weak refs)
-        */
-       GPtrArray *muxer_msg_iters;
-
        /* Weak ref */
        bt_self_component_filter *self_comp;
 
@@ -86,17 +79,6 @@ struct muxer_msg_iter {
         */
        GPtrArray *muxer_upstream_msg_iters;
 
-       /*
-        * List of "recently" connected input ports (weak) to
-        * handle by this muxer message iterator.
-        * muxer_port_connected() adds entries to this list, and the
-        * entries are removed when a message iterator is created
-        * on the port's connection and put into
-        * muxer_upstream_msg_iters above by
-        * muxer_msg_iter_handle_newly_connected_ports().
-        */
-       GList *newly_connected_self_ports;
-
        /* Last time returned in a message */
        int64_t last_returned_ts_ns;
 
@@ -173,7 +155,7 @@ end:
 }
 
 static
-bt_self_component_status ensure_available_input_port(
+bt_self_component_status add_available_input_port(
                bt_self_component_filter *self_comp)
 {
        struct muxer_comp *muxer_comp = bt_self_component_get_data(
@@ -182,11 +164,6 @@ bt_self_component_status ensure_available_input_port(
        GString *port_name = NULL;
 
        BT_ASSERT(muxer_comp);
-
-       if (muxer_comp->available_input_ports >= 1) {
-               goto end;
-       }
-
        port_name = g_string_new("in");
        if (!port_name) {
                BT_LOGE_STR("Failed to allocate a GString.");
@@ -210,6 +187,7 @@ bt_self_component_status ensure_available_input_port(
        BT_LOGD("Added one input port to muxer component: "
                "port-name=\"%s\", comp-addr=%p",
                port_name->str, self_comp);
+
 end:
        if (port_name) {
                g_string_free(port_name, TRUE);
@@ -233,15 +211,6 @@ void destroy_muxer_comp(struct muxer_comp *muxer_comp)
                return;
        }
 
-       BT_LOGD("Destroying muxer component: muxer-comp-addr=%p, "
-               "muxer-msg-iter-count=%u", muxer_comp,
-               muxer_comp->muxer_msg_iters ?
-                       muxer_comp->muxer_msg_iters->len : 0);
-
-       if (muxer_comp->muxer_msg_iters) {
-               g_ptr_array_free(muxer_comp->muxer_msg_iters, TRUE);
-       }
-
        g_free(muxer_comp);
 }
 
@@ -331,8 +300,6 @@ BT_HIDDEN
 bt_self_component_status muxer_init(
                bt_self_component_filter *self_comp,
                const bt_value *params, void *init_data)
-
-
 {
        int ret;
        bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
@@ -354,17 +321,11 @@ bt_self_component_status muxer_init(
                goto error;
        }
 
-       muxer_comp->muxer_msg_iters = g_ptr_array_new();
-       if (!muxer_comp->muxer_msg_iters) {
-               BT_LOGE_STR("Failed to allocate a GPtrArray.");
-               goto error;
-       }
-
        muxer_comp->self_comp = self_comp;
        bt_self_component_set_data(
                bt_self_component_filter_as_self_component(self_comp),
                muxer_comp);
-       status = ensure_available_input_port(self_comp);
+       status = add_available_input_port(self_comp);
        if (status != BT_SELF_COMPONENT_STATUS_OK) {
                BT_LOGE("Cannot ensure that at least one muxer component's input port is available: "
                        "muxer-comp-addr=%p, status=%s",
@@ -415,8 +376,7 @@ void muxer_finalize(bt_self_component_filter *self_comp)
 
 static
 bt_self_component_port_input_message_iterator *
-create_msg_iter_on_input_port(
-               bt_self_component_port_input *self_port, int *ret)
+create_msg_iter_on_input_port(bt_self_component_port_input *self_port)
 {
        const bt_port *port = bt_self_component_port_as_port(
                bt_self_component_port_input_as_self_component_port(
@@ -424,8 +384,6 @@ create_msg_iter_on_input_port(
        bt_self_component_port_input_message_iterator *msg_iter =
                NULL;
 
-       BT_ASSERT(ret);
-       *ret = 0;
        BT_ASSERT(port);
        BT_ASSERT(bt_port_is_connected(port));
 
@@ -438,7 +396,6 @@ create_msg_iter_on_input_port(
                BT_LOGE("Cannot create upstream message iterator on input port: "
                        "port-addr=%p, port-name=\"%s\"",
                        port, bt_port_get_name(port));
-               *ret = -1;
                goto end;
        }
 
@@ -518,90 +475,6 @@ bt_self_message_iterator_status muxer_upstream_msg_iter_next(
        return status;
 }
 
-static
-int muxer_msg_iter_handle_newly_connected_ports(
-               struct muxer_msg_iter *muxer_msg_iter)
-{
-       int ret = 0;
-
-       BT_LOGV("Handling newly connected ports: "
-               "muxer-msg-iter-addr=%p", muxer_msg_iter);
-
-       /*
-        * Here we create one upstream message iterator for each
-        * newly connected port. We do NOT perform an initial "next" on
-        * those new upstream message iterators: they are
-        * invalidated, to be validated later. The list of newly
-        * connected ports to handle here is updated by
-        * muxer_port_connected().
-        */
-       while (true) {
-               GList *node = muxer_msg_iter->newly_connected_self_ports;
-               bt_self_component_port_input *self_port;
-               const bt_port *port;
-               bt_self_component_port_input_message_iterator *
-                       upstream_msg_iter = NULL;
-               struct muxer_upstream_msg_iter *muxer_upstream_msg_iter;
-
-               if (!node) {
-                       break;
-               }
-
-               self_port = node->data;
-               port = bt_self_component_port_as_port(
-                       bt_self_component_port_input_as_self_component_port(
-                               (self_port)));
-               BT_ASSERT(port);
-
-               if (!bt_port_is_connected(port)) {
-                       /*
-                        * Looks like this port is not connected
-                        * anymore: we can't create an upstream
-                        * message iterator on its (non-existing)
-                        * connection in this case.
-                        */
-                       goto remove_node;
-               }
-
-               upstream_msg_iter = create_msg_iter_on_input_port(
-                       self_port, &ret);
-               if (ret) {
-                       /* create_msg_iter_on_input_port() logs errors */
-                       BT_ASSERT(!upstream_msg_iter);
-                       goto error;
-               }
-
-               muxer_upstream_msg_iter =
-                       muxer_msg_iter_add_upstream_msg_iter(
-                               muxer_msg_iter, upstream_msg_iter);
-               BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(upstream_msg_iter);
-               if (!muxer_upstream_msg_iter) {
-                       /*
-                        * muxer_msg_iter_add_upstream_msg_iter()
-                        * logs errors.
-                        */
-                       goto error;
-               }
-
-remove_node:
-               bt_self_component_port_input_message_iterator_put_ref(upstream_msg_iter);
-               muxer_msg_iter->newly_connected_self_ports =
-                       g_list_delete_link(
-                               muxer_msg_iter->newly_connected_self_ports,
-                               node);
-       }
-
-       goto end;
-
-error:
-       if (ret >= 0) {
-               ret = -1;
-       }
-
-end:
-       return ret;
-}
-
 static
 int get_msg_ts_ns(struct muxer_comp *muxer_comp,
                struct muxer_msg_iter *muxer_msg_iter,
@@ -915,7 +788,6 @@ end:
  * This function does NOT:
  *
  * * Update any upstream message iterator.
- * * Check for newly connected ports.
  * * Check the upstream message iterators to retry.
  *
  * On sucess, this function sets *muxer_upstream_msg_iter to the
@@ -1078,47 +950,16 @@ bt_self_message_iterator_status muxer_msg_iter_do_next_one(
                struct muxer_msg_iter *muxer_msg_iter,
                const bt_message **msg)
 {
-       bt_self_message_iterator_status status =
-               BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
+       bt_self_message_iterator_status status;
        struct muxer_upstream_msg_iter *muxer_upstream_msg_iter = NULL;
        int64_t next_return_ts;
 
-       while (true) {
-               int ret = muxer_msg_iter_handle_newly_connected_ports(
-                       muxer_msg_iter);
-
-               if (ret) {
-                       BT_LOGE("Cannot handle newly connected input ports for muxer's message iterator: "
-                               "muxer-comp-addr=%p, muxer-msg-iter-addr=%p, "
-                               "ret=%d",
-                               muxer_comp, muxer_msg_iter, ret);
-                       status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
-                       goto end;
-               }
-
-               status = validate_muxer_upstream_msg_iters(muxer_msg_iter);
-               if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
-                       /* validate_muxer_upstream_msg_iters() logs details */
-                       goto end;
-               }
-
-               /*
-                * At this point, we know that all the existing upstream
-                * message iterators are valid. However the
-                * operations to validate them (during
-                * validate_muxer_upstream_msg_iters()) may have
-                * connected new ports. If no ports were connected
-                * during this operation, exit the loop.
-                */
-               if (!muxer_msg_iter->newly_connected_self_ports) {
-                       BT_LOGV("Not breaking this loop: muxer's message iterator still has newly connected input ports to handle: "
-                               "muxer-comp-addr=%p", muxer_comp);
-                       break;
-               }
+       status = validate_muxer_upstream_msg_iters(muxer_msg_iter);
+       if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
+               /* validate_muxer_upstream_msg_iters() logs details */
+               goto end;
        }
 
-       BT_ASSERT(!muxer_msg_iter->newly_connected_self_ports);
-
        /*
         * At this point we know that all the existing upstream
         * message iterators are valid. We can find the one,
@@ -1227,23 +1068,17 @@ void destroy_muxer_msg_iter(struct muxer_msg_iter *muxer_msg_iter)
                        muxer_msg_iter->muxer_upstream_msg_iters, TRUE);
        }
 
-       g_list_free(muxer_msg_iter->newly_connected_self_ports);
        g_free(muxer_msg_iter);
 }
 
 static
-int muxer_msg_iter_init_newly_connected_ports(struct muxer_comp *muxer_comp,
+int muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp,
                struct muxer_msg_iter *muxer_msg_iter)
 {
        int64_t count;
        int64_t i;
        int ret = 0;
 
-       /*
-        * Add the connected input ports to this muxer message
-        * iterator's list of newly connected ports. They will be
-        * handled by muxer_msg_iter_handle_newly_connected_ports().
-        */
        count = bt_component_filter_get_input_port_count(
                bt_self_component_filter_as_component_filter(
                        muxer_comp->self_comp));
@@ -1255,6 +1090,8 @@ int muxer_msg_iter_init_newly_connected_ports(struct muxer_comp *muxer_comp,
        }
 
        for (i = 0; i < count; i++) {
+               bt_self_component_port_input_message_iterator *upstream_msg_iter;
+               struct muxer_upstream_msg_iter *muxer_upstream_msg_iter;
                bt_self_component_port_input *self_port =
                        bt_self_component_filter_borrow_input_port_by_index(
                                muxer_comp->self_comp, i);
@@ -1267,29 +1104,28 @@ int muxer_msg_iter_init_newly_connected_ports(struct muxer_comp *muxer_comp,
                BT_ASSERT(port);
 
                if (!bt_port_is_connected(port)) {
-                       BT_LOGD("Skipping input port: not connected: "
-                               "muxer-comp-addr=%p, port-addr=%p, port-name\"%s\"",
-                               muxer_comp, port, bt_port_get_name(port));
+                       /* Skip non-connected port */
                        continue;
                }
 
-               muxer_msg_iter->newly_connected_self_ports =
-                       g_list_append(
-                               muxer_msg_iter->newly_connected_self_ports,
-                               self_port);
-               if (!muxer_msg_iter->newly_connected_self_ports) {
-                       BT_LOGE("Cannot append port to muxer's message iterator list of newly connected input ports: "
-                               "port-addr=%p, port-name=\"%s\", "
-                               "muxer-msg-iter-addr=%p", port,
-                               bt_port_get_name(port), muxer_msg_iter);
+               upstream_msg_iter = create_msg_iter_on_input_port(self_port);
+               if (!upstream_msg_iter) {
+                       /* create_msg_iter_on_input_port() logs errors */
+                       BT_ASSERT(!upstream_msg_iter);
                        ret = -1;
                        goto end;
                }
 
-               BT_LOGD("Appended port to muxer's message iterator list of newly connected input ports: "
-                       "port-addr=%p, port-name=\"%s\", "
-                       "muxer-msg-iter-addr=%p", port,
-                       bt_port_get_name(port), muxer_msg_iter);
+               muxer_upstream_msg_iter =
+                       muxer_msg_iter_add_upstream_msg_iter(
+                               muxer_msg_iter, upstream_msg_iter);
+               bt_self_component_port_input_message_iterator_put_ref(
+                       upstream_msg_iter);
+               if (!muxer_upstream_msg_iter) {
+                       /* muxer_msg_iter_add_upstream_msg_iter() logs errors */
+                       ret = -1;
+                       goto end;
+               }
        }
 
 end:
@@ -1343,19 +1179,10 @@ bt_self_message_iterator_status muxer_msg_iter_init(
                goto error;
        }
 
-       /*
-        * Add the muxer message iterator to the component's array
-        * of muxer message iterators here because
-        * muxer_msg_iter_init_newly_connected_ports() can cause
-        * muxer_port_connected() to be called, which adds the newly
-        * connected port to each muxer message iterator's list of
-        * newly connected ports.
-        */
-       g_ptr_array_add(muxer_comp->muxer_msg_iters, muxer_msg_iter);
-       ret = muxer_msg_iter_init_newly_connected_ports(muxer_comp,
+       ret = muxer_msg_iter_init_upstream_iterators(muxer_comp,
                muxer_msg_iter);
        if (ret) {
-               BT_LOGE("Cannot initialize newly connected input ports for muxer component's message iterator: "
+               BT_LOGE("Cannot initialize connected input ports for muxer component's message iterator: "
                        "comp-addr=%p, muxer-comp-addr=%p, "
                        "muxer-msg-iter-addr=%p, msg-iter-addr=%p, ret=%d",
                        self_comp, muxer_comp, muxer_msg_iter,
@@ -1363,8 +1190,7 @@ bt_self_message_iterator_status muxer_msg_iter_init(
                goto error;
        }
 
-       bt_self_message_iterator_set_data(self_msg_iter,
-               muxer_msg_iter);
+       bt_self_message_iterator_set_data(self_msg_iter, muxer_msg_iter);
        BT_LOGD("Initialized muxer component's message iterator: "
                "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, "
                "msg-iter-addr=%p",
@@ -1372,15 +1198,8 @@ bt_self_message_iterator_status muxer_msg_iter_init(
        goto end;
 
 error:
-       if (g_ptr_array_index(muxer_comp->muxer_msg_iters,
-                       muxer_comp->muxer_msg_iters->len - 1) == muxer_msg_iter) {
-               g_ptr_array_remove_index(muxer_comp->muxer_msg_iters,
-                       muxer_comp->muxer_msg_iters->len - 1);
-       }
-
        destroy_muxer_msg_iter(muxer_msg_iter);
-       bt_self_message_iterator_set_data(self_msg_iter,
-               NULL);
+       bt_self_message_iterator_set_data(self_msg_iter, NULL);
        status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
 
 end:
@@ -1406,9 +1225,7 @@ void muxer_msg_iter_finalize(
                "msg-iter-addr=%p",
                self_comp, muxer_comp, muxer_msg_iter, self_msg_iter);
 
-       if (muxer_comp) {
-               (void) g_ptr_array_remove_fast(muxer_comp->muxer_msg_iters,
-                       muxer_msg_iter);
+       if (muxer_msg_iter) {
                destroy_muxer_msg_iter(muxer_msg_iter);
        }
 }
@@ -1459,69 +1276,17 @@ bt_self_component_status muxer_input_port_connected(
                bt_self_component_port_input *self_port,
                const bt_port_output *other_port)
 {
-       bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
-       const bt_port *port = bt_self_component_port_as_port(
-               bt_self_component_port_input_as_self_component_port(
-                       self_port));
-       struct muxer_comp *muxer_comp =
-               bt_self_component_get_data(
-                       bt_self_component_filter_as_self_component(
-                               self_comp));
-       size_t i;
-       int ret;
-
-       BT_ASSERT(port);
-       BT_ASSERT(muxer_comp);
-       BT_LOGD("Port connected: "
-               "comp-addr=%p, muxer-comp-addr=%p, "
-               "port-addr=%p, port-name=\"%s\", "
-               "other-port-addr=%p, other-port-name=\"%s\"",
-               self_comp, muxer_comp, self_port, bt_port_get_name(port),
-               other_port,
-               bt_port_get_name(bt_port_output_as_port_const(other_port)));
-
-       for (i = 0; i < muxer_comp->muxer_msg_iters->len; i++) {
-               struct muxer_msg_iter *muxer_msg_iter =
-                       g_ptr_array_index(muxer_comp->muxer_msg_iters, i);
-
-               /*
-                * Add this port to the list of newly connected ports
-                * for this muxer message iterator. We append at
-                * the end of this list while
-                * muxer_msg_iter_handle_newly_connected_ports()
-                * removes the nodes from the beginning.
-                */
-               muxer_msg_iter->newly_connected_self_ports =
-                       g_list_append(
-                               muxer_msg_iter->newly_connected_self_ports,
-                               self_port);
-               if (!muxer_msg_iter->newly_connected_self_ports) {
-                       BT_LOGE("Cannot append port to muxer's message iterator list of newly connected input ports: "
-                               "port-addr=%p, port-name=\"%s\", "
-                               "muxer-msg-iter-addr=%p", self_port,
-                               bt_port_get_name(port), muxer_msg_iter);
-                       status = BT_SELF_COMPONENT_STATUS_ERROR;
-                       goto end;
-               }
-
-               BT_LOGD("Appended port to muxer's message iterator list of newly connected input ports: "
-                       "port-addr=%p, port-name=\"%s\", "
-                       "muxer-msg-iter-addr=%p", self_port,
-                       bt_port_get_name(port), muxer_msg_iter);
-       }
+       bt_self_component_status status;
 
-       /* One less available input port */
-       muxer_comp->available_input_ports--;
-       ret = ensure_available_input_port(self_comp);
-       if (ret) {
+       status = add_available_input_port(self_comp);
+       if (status) {
                /*
                 * Only way to report an error later since this
                 * method does not return anything.
                 */
-               BT_LOGE("Cannot ensure that at least one muxer component's input port is available: "
-                       "muxer-comp-addr=%p, status=%s",
-                       muxer_comp, bt_self_component_status_string(ret));
-               status = BT_SELF_COMPONENT_STATUS_ERROR;
+               BT_LOGE("Cannot add one muxer component's input port: "
+                       "status=%s",
+                       bt_self_component_status_string(status));
                goto end;
        }
 
This page took 0.029028 seconds and 4 git commands to generate.