lib: create input port msg iterator from self {msg iterator, sink comp.}
authorPhilippe Proulx <eeppeliteloop@gmail.com>
Sat, 20 Jul 2019 21:16:24 +0000 (17:16 -0400)
committerPhilippe Proulx <eeppeliteloop@gmail.com>
Wed, 24 Jul 2019 14:17:16 +0000 (10:17 -0400)
This patch makes an input port message iterator have a link to the
upstream message iterators it needs and vice versa.

The motivation behind this is to eventually be able to transmit some
state or property automatically to all the upstream message iterators,
recursively, of a given message iterator without any special user code.

For example, consider this graph of message iterators (arrow means "is
an upstream message iterator of"):

    A <──┐
         ├──┬── C <──┬── F
    B <──┘  │        │
            │   E <──┘
    D <─────┘

                G <───── H

Here, setting the state/property on F would also set it on A, B, D, C,
and E. Setting it on C would set it on A, B, and D. Setting it on H
would only set it on G.

The message iterator graph, which can be considered like another,
"dynamic" graph on top of the static component graph used only to limit
a message iterator graph's topology, is always directed and acyclic,
where each node has a single parent. In other words, it is not permitted
that two message iterators use the same downstream message iterator, for
example:

    A <──┐
         ├───── C <───── F
    B <──┘      │        │
                │        │
    D <─────────┴────────┘

(D has both C and F as parents).

This is so that each message iterator and its upstream message iterators
constitute a single, private state.

This patch replaces
bt_self_component_port_input_message_iterator_create() with
bt_self_component_port_input_message_iterator_create_from_message_iterator()
and
bt_self_component_port_input_message_iterator_create_from_sink_component().

bt_self_component_port_input_message_iterator_create_from_sink_component()
does nothing with the self sink component parameter (except checking
that it's not `NULL`), but it's needed to make the functions type safe.

Internally, an input port message iterator contains both a weak pointer
to its downstream message iterator and an array of weak pointers to its
upstream message iterators.

When you create a message iterator with
bt_self_component_port_input_message_iterator_create_from_message_iterator(),
the function sets the new message iterator's downstream message iterator
to the provided self message iterator. It also adds the new message
iterator to the self message iterator's array of upstream message
iterators.

On message iterator finalization:

* The message iterator removes itself as the downstream message iterator
  of all its upstream message iterators. It also clears the upstream
  message iterator array.

* The message iterator removes itself as one of the upstream message
  iterators of its downstream message iterator.

I didn't find any race condition regarding the new message iterator
links, but I could be wrong. The future and more tests will tell us.

In the `bt2` Python package:

* _UserComponentInputPort.create_message_iterator() is removed.

* _UserMessageIterator._create_input_port_message_iterator() is added
  (accepts a user component input port).

* _UserSinkComponent._create_input_port_message_iterator() is added
  (accepts a user component input port).

Tests are updated accordingly.

I added a test for
_UserMessageIterator._create_input_port_message_iterator() specifically
(needs a filter message iterator) because we use
_UserSinkComponent._create_input_port_message_iterator() in the current
tests.

Signed-off-by: Philippe Proulx <eeppeliteloop@gmail.com>
Change-Id: I46cefca445353c453fbd9404b97687b81fa5f443
Reviewed-on: https://review.lttng.org/c/babeltrace/+/1732

18 files changed:
include/babeltrace2/graph/self-component-port-input-message-iterator.h
src/bindings/python/bt2/bt2/component.py
src/bindings/python/bt2/bt2/message_iterator.py
src/bindings/python/bt2/bt2/port.py
src/lib/graph/component-class-sink-colander.c
src/lib/graph/iterator.c
src/lib/graph/message/iterator.h
src/plugins/ctf/fs-sink/fs-sink.c
src/plugins/lttng-utils/debug-info/debug-info.c
src/plugins/text/details/details.c
src/plugins/text/pretty/pretty.c
src/plugins/utils/counter/counter.c
src/plugins/utils/dummy/dummy.c
src/plugins/utils/muxer/muxer.c
src/plugins/utils/trimmer/trimmer.c
tests/bindings/python/bt2/test_error.py
tests/bindings/python/bt2/test_graph.py
tests/bindings/python/bt2/test_message_iterator.py

index 67d6d83290d32d60bca40248864979fb6c73ae28..d4ad8b5966d2d334f2c4bf5179617246837009e5 100644 (file)
@@ -45,7 +45,13 @@ bt_self_component_port_input_message_iterator_as_message_iterator(
 }
 
 extern bt_self_component_port_input_message_iterator *
-bt_self_component_port_input_message_iterator_create(
+bt_self_component_port_input_message_iterator_create_from_message_iterator(
+               bt_self_message_iterator *self_msg_iter,
+               bt_self_component_port_input *input_port);
+
+extern bt_self_component_port_input_message_iterator *
+bt_self_component_port_input_message_iterator_create_from_sink_component(
+               bt_self_component_sink *self_comp,
                bt_self_component_port_input *input_port);
 
 extern bt_component *
index 6d6afc4bb70df6f4e9849129e9a5820f4ecaf039..10b1166300e337ee0fbfe82e828b9beb70e4f848 100644 (file)
@@ -852,3 +852,15 @@ class _UserSinkComponent(_UserComponent, _SinkComponent):
         )
         assert self_port_ptr
         return bt2.port._UserComponentInputPort._create_from_ptr(self_port_ptr)
+
+    def _create_input_port_message_iterator(self, input_port):
+        utils._check_type(input_port, bt2.port._UserComponentInputPort)
+
+        msg_iter_ptr = native_bt.self_component_port_input_message_iterator_create_from_sink_component(
+            self._bt_ptr, input_port._ptr
+        )
+
+        if msg_iter_ptr is None:
+            raise bt2.CreationError('cannot create message iterator object')
+
+        return bt2.message_iterator._UserComponentInputPortMessageIterator(msg_iter_ptr)
index 93595693e6f57962b40ccc6415bdd6a8e967ba12..07c707dba31c013182c0f9ffc3177ebc72b12108 100644 (file)
@@ -24,6 +24,7 @@ from bt2 import native_bt, object, utils
 import bt2.message
 import collections.abc
 import bt2.component
+import bt2.port
 import bt2
 
 
@@ -178,6 +179,18 @@ class _UserMessageIterator(_MessageIterator):
     def _bt_seek_beginning_from_native(self):
         self._seek_beginning()
 
+    def _create_input_port_message_iterator(self, input_port):
+        utils._check_type(input_port, bt2.port._UserComponentInputPort)
+
+        msg_iter_ptr = native_bt.self_component_port_input_message_iterator_create_from_message_iterator(
+            self._bt_ptr, input_port._ptr
+        )
+
+        if msg_iter_ptr is None:
+            raise bt2.CreationError('cannot create message iterator object')
+
+        return _UserComponentInputPortMessageIterator(msg_iter_ptr)
+
     def _create_event_message(
         self, event_class, parent=None, default_clock_snapshot=None
     ):
index d0d65d2327d59000b57525b9a98592efa8928b59..201e2778adc9daf83571165b6355d9b043639162 100644 (file)
@@ -114,15 +114,6 @@ class _UserComponentInputPort(_UserComponentPort, _InputPort):
         native_bt.self_component_port_input_as_self_component_port
     )
 
-    def create_message_iterator(self):
-        msg_iter_ptr = native_bt.self_component_port_input_message_iterator_create(
-            self._ptr
-        )
-        if msg_iter_ptr is None:
-            raise bt2._MemoryError('cannot create message iterator object')
-
-        return bt2.message_iterator._UserComponentInputPortMessageIterator(msg_iter_ptr)
-
 
 class _UserComponentOutputPort(_UserComponentPort, _OutputPort):
     _as_self_port_ptr = staticmethod(
index 771c626885931c54759354edbd02f46d7da58dd9..d1978c95eedc1b93d5e6a4b555686fad84b8f248 100644 (file)
@@ -110,8 +110,8 @@ colander_graph_is_configured(
        BT_ASSERT(colander_data);
        BT_OBJECT_PUT_REF_AND_RESET(colander_data->msg_iter);
        colander_data->msg_iter =
-               bt_self_component_port_input_message_iterator_create(
-                       self_port);
+               bt_self_component_port_input_message_iterator_create_from_sink_component(
+                       self_comp, self_port);
        if (!colander_data->msg_iter) {
                BT_LIB_LOGE_APPEND_CAUSE("Cannot create message iterator on "
                        "self component input port: %![port-]+p",
index a8fe3f15a95a8dc05a47938900aa5664fd22ca71..254e422f9889ddabcadc7b25afd957c49abb662a 100644 (file)
@@ -162,6 +162,16 @@ void bt_self_component_port_input_message_iterator_destroy(struct bt_object *obj
                iterator->auto_seek.msgs = NULL;
        }
 
+       if (iterator->upstream_msg_iters) {
+               /*
+                * At this point the message iterator is finalized, so
+                * it's detached from any upstream message iterator.
+                */
+               BT_ASSERT(iterator->upstream_msg_iters->len == 0);
+               g_ptr_array_free(iterator->upstream_msg_iters, TRUE);
+               iterator->upstream_msg_iters = NULL;
+       }
+
        destroy_base_message_iterator(obj);
 }
 
@@ -169,6 +179,7 @@ BT_HIDDEN
 void bt_self_component_port_input_message_iterator_try_finalize(
                struct bt_self_component_port_input_message_iterator *iterator)
 {
+       uint64_t i;
        typedef void (*method_t)(void *);
 
        struct bt_component_class *comp_class = NULL;
@@ -231,6 +242,27 @@ void bt_self_component_port_input_message_iterator_try_finalize(
                method(iterator);
        }
 
+       /* Detach upstream message iterators */
+       for (i = 0; i < iterator->upstream_msg_iters->len; i++) {
+               struct bt_self_component_port_input_message_iterator *upstream_msg_iter =
+                       iterator->upstream_msg_iters->pdata[i];
+
+               upstream_msg_iter->downstream_msg_iter = NULL;
+       }
+
+       g_ptr_array_set_size(iterator->upstream_msg_iters, 0);
+
+       /* Detach downstream message iterator */
+       if (iterator->downstream_msg_iter) {
+               gboolean existed;
+
+               BT_ASSERT(iterator->downstream_msg_iter->upstream_msg_iters);
+               existed = g_ptr_array_remove_fast(
+                       iterator->downstream_msg_iter->upstream_msg_iters,
+                       iterator);
+               BT_ASSERT(existed);
+       }
+
        iterator->upstream_component = NULL;
        iterator->upstream_port = NULL;
        set_self_comp_port_input_msg_iterator_state(iterator,
@@ -291,22 +323,51 @@ bt_bool can_seek_beginning_true(
 
 static
 struct bt_self_component_port_input_message_iterator *
-bt_self_component_port_input_message_iterator_create_initial(
-               struct bt_component *upstream_comp,
-               struct bt_port *upstream_port)
+create_self_component_input_port_message_iterator(
+               struct bt_self_message_iterator *self_downstream_msg_iter,
+               struct bt_self_component_port_input *self_port)
 {
+       typedef enum bt_component_class_message_iterator_init_method_status (*init_method_t)(
+                       void *, void *, void *);
+
        int ret;
-       struct bt_self_component_port_input_message_iterator *iterator = NULL;
+       init_method_t init_method = NULL;
+       struct bt_self_component_port_input_message_iterator *iterator =
+               NULL;
+       struct bt_self_component_port_input_message_iterator *downstream_msg_iter =
+               (void *) self_downstream_msg_iter;
+       struct bt_port *port = (void *) self_port;
+       struct bt_port *upstream_port;
+       struct bt_component *comp;
+       struct bt_component *upstream_comp;
+       struct bt_component_class *upstream_comp_cls;
 
-       BT_ASSERT(upstream_comp);
+       BT_ASSERT_PRE_NON_NULL(port, "Input port");
+       comp = bt_port_borrow_component_inline(port);
+       BT_ASSERT_PRE(bt_port_is_connected(port),
+               "Input port is not connected: %![port-]+p", port);
+       BT_ASSERT_PRE(comp, "Input port is not part of a component: %![port-]+p",
+               port);
+       BT_ASSERT_PRE(!bt_component_graph_is_canceled(comp),
+               "Input port's component's graph is canceled: "
+               "%![port-]+p, %![comp-]+c", port, comp);
+       BT_ASSERT(port->connection);
+       upstream_port = port->connection->upstream_port;
        BT_ASSERT(upstream_port);
-       BT_ASSERT(bt_port_is_connected(upstream_port));
-       BT_LIB_LOGI("Creating initial message iterator on self component input port: "
-               "%![up-comp-]+c, %![up-port-]+p", upstream_comp, upstream_port);
-       BT_ASSERT(bt_component_get_class_type(upstream_comp) ==
+       upstream_comp = bt_port_borrow_component_inline(upstream_port);
+       BT_ASSERT(upstream_comp);
+       BT_ASSERT_PRE(
+               bt_component_borrow_graph(upstream_comp)->config_state !=
+                       BT_GRAPH_CONFIGURATION_STATE_CONFIGURING,
+               "Graph is not configured: %!+g",
+               bt_component_borrow_graph(upstream_comp));
+       upstream_comp_cls = upstream_comp->class;
+       BT_ASSERT(upstream_comp->class->type ==
                BT_COMPONENT_CLASS_TYPE_SOURCE ||
-               bt_component_get_class_type(upstream_comp) ==
+               upstream_comp->class->type ==
                BT_COMPONENT_CLASS_TYPE_FILTER);
+       BT_LIB_LOGI("Creating message iterator on self component input port: "
+               "%![up-comp-]+c, %![up-port-]+p", upstream_comp, upstream_port);
        iterator = g_new0(
                struct bt_self_component_port_input_message_iterator, 1);
        if (!iterator) {
@@ -325,12 +386,16 @@ bt_self_component_port_input_message_iterator_create_initial(
        }
 
        iterator->last_ns_from_origin = INT64_MIN;
-
        iterator->auto_seek.msgs = g_queue_new();
        if (!iterator->auto_seek.msgs) {
                BT_LIB_LOGE_APPEND_CAUSE("Failed to allocate a GQueue.");
-               ret = -1;
-               goto end;
+               goto error;
+       }
+
+       iterator->upstream_msg_iters = g_ptr_array_new();
+       if (!iterator->upstream_msg_iters) {
+               BT_LIB_LOGE_APPEND_CAUSE("Failed to allocate a GPtrArray.");
+               goto error;
        }
 
        iterator->upstream_component = upstream_comp;
@@ -403,66 +468,6 @@ bt_self_component_port_input_message_iterator_create_initial(
                                can_seek_beginning_true;
        }
 
-       BT_LIB_LOGI("Created initial message iterator on self component input port: "
-               "%![up-port-]+p, %![up-comp-]+c, %![iter-]+i",
-               upstream_port, upstream_comp, iterator);
-       goto end;
-
-error:
-       BT_OBJECT_PUT_REF_AND_RESET(iterator);
-
-end:
-       return iterator;
-}
-
-struct bt_self_component_port_input_message_iterator *
-bt_self_component_port_input_message_iterator_create(
-               struct bt_self_component_port_input *self_port)
-{
-       typedef enum bt_component_class_message_iterator_init_method_status (*init_method_t)(
-                       void *, void *, void *);
-
-       init_method_t init_method = NULL;
-       struct bt_self_component_port_input_message_iterator *iterator =
-               NULL;
-       struct bt_port *port = (void *) self_port;
-       struct bt_port *upstream_port;
-       struct bt_component *comp;
-       struct bt_component *upstream_comp;
-       struct bt_component_class *upstream_comp_cls;
-
-       BT_ASSERT_PRE_NON_NULL(port, "Port");
-       comp = bt_port_borrow_component_inline(port);
-       BT_ASSERT_PRE(bt_port_is_connected(port),
-               "Port is not connected: %![port-]+p", port);
-       BT_ASSERT_PRE(comp, "Port is not part of a component: %![port-]+p",
-               port);
-       BT_ASSERT_PRE(!bt_component_graph_is_canceled(comp),
-               "Port's component's graph is canceled: "
-               "%![port-]+p, %![comp-]+c", port, comp);
-       BT_ASSERT(port->connection);
-       upstream_port = port->connection->upstream_port;
-       BT_ASSERT(upstream_port);
-       upstream_comp = bt_port_borrow_component_inline(upstream_port);
-       BT_ASSERT(upstream_comp);
-       BT_ASSERT_PRE(
-               bt_component_borrow_graph(upstream_comp)->config_state !=
-                       BT_GRAPH_CONFIGURATION_STATE_CONFIGURING,
-               "Graph is not configured: %!+g",
-               bt_component_borrow_graph(upstream_comp));
-       upstream_comp_cls = upstream_comp->class;
-       BT_ASSERT(upstream_comp->class->type ==
-               BT_COMPONENT_CLASS_TYPE_SOURCE ||
-               upstream_comp->class->type ==
-               BT_COMPONENT_CLASS_TYPE_FILTER);
-       iterator = bt_self_component_port_input_message_iterator_create_initial(
-               upstream_comp, upstream_port);
-       if (!iterator) {
-               BT_LIB_LOGE_APPEND_CAUSE(
-                       "Cannot create self component input port message iterator.");
-               goto error;
-       }
-
        switch (upstream_comp_cls->type) {
        case BT_COMPONENT_CLASS_TYPE_SOURCE:
        {
@@ -505,6 +510,18 @@ bt_self_component_port_input_message_iterator_create(
                }
        }
 
+       if (downstream_msg_iter) {
+               /* Set this message iterator's downstream message iterator */
+               iterator->downstream_msg_iter = downstream_msg_iter;
+
+               /*
+                * Add this message iterator to the downstream message
+                * iterator's array of upstream message iterators.
+                */
+               g_ptr_array_add(downstream_msg_iter->upstream_msg_iters,
+                       iterator);
+       }
+
        set_self_comp_port_input_msg_iterator_state(iterator,
                BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE);
        g_ptr_array_add(port->connection->iterators, iterator);
@@ -520,6 +537,26 @@ end:
        return iterator;
 }
 
+struct bt_self_component_port_input_message_iterator *
+bt_self_component_port_input_message_iterator_create_from_message_iterator(
+               struct bt_self_message_iterator *self_msg_iter,
+               struct bt_self_component_port_input *input_port)
+{
+       BT_ASSERT_PRE_NON_NULL(self_msg_iter, "Message iterator");
+       return create_self_component_input_port_message_iterator(self_msg_iter,
+               input_port);
+}
+
+struct bt_self_component_port_input_message_iterator *
+bt_self_component_port_input_message_iterator_create_from_sink_component(
+               struct bt_self_component_sink *self_comp,
+               struct bt_self_component_port_input *input_port)
+{
+       BT_ASSERT_PRE_NON_NULL(self_comp, "Sink component");
+       return create_self_component_input_port_message_iterator(NULL,
+               input_port);
+}
+
 void *bt_self_message_iterator_get_data(
                const struct bt_self_message_iterator *self_iterator)
 {
index b8a9dfa2f65e090cb0cd7840360d66e4ba261ffe..93d9aa23e122f81d8d0eef56c18bd07b7a3dc389 100644 (file)
@@ -103,6 +103,27 @@ struct bt_self_component_port_input_message_iterator {
        struct bt_connection *connection; /* Weak */
        struct bt_graph *graph; /* Weak */
 
+       /*
+        * Array of
+        * `struct bt_self_component_port_input_message_iterator *`
+        * (weak).
+        *
+        * This is an array of upstream message iterators on which this
+        * iterator depends. The references are weak: an upstream
+        * message iterator is responsible for removing its entry within
+        * this array on finalization/destruction.
+        */
+       GPtrArray *upstream_msg_iters;
+
+       /*
+        * Downstream message iterator which depends on this message
+        * iterator (weak).
+        *
+        * This can be `NULL` if this message iterator's owner is a sink
+        * component.
+        */
+       struct bt_self_component_port_input_message_iterator *downstream_msg_iter;
+
        struct {
                bt_self_component_port_input_message_iterator_next_method next;
                bt_self_component_port_input_message_iterator_seek_ns_from_origin_method seek_ns_from_origin;
index 239e7ad10ed3ffc087ade96dd152218b35a9269e..172e5b4f1f933be394e46c2ae8853ea435eb6c2d 100644 (file)
@@ -1104,7 +1104,8 @@ bt_component_class_sink_graph_is_configured_method_status ctf_fs_sink_graph_is_c
                        bt_self_component_sink_as_self_component(self_comp));
 
        fs_sink->upstream_iter =
-               bt_self_component_port_input_message_iterator_create(
+               bt_self_component_port_input_message_iterator_create_from_sink_component(
+                       self_comp,
                        bt_self_component_sink_borrow_input_port_by_name(
                                self_comp, in_port_name));
        if (!fs_sink->upstream_iter) {
index 10e5a7de6f710698009bb94925c0f4c75ef38d6c..fd8e1d2670b7235bb8f19b84002096ebc3a0937f 100644 (file)
@@ -1990,8 +1990,8 @@ bt_component_class_message_iterator_init_method_status debug_info_msg_iter_init(
        debug_info_msg_iter->self_comp = self_comp;
 
        /* Create an iterator on the upstream component. */
-       upstream_iterator = bt_self_component_port_input_message_iterator_create(
-               input_port);
+       upstream_iterator = bt_self_component_port_input_message_iterator_create_from_message_iterator(
+               self_msg_iter, input_port);
        if (!upstream_iterator) {
                status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_MEMORY_ERROR;
                goto error;
index 47a29326313fe5b1aba827dbc07f654bb873b3a6..34410187cb384adf6234ff6c6ba69af5770280c9 100644 (file)
@@ -453,8 +453,8 @@ details_graph_is_configured(bt_self_component_sink *comp)
                goto end;
        }
 
-       iterator = bt_self_component_port_input_message_iterator_create(
-               bt_self_component_sink_borrow_input_port_by_name(comp,
+       iterator = bt_self_component_port_input_message_iterator_create_from_sink_component(
+               comp, bt_self_component_sink_borrow_input_port_by_name(comp,
                        in_port_name));
        if (!iterator) {
                status = BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_MEMORY_ERROR;
index 5c0aae8209ab8ab487dba183ccc217f515c94663..d0e1fbbc6e1c356252a159f5ff8cbcbb997c91ea 100644 (file)
@@ -162,8 +162,8 @@ pretty_graph_is_configured(bt_self_component_sink *comp)
                        bt_self_component_sink_as_self_component(comp));
        BT_ASSERT(pretty);
        BT_ASSERT(!pretty->iterator);
-       pretty->iterator = bt_self_component_port_input_message_iterator_create(
-               bt_self_component_sink_borrow_input_port_by_name(comp,
+       pretty->iterator = bt_self_component_port_input_message_iterator_create_from_sink_component(
+               comp, bt_self_component_sink_borrow_input_port_by_name(comp,
                        in_port_name));
        if (!pretty->iterator) {
                status = BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_ERROR;
index 0bbecd0e5800ecb4362cd65162d6302145c05894..f61ee043b0fb26ec99cc49c2d3b37af7a5d838ee 100644 (file)
@@ -222,8 +222,8 @@ counter_graph_is_configured(
        counter = bt_self_component_get_data(
                bt_self_component_sink_as_self_component(comp));
        BT_ASSERT(counter);
-       iterator = bt_self_component_port_input_message_iterator_create(
-               bt_self_component_sink_borrow_input_port_by_name(comp,
+       iterator = bt_self_component_port_input_message_iterator_create_from_sink_component(
+               comp, bt_self_component_sink_borrow_input_port_by_name(comp,
                        in_port_name));
        if (!iterator) {
                status = BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_MEMORY_ERROR;
index a5516cc639f14707e0647640db2c7b5d3e064a59..681db3b65cce0349b7bcb36583a5e735c2a1cf39 100644 (file)
@@ -99,8 +99,8 @@ bt_component_class_sink_graph_is_configured_method_status dummy_graph_is_configu
        dummy = bt_self_component_get_data(
                bt_self_component_sink_as_self_component(comp));
        BT_ASSERT(dummy);
-       iterator = bt_self_component_port_input_message_iterator_create(
-               bt_self_component_sink_borrow_input_port_by_name(comp,
+       iterator = bt_self_component_port_input_message_iterator_create_from_sink_component(
+               comp, bt_self_component_sink_borrow_input_port_by_name(comp,
                        in_port_name));
        if (!iterator) {
                status = BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_MEMORY_ERROR;
index 9510d07a5e45a865d33f7855fbdc02534f384268..31d374fc0fcf4b242e1001c75a727a0408323b64 100644 (file)
@@ -73,6 +73,9 @@ enum muxer_msg_iter_clock_class_expectation {
 struct muxer_msg_iter {
        struct muxer_comp *muxer_comp;
 
+       /* Weak */
+       bt_self_message_iterator *self_msg_iter;
+
        /*
         * Array of struct muxer_upstream_msg_iter * (owned by this).
         *
@@ -427,6 +430,7 @@ void muxer_finalize(bt_self_component_filter *self_comp)
 static
 bt_self_component_port_input_message_iterator *
 create_msg_iter_on_input_port(struct muxer_comp *muxer_comp,
+               struct muxer_msg_iter *muxer_msg_iter,
                bt_self_component_port_input *self_port)
 {
        const bt_port *port = bt_self_component_port_as_port(
@@ -441,8 +445,8 @@ create_msg_iter_on_input_port(struct muxer_comp *muxer_comp,
        // TODO: Advance the iterator to >= the time of the latest
        //       returned message by the muxer message
        //       iterator which creates it.
-       msg_iter = bt_self_component_port_input_message_iterator_create(
-               self_port);
+       msg_iter = bt_self_component_port_input_message_iterator_create_from_message_iterator(
+               muxer_msg_iter->self_msg_iter, self_port);
        if (!msg_iter) {
                BT_COMP_LOGE("Cannot create upstream message iterator on input port: "
                        "port-addr=%p, port-name=\"%s\"",
@@ -1231,7 +1235,7 @@ int muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp,
                }
 
                upstream_msg_iter = create_msg_iter_on_input_port(muxer_comp,
-                       self_port);
+                       muxer_msg_iter, self_port);
                if (!upstream_msg_iter) {
                        /* create_msg_iter_on_input_port() logs errors */
                        BT_ASSERT(!upstream_msg_iter);
@@ -1292,6 +1296,7 @@ bt_component_class_message_iterator_init_method_status muxer_msg_iter_init(
        }
 
        muxer_msg_iter->muxer_comp = muxer_comp;
+       muxer_msg_iter->self_msg_iter = self_msg_iter;
        muxer_msg_iter->last_returned_ts_ns = INT64_MIN;
        muxer_msg_iter->active_muxer_upstream_msg_iters =
                g_ptr_array_new_with_free_func(
index 7cf741cf1ae0f0563b9f56f279a5e13b5156d83a..5c7ec431cc9ec64ff78157d477682173b7eb0fd3 100644 (file)
@@ -685,7 +685,8 @@ bt_component_class_message_iterator_init_method_status trimmer_msg_iter_init(
        trimmer_it->begin = trimmer_it->trimmer_comp->begin;
        trimmer_it->end = trimmer_it->trimmer_comp->end;
        trimmer_it->upstream_iter =
-               bt_self_component_port_input_message_iterator_create(
+               bt_self_component_port_input_message_iterator_create_from_message_iterator(
+                       self_msg_iter,
                        bt_self_component_filter_borrow_input_port_by_name(
                                self_comp, in_port_name));
        if (!trimmer_it->upstream_iter) {
index 2f9bebce48aad2d6f67b5010c0f11067209f6964..8caef1034d6e511841c10340540851e2fe5b88b4 100644 (file)
@@ -45,7 +45,7 @@ class WorkingSink(bt2._UserSinkComponent):
         self._in = self._add_input_port('in')
 
     def _graph_is_configured(self):
-        self._iter = self._in.create_message_iterator()
+        self._iter = self._create_input_port_message_iterator(self._in)
 
     def _consume(self):
         next(self._iter)
@@ -56,7 +56,7 @@ class SinkWithExceptionChaining(bt2._UserSinkComponent):
         self._in = self._add_input_port('in')
 
     def _graph_is_configured(self):
-        self._iter = self._in.create_message_iterator()
+        self._iter = self._create_input_port_message_iterator(self._in)
 
     def _consume(self):
         try:
index f6f5dcaae9eab4a5bc70d06e8f1fcb873d19a1f2..4f6aade1febc787f8544e736beac3edd5cb55647 100644 (file)
@@ -224,7 +224,9 @@ class GraphTestCase(unittest.TestCase):
                 return next(self._msg_iter)
 
             def _graph_is_configured(self):
-                self._msg_iter = self._input_ports['in'].create_message_iterator()
+                self._msg_iter = self._create_input_port_message_iterator(
+                    self._input_ports['in']
+                )
 
         graph = bt2.Graph()
         up = graph.add_component(MySource, 'down')
@@ -280,7 +282,9 @@ class GraphTestCase(unittest.TestCase):
                 comp_self._at += 1
 
             def _graph_is_configured(self):
-                self._msg_iter = self._input_port.create_message_iterator()
+                self._msg_iter = self._create_input_port_message_iterator(
+                    self._input_port
+                )
 
         src = self._graph.add_component(MySource, 'src')
         sink = self._graph.add_component(MySink, 'sink')
@@ -329,7 +333,9 @@ class GraphTestCase(unittest.TestCase):
                 comp_self._at += 1
 
             def _graph_is_configured(self):
-                self._msg_iter = self._input_port.create_message_iterator()
+                self._msg_iter = self._create_input_port_message_iterator(
+                    self._input_port
+                )
 
         src = self._graph.add_component(MySource, 'src')
         sink = self._graph.add_component(MySink, 'sink')
@@ -386,7 +392,9 @@ class GraphTestCase(unittest.TestCase):
                 comp_self._at += 1
 
             def _graph_is_configured(self):
-                self._msg_iter = self._input_port.create_message_iterator()
+                self._msg_iter = self._create_input_port_message_iterator(
+                    self._input_port
+                )
 
         src = self._graph.add_component(MySource, 'src')
         sink = self._graph.add_component(MySink, 'sink')
index 0f9182f26b8ea9fb95e4758c8395f64c6913fcb3..f780cf2c75a9a3052c00c92903253d717fabff37 100644 (file)
@@ -25,7 +25,7 @@ import bt2
 
 class UserMessageIteratorTestCase(unittest.TestCase):
     @staticmethod
-    def _create_graph(src_comp_cls):
+    def _create_graph(src_comp_cls, flt_comp_cls=None):
         class MySink(bt2._UserSinkComponent):
             def __init__(self, params):
                 self._add_input_port('in')
@@ -34,12 +34,28 @@ class UserMessageIteratorTestCase(unittest.TestCase):
                 next(self._msg_iter)
 
             def _graph_is_configured(self):
-                self._msg_iter = self._input_ports['in'].create_message_iterator()
+                self._msg_iter = self._create_input_port_message_iterator(
+                    self._input_ports['in']
+                )
 
         graph = bt2.Graph()
         src_comp = graph.add_component(src_comp_cls, 'src')
+
+        if flt_comp_cls is not None:
+            flt_comp = graph.add_component(flt_comp_cls, 'flt')
+
         sink_comp = graph.add_component(MySink, 'sink')
-        graph.connect_ports(src_comp.output_ports['out'], sink_comp.input_ports['in'])
+
+        if flt_comp_cls is not None:
+            assert flt_comp is not None
+            graph.connect_ports(
+                src_comp.output_ports['out'], flt_comp.input_ports['in']
+            )
+            out_port = flt_comp.output_ports['out']
+        else:
+            out_port = src_comp.output_ports['out']
+
+        graph.connect_ports(out_port, sink_comp.input_ports['in'])
         return graph
 
     def test_init(self):
@@ -67,6 +83,39 @@ class UserMessageIteratorTestCase(unittest.TestCase):
         )
         self.assertEqual(the_output_port_from_iter.user_data, 'user data')
 
+    def test_create_from_message_iterator(self):
+        class MySourceIter(bt2._UserMessageIterator):
+            def __init__(self, self_port_output):
+                nonlocal src_iter_initialized
+                src_iter_initialized = True
+
+        class MySource(bt2._UserSourceComponent, message_iterator_class=MySourceIter):
+            def __init__(self, params):
+                self._add_output_port('out')
+
+        class MyFilterIter(bt2._UserMessageIterator):
+            def __init__(self, self_port_output):
+                nonlocal flt_iter_initialized
+                flt_iter_initialized = True
+                self._up_iter = self._create_input_port_message_iterator(
+                    self._component._input_ports['in']
+                )
+
+            def __next__(self):
+                return next(self._up_iter)
+
+        class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter):
+            def __init__(self, params):
+                self._add_input_port('in')
+                self._add_output_port('out')
+
+        src_iter_initialized = False
+        flt_iter_initialized = False
+        graph = self._create_graph(MySource, MyFilter)
+        graph.run()
+        self.assertTrue(src_iter_initialized)
+        self.assertTrue(flt_iter_initialized)
+
     def test_finalize(self):
         class MyIter(bt2._UserMessageIterator):
             def _finalize(self):
@@ -208,7 +257,9 @@ class UserMessageIteratorTestCase(unittest.TestCase):
         class MyFilterIter(bt2._UserMessageIterator):
             def __init__(self, port):
                 input_port = port.user_data
-                self._upstream_iter = input_port.create_message_iterator()
+                self._upstream_iter = self._create_input_port_message_iterator(
+                    input_port
+                )
 
             def __next__(self):
                 return next(self._upstream_iter)
This page took 0.03933 seconds and 4 git commands to generate.