From: Philippe Proulx Date: Sat, 20 Jul 2019 21:16:24 +0000 (-0400) Subject: lib: create input port msg iterator from self {msg iterator, sink comp.} X-Git-Url: https://git.efficios.com/?a=commitdiff_plain;h=ca02df0ad8ae9a1a3640956d91ca31059d0b203a;p=babeltrace.git lib: create input port msg iterator from self {msg iterator, sink comp.} This patch makes an input port message iterator have a link to the upstream message iterators it needs and vice versa. The motivation behind this is to eventually be able to transmit some state or property automatically to all the upstream message iterators, recursively, of a given message iterator without any special user code. For example, consider this graph of message iterators (arrow means "is an upstream message iterator of"): A <──┐ ├──┬── C <──┬── F B <──┘ │ │ │ E <──┘ D <─────┘ G <───── H Here, setting the state/property on F would also set it on A, B, D, C, and E. Setting it on C would set it on A, B, and D. Setting it on H would only set it on G. The message iterator graph, which can be considered like another, "dynamic" graph on top of the static component graph used only to limit a message iterator graph's topology, is always directed and acyclic, where each node has a single parent. In other words, it is not permitted that two message iterators use the same downstream message iterator, for example: A <──┐ ├───── C <───── F B <──┘ │ │ │ │ D <─────────┴────────┘ (D has both C and F as parents). This is so that each message iterator and its upstream message iterators constitute a single, private state. This patch replaces bt_self_component_port_input_message_iterator_create() with bt_self_component_port_input_message_iterator_create_from_message_iterator() and bt_self_component_port_input_message_iterator_create_from_sink_component(). bt_self_component_port_input_message_iterator_create_from_sink_component() does nothing with the self sink component parameter (except checking that it's not `NULL`), but it's needed to make the functions type safe. Internally, an input port message iterator contains both a weak pointer to its downstream message iterator and an array of weak pointers to its upstream message iterators. When you create a message iterator with bt_self_component_port_input_message_iterator_create_from_message_iterator(), the function sets the new message iterator's downstream message iterator to the provided self message iterator. It also adds the new message iterator to the self message iterator's array of upstream message iterators. On message iterator finalization: * The message iterator removes itself as the downstream message iterator of all its upstream message iterators. It also clears the upstream message iterator array. * The message iterator removes itself as one of the upstream message iterators of its downstream message iterator. I didn't find any race condition regarding the new message iterator links, but I could be wrong. The future and more tests will tell us. In the `bt2` Python package: * _UserComponentInputPort.create_message_iterator() is removed. * _UserMessageIterator._create_input_port_message_iterator() is added (accepts a user component input port). * _UserSinkComponent._create_input_port_message_iterator() is added (accepts a user component input port). Tests are updated accordingly. I added a test for _UserMessageIterator._create_input_port_message_iterator() specifically (needs a filter message iterator) because we use _UserSinkComponent._create_input_port_message_iterator() in the current tests. Signed-off-by: Philippe Proulx Change-Id: I46cefca445353c453fbd9404b97687b81fa5f443 Reviewed-on: https://review.lttng.org/c/babeltrace/+/1732 --- diff --git a/include/babeltrace2/graph/self-component-port-input-message-iterator.h b/include/babeltrace2/graph/self-component-port-input-message-iterator.h index 67d6d832..d4ad8b59 100644 --- a/include/babeltrace2/graph/self-component-port-input-message-iterator.h +++ b/include/babeltrace2/graph/self-component-port-input-message-iterator.h @@ -45,7 +45,13 @@ bt_self_component_port_input_message_iterator_as_message_iterator( } extern bt_self_component_port_input_message_iterator * -bt_self_component_port_input_message_iterator_create( +bt_self_component_port_input_message_iterator_create_from_message_iterator( + bt_self_message_iterator *self_msg_iter, + bt_self_component_port_input *input_port); + +extern bt_self_component_port_input_message_iterator * +bt_self_component_port_input_message_iterator_create_from_sink_component( + bt_self_component_sink *self_comp, bt_self_component_port_input *input_port); extern bt_component * diff --git a/src/bindings/python/bt2/bt2/component.py b/src/bindings/python/bt2/bt2/component.py index 6d6afc4b..10b11663 100644 --- a/src/bindings/python/bt2/bt2/component.py +++ b/src/bindings/python/bt2/bt2/component.py @@ -852,3 +852,15 @@ class _UserSinkComponent(_UserComponent, _SinkComponent): ) assert self_port_ptr return bt2.port._UserComponentInputPort._create_from_ptr(self_port_ptr) + + def _create_input_port_message_iterator(self, input_port): + utils._check_type(input_port, bt2.port._UserComponentInputPort) + + msg_iter_ptr = native_bt.self_component_port_input_message_iterator_create_from_sink_component( + self._bt_ptr, input_port._ptr + ) + + if msg_iter_ptr is None: + raise bt2.CreationError('cannot create message iterator object') + + return bt2.message_iterator._UserComponentInputPortMessageIterator(msg_iter_ptr) diff --git a/src/bindings/python/bt2/bt2/message_iterator.py b/src/bindings/python/bt2/bt2/message_iterator.py index 93595693..07c707db 100644 --- a/src/bindings/python/bt2/bt2/message_iterator.py +++ b/src/bindings/python/bt2/bt2/message_iterator.py @@ -24,6 +24,7 @@ from bt2 import native_bt, object, utils import bt2.message import collections.abc import bt2.component +import bt2.port import bt2 @@ -178,6 +179,18 @@ class _UserMessageIterator(_MessageIterator): def _bt_seek_beginning_from_native(self): self._seek_beginning() + def _create_input_port_message_iterator(self, input_port): + utils._check_type(input_port, bt2.port._UserComponentInputPort) + + msg_iter_ptr = native_bt.self_component_port_input_message_iterator_create_from_message_iterator( + self._bt_ptr, input_port._ptr + ) + + if msg_iter_ptr is None: + raise bt2.CreationError('cannot create message iterator object') + + return _UserComponentInputPortMessageIterator(msg_iter_ptr) + def _create_event_message( self, event_class, parent=None, default_clock_snapshot=None ): diff --git a/src/bindings/python/bt2/bt2/port.py b/src/bindings/python/bt2/bt2/port.py index d0d65d23..201e2778 100644 --- a/src/bindings/python/bt2/bt2/port.py +++ b/src/bindings/python/bt2/bt2/port.py @@ -114,15 +114,6 @@ class _UserComponentInputPort(_UserComponentPort, _InputPort): native_bt.self_component_port_input_as_self_component_port ) - def create_message_iterator(self): - msg_iter_ptr = native_bt.self_component_port_input_message_iterator_create( - self._ptr - ) - if msg_iter_ptr is None: - raise bt2._MemoryError('cannot create message iterator object') - - return bt2.message_iterator._UserComponentInputPortMessageIterator(msg_iter_ptr) - class _UserComponentOutputPort(_UserComponentPort, _OutputPort): _as_self_port_ptr = staticmethod( diff --git a/src/lib/graph/component-class-sink-colander.c b/src/lib/graph/component-class-sink-colander.c index 771c6268..d1978c95 100644 --- a/src/lib/graph/component-class-sink-colander.c +++ b/src/lib/graph/component-class-sink-colander.c @@ -110,8 +110,8 @@ colander_graph_is_configured( BT_ASSERT(colander_data); BT_OBJECT_PUT_REF_AND_RESET(colander_data->msg_iter); colander_data->msg_iter = - bt_self_component_port_input_message_iterator_create( - self_port); + bt_self_component_port_input_message_iterator_create_from_sink_component( + self_comp, self_port); if (!colander_data->msg_iter) { BT_LIB_LOGE_APPEND_CAUSE("Cannot create message iterator on " "self component input port: %![port-]+p", diff --git a/src/lib/graph/iterator.c b/src/lib/graph/iterator.c index a8fe3f15..254e422f 100644 --- a/src/lib/graph/iterator.c +++ b/src/lib/graph/iterator.c @@ -162,6 +162,16 @@ void bt_self_component_port_input_message_iterator_destroy(struct bt_object *obj iterator->auto_seek.msgs = NULL; } + if (iterator->upstream_msg_iters) { + /* + * At this point the message iterator is finalized, so + * it's detached from any upstream message iterator. + */ + BT_ASSERT(iterator->upstream_msg_iters->len == 0); + g_ptr_array_free(iterator->upstream_msg_iters, TRUE); + iterator->upstream_msg_iters = NULL; + } + destroy_base_message_iterator(obj); } @@ -169,6 +179,7 @@ BT_HIDDEN void bt_self_component_port_input_message_iterator_try_finalize( struct bt_self_component_port_input_message_iterator *iterator) { + uint64_t i; typedef void (*method_t)(void *); struct bt_component_class *comp_class = NULL; @@ -231,6 +242,27 @@ void bt_self_component_port_input_message_iterator_try_finalize( method(iterator); } + /* Detach upstream message iterators */ + for (i = 0; i < iterator->upstream_msg_iters->len; i++) { + struct bt_self_component_port_input_message_iterator *upstream_msg_iter = + iterator->upstream_msg_iters->pdata[i]; + + upstream_msg_iter->downstream_msg_iter = NULL; + } + + g_ptr_array_set_size(iterator->upstream_msg_iters, 0); + + /* Detach downstream message iterator */ + if (iterator->downstream_msg_iter) { + gboolean existed; + + BT_ASSERT(iterator->downstream_msg_iter->upstream_msg_iters); + existed = g_ptr_array_remove_fast( + iterator->downstream_msg_iter->upstream_msg_iters, + iterator); + BT_ASSERT(existed); + } + iterator->upstream_component = NULL; iterator->upstream_port = NULL; set_self_comp_port_input_msg_iterator_state(iterator, @@ -291,22 +323,51 @@ bt_bool can_seek_beginning_true( static struct bt_self_component_port_input_message_iterator * -bt_self_component_port_input_message_iterator_create_initial( - struct bt_component *upstream_comp, - struct bt_port *upstream_port) +create_self_component_input_port_message_iterator( + struct bt_self_message_iterator *self_downstream_msg_iter, + struct bt_self_component_port_input *self_port) { + typedef enum bt_component_class_message_iterator_init_method_status (*init_method_t)( + void *, void *, void *); + int ret; - struct bt_self_component_port_input_message_iterator *iterator = NULL; + init_method_t init_method = NULL; + struct bt_self_component_port_input_message_iterator *iterator = + NULL; + struct bt_self_component_port_input_message_iterator *downstream_msg_iter = + (void *) self_downstream_msg_iter; + struct bt_port *port = (void *) self_port; + struct bt_port *upstream_port; + struct bt_component *comp; + struct bt_component *upstream_comp; + struct bt_component_class *upstream_comp_cls; - BT_ASSERT(upstream_comp); + BT_ASSERT_PRE_NON_NULL(port, "Input port"); + comp = bt_port_borrow_component_inline(port); + BT_ASSERT_PRE(bt_port_is_connected(port), + "Input port is not connected: %![port-]+p", port); + BT_ASSERT_PRE(comp, "Input port is not part of a component: %![port-]+p", + port); + BT_ASSERT_PRE(!bt_component_graph_is_canceled(comp), + "Input port's component's graph is canceled: " + "%![port-]+p, %![comp-]+c", port, comp); + BT_ASSERT(port->connection); + upstream_port = port->connection->upstream_port; BT_ASSERT(upstream_port); - BT_ASSERT(bt_port_is_connected(upstream_port)); - BT_LIB_LOGI("Creating initial message iterator on self component input port: " - "%![up-comp-]+c, %![up-port-]+p", upstream_comp, upstream_port); - BT_ASSERT(bt_component_get_class_type(upstream_comp) == + upstream_comp = bt_port_borrow_component_inline(upstream_port); + BT_ASSERT(upstream_comp); + BT_ASSERT_PRE( + bt_component_borrow_graph(upstream_comp)->config_state != + BT_GRAPH_CONFIGURATION_STATE_CONFIGURING, + "Graph is not configured: %!+g", + bt_component_borrow_graph(upstream_comp)); + upstream_comp_cls = upstream_comp->class; + BT_ASSERT(upstream_comp->class->type == BT_COMPONENT_CLASS_TYPE_SOURCE || - bt_component_get_class_type(upstream_comp) == + upstream_comp->class->type == BT_COMPONENT_CLASS_TYPE_FILTER); + BT_LIB_LOGI("Creating message iterator on self component input port: " + "%![up-comp-]+c, %![up-port-]+p", upstream_comp, upstream_port); iterator = g_new0( struct bt_self_component_port_input_message_iterator, 1); if (!iterator) { @@ -325,12 +386,16 @@ bt_self_component_port_input_message_iterator_create_initial( } iterator->last_ns_from_origin = INT64_MIN; - iterator->auto_seek.msgs = g_queue_new(); if (!iterator->auto_seek.msgs) { BT_LIB_LOGE_APPEND_CAUSE("Failed to allocate a GQueue."); - ret = -1; - goto end; + goto error; + } + + iterator->upstream_msg_iters = g_ptr_array_new(); + if (!iterator->upstream_msg_iters) { + BT_LIB_LOGE_APPEND_CAUSE("Failed to allocate a GPtrArray."); + goto error; } iterator->upstream_component = upstream_comp; @@ -403,66 +468,6 @@ bt_self_component_port_input_message_iterator_create_initial( can_seek_beginning_true; } - BT_LIB_LOGI("Created initial message iterator on self component input port: " - "%![up-port-]+p, %![up-comp-]+c, %![iter-]+i", - upstream_port, upstream_comp, iterator); - goto end; - -error: - BT_OBJECT_PUT_REF_AND_RESET(iterator); - -end: - return iterator; -} - -struct bt_self_component_port_input_message_iterator * -bt_self_component_port_input_message_iterator_create( - struct bt_self_component_port_input *self_port) -{ - typedef enum bt_component_class_message_iterator_init_method_status (*init_method_t)( - void *, void *, void *); - - init_method_t init_method = NULL; - struct bt_self_component_port_input_message_iterator *iterator = - NULL; - struct bt_port *port = (void *) self_port; - struct bt_port *upstream_port; - struct bt_component *comp; - struct bt_component *upstream_comp; - struct bt_component_class *upstream_comp_cls; - - BT_ASSERT_PRE_NON_NULL(port, "Port"); - comp = bt_port_borrow_component_inline(port); - BT_ASSERT_PRE(bt_port_is_connected(port), - "Port is not connected: %![port-]+p", port); - BT_ASSERT_PRE(comp, "Port is not part of a component: %![port-]+p", - port); - BT_ASSERT_PRE(!bt_component_graph_is_canceled(comp), - "Port's component's graph is canceled: " - "%![port-]+p, %![comp-]+c", port, comp); - BT_ASSERT(port->connection); - upstream_port = port->connection->upstream_port; - BT_ASSERT(upstream_port); - upstream_comp = bt_port_borrow_component_inline(upstream_port); - BT_ASSERT(upstream_comp); - BT_ASSERT_PRE( - bt_component_borrow_graph(upstream_comp)->config_state != - BT_GRAPH_CONFIGURATION_STATE_CONFIGURING, - "Graph is not configured: %!+g", - bt_component_borrow_graph(upstream_comp)); - upstream_comp_cls = upstream_comp->class; - BT_ASSERT(upstream_comp->class->type == - BT_COMPONENT_CLASS_TYPE_SOURCE || - upstream_comp->class->type == - BT_COMPONENT_CLASS_TYPE_FILTER); - iterator = bt_self_component_port_input_message_iterator_create_initial( - upstream_comp, upstream_port); - if (!iterator) { - BT_LIB_LOGE_APPEND_CAUSE( - "Cannot create self component input port message iterator."); - goto error; - } - switch (upstream_comp_cls->type) { case BT_COMPONENT_CLASS_TYPE_SOURCE: { @@ -505,6 +510,18 @@ bt_self_component_port_input_message_iterator_create( } } + if (downstream_msg_iter) { + /* Set this message iterator's downstream message iterator */ + iterator->downstream_msg_iter = downstream_msg_iter; + + /* + * Add this message iterator to the downstream message + * iterator's array of upstream message iterators. + */ + g_ptr_array_add(downstream_msg_iter->upstream_msg_iters, + iterator); + } + set_self_comp_port_input_msg_iterator_state(iterator, BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE); g_ptr_array_add(port->connection->iterators, iterator); @@ -520,6 +537,26 @@ end: return iterator; } +struct bt_self_component_port_input_message_iterator * +bt_self_component_port_input_message_iterator_create_from_message_iterator( + struct bt_self_message_iterator *self_msg_iter, + struct bt_self_component_port_input *input_port) +{ + BT_ASSERT_PRE_NON_NULL(self_msg_iter, "Message iterator"); + return create_self_component_input_port_message_iterator(self_msg_iter, + input_port); +} + +struct bt_self_component_port_input_message_iterator * +bt_self_component_port_input_message_iterator_create_from_sink_component( + struct bt_self_component_sink *self_comp, + struct bt_self_component_port_input *input_port) +{ + BT_ASSERT_PRE_NON_NULL(self_comp, "Sink component"); + return create_self_component_input_port_message_iterator(NULL, + input_port); +} + void *bt_self_message_iterator_get_data( const struct bt_self_message_iterator *self_iterator) { diff --git a/src/lib/graph/message/iterator.h b/src/lib/graph/message/iterator.h index b8a9dfa2..93d9aa23 100644 --- a/src/lib/graph/message/iterator.h +++ b/src/lib/graph/message/iterator.h @@ -103,6 +103,27 @@ struct bt_self_component_port_input_message_iterator { struct bt_connection *connection; /* Weak */ struct bt_graph *graph; /* Weak */ + /* + * Array of + * `struct bt_self_component_port_input_message_iterator *` + * (weak). + * + * This is an array of upstream message iterators on which this + * iterator depends. The references are weak: an upstream + * message iterator is responsible for removing its entry within + * this array on finalization/destruction. + */ + GPtrArray *upstream_msg_iters; + + /* + * Downstream message iterator which depends on this message + * iterator (weak). + * + * This can be `NULL` if this message iterator's owner is a sink + * component. + */ + struct bt_self_component_port_input_message_iterator *downstream_msg_iter; + struct { bt_self_component_port_input_message_iterator_next_method next; bt_self_component_port_input_message_iterator_seek_ns_from_origin_method seek_ns_from_origin; diff --git a/src/plugins/ctf/fs-sink/fs-sink.c b/src/plugins/ctf/fs-sink/fs-sink.c index 239e7ad1..172e5b4f 100644 --- a/src/plugins/ctf/fs-sink/fs-sink.c +++ b/src/plugins/ctf/fs-sink/fs-sink.c @@ -1104,7 +1104,8 @@ bt_component_class_sink_graph_is_configured_method_status ctf_fs_sink_graph_is_c bt_self_component_sink_as_self_component(self_comp)); fs_sink->upstream_iter = - bt_self_component_port_input_message_iterator_create( + bt_self_component_port_input_message_iterator_create_from_sink_component( + self_comp, bt_self_component_sink_borrow_input_port_by_name( self_comp, in_port_name)); if (!fs_sink->upstream_iter) { diff --git a/src/plugins/lttng-utils/debug-info/debug-info.c b/src/plugins/lttng-utils/debug-info/debug-info.c index 10e5a7de..fd8e1d26 100644 --- a/src/plugins/lttng-utils/debug-info/debug-info.c +++ b/src/plugins/lttng-utils/debug-info/debug-info.c @@ -1990,8 +1990,8 @@ bt_component_class_message_iterator_init_method_status debug_info_msg_iter_init( debug_info_msg_iter->self_comp = self_comp; /* Create an iterator on the upstream component. */ - upstream_iterator = bt_self_component_port_input_message_iterator_create( - input_port); + upstream_iterator = bt_self_component_port_input_message_iterator_create_from_message_iterator( + self_msg_iter, input_port); if (!upstream_iterator) { status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_MEMORY_ERROR; goto error; diff --git a/src/plugins/text/details/details.c b/src/plugins/text/details/details.c index 47a29326..34410187 100644 --- a/src/plugins/text/details/details.c +++ b/src/plugins/text/details/details.c @@ -453,8 +453,8 @@ details_graph_is_configured(bt_self_component_sink *comp) goto end; } - iterator = bt_self_component_port_input_message_iterator_create( - bt_self_component_sink_borrow_input_port_by_name(comp, + iterator = bt_self_component_port_input_message_iterator_create_from_sink_component( + comp, bt_self_component_sink_borrow_input_port_by_name(comp, in_port_name)); if (!iterator) { status = BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_MEMORY_ERROR; diff --git a/src/plugins/text/pretty/pretty.c b/src/plugins/text/pretty/pretty.c index 5c0aae82..d0e1fbbc 100644 --- a/src/plugins/text/pretty/pretty.c +++ b/src/plugins/text/pretty/pretty.c @@ -162,8 +162,8 @@ pretty_graph_is_configured(bt_self_component_sink *comp) bt_self_component_sink_as_self_component(comp)); BT_ASSERT(pretty); BT_ASSERT(!pretty->iterator); - pretty->iterator = bt_self_component_port_input_message_iterator_create( - bt_self_component_sink_borrow_input_port_by_name(comp, + pretty->iterator = bt_self_component_port_input_message_iterator_create_from_sink_component( + comp, bt_self_component_sink_borrow_input_port_by_name(comp, in_port_name)); if (!pretty->iterator) { status = BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_ERROR; diff --git a/src/plugins/utils/counter/counter.c b/src/plugins/utils/counter/counter.c index 0bbecd0e..f61ee043 100644 --- a/src/plugins/utils/counter/counter.c +++ b/src/plugins/utils/counter/counter.c @@ -222,8 +222,8 @@ counter_graph_is_configured( counter = bt_self_component_get_data( bt_self_component_sink_as_self_component(comp)); BT_ASSERT(counter); - iterator = bt_self_component_port_input_message_iterator_create( - bt_self_component_sink_borrow_input_port_by_name(comp, + iterator = bt_self_component_port_input_message_iterator_create_from_sink_component( + comp, bt_self_component_sink_borrow_input_port_by_name(comp, in_port_name)); if (!iterator) { status = BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_MEMORY_ERROR; diff --git a/src/plugins/utils/dummy/dummy.c b/src/plugins/utils/dummy/dummy.c index a5516cc6..681db3b6 100644 --- a/src/plugins/utils/dummy/dummy.c +++ b/src/plugins/utils/dummy/dummy.c @@ -99,8 +99,8 @@ bt_component_class_sink_graph_is_configured_method_status dummy_graph_is_configu dummy = bt_self_component_get_data( bt_self_component_sink_as_self_component(comp)); BT_ASSERT(dummy); - iterator = bt_self_component_port_input_message_iterator_create( - bt_self_component_sink_borrow_input_port_by_name(comp, + iterator = bt_self_component_port_input_message_iterator_create_from_sink_component( + comp, bt_self_component_sink_borrow_input_port_by_name(comp, in_port_name)); if (!iterator) { status = BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_MEMORY_ERROR; diff --git a/src/plugins/utils/muxer/muxer.c b/src/plugins/utils/muxer/muxer.c index 9510d07a..31d374fc 100644 --- a/src/plugins/utils/muxer/muxer.c +++ b/src/plugins/utils/muxer/muxer.c @@ -73,6 +73,9 @@ enum muxer_msg_iter_clock_class_expectation { struct muxer_msg_iter { struct muxer_comp *muxer_comp; + /* Weak */ + bt_self_message_iterator *self_msg_iter; + /* * Array of struct muxer_upstream_msg_iter * (owned by this). * @@ -427,6 +430,7 @@ void muxer_finalize(bt_self_component_filter *self_comp) static bt_self_component_port_input_message_iterator * create_msg_iter_on_input_port(struct muxer_comp *muxer_comp, + struct muxer_msg_iter *muxer_msg_iter, bt_self_component_port_input *self_port) { const bt_port *port = bt_self_component_port_as_port( @@ -441,8 +445,8 @@ create_msg_iter_on_input_port(struct muxer_comp *muxer_comp, // TODO: Advance the iterator to >= the time of the latest // returned message by the muxer message // iterator which creates it. - msg_iter = bt_self_component_port_input_message_iterator_create( - self_port); + msg_iter = bt_self_component_port_input_message_iterator_create_from_message_iterator( + muxer_msg_iter->self_msg_iter, self_port); if (!msg_iter) { BT_COMP_LOGE("Cannot create upstream message iterator on input port: " "port-addr=%p, port-name=\"%s\"", @@ -1231,7 +1235,7 @@ int muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp, } upstream_msg_iter = create_msg_iter_on_input_port(muxer_comp, - self_port); + muxer_msg_iter, self_port); if (!upstream_msg_iter) { /* create_msg_iter_on_input_port() logs errors */ BT_ASSERT(!upstream_msg_iter); @@ -1292,6 +1296,7 @@ bt_component_class_message_iterator_init_method_status muxer_msg_iter_init( } muxer_msg_iter->muxer_comp = muxer_comp; + muxer_msg_iter->self_msg_iter = self_msg_iter; muxer_msg_iter->last_returned_ts_ns = INT64_MIN; muxer_msg_iter->active_muxer_upstream_msg_iters = g_ptr_array_new_with_free_func( diff --git a/src/plugins/utils/trimmer/trimmer.c b/src/plugins/utils/trimmer/trimmer.c index 7cf741cf..5c7ec431 100644 --- a/src/plugins/utils/trimmer/trimmer.c +++ b/src/plugins/utils/trimmer/trimmer.c @@ -685,7 +685,8 @@ bt_component_class_message_iterator_init_method_status trimmer_msg_iter_init( trimmer_it->begin = trimmer_it->trimmer_comp->begin; trimmer_it->end = trimmer_it->trimmer_comp->end; trimmer_it->upstream_iter = - bt_self_component_port_input_message_iterator_create( + bt_self_component_port_input_message_iterator_create_from_message_iterator( + self_msg_iter, bt_self_component_filter_borrow_input_port_by_name( self_comp, in_port_name)); if (!trimmer_it->upstream_iter) { diff --git a/tests/bindings/python/bt2/test_error.py b/tests/bindings/python/bt2/test_error.py index 2f9bebce..8caef103 100644 --- a/tests/bindings/python/bt2/test_error.py +++ b/tests/bindings/python/bt2/test_error.py @@ -45,7 +45,7 @@ class WorkingSink(bt2._UserSinkComponent): self._in = self._add_input_port('in') def _graph_is_configured(self): - self._iter = self._in.create_message_iterator() + self._iter = self._create_input_port_message_iterator(self._in) def _consume(self): next(self._iter) @@ -56,7 +56,7 @@ class SinkWithExceptionChaining(bt2._UserSinkComponent): self._in = self._add_input_port('in') def _graph_is_configured(self): - self._iter = self._in.create_message_iterator() + self._iter = self._create_input_port_message_iterator(self._in) def _consume(self): try: diff --git a/tests/bindings/python/bt2/test_graph.py b/tests/bindings/python/bt2/test_graph.py index f6f5dcaa..4f6aade1 100644 --- a/tests/bindings/python/bt2/test_graph.py +++ b/tests/bindings/python/bt2/test_graph.py @@ -224,7 +224,9 @@ class GraphTestCase(unittest.TestCase): return next(self._msg_iter) def _graph_is_configured(self): - self._msg_iter = self._input_ports['in'].create_message_iterator() + self._msg_iter = self._create_input_port_message_iterator( + self._input_ports['in'] + ) graph = bt2.Graph() up = graph.add_component(MySource, 'down') @@ -280,7 +282,9 @@ class GraphTestCase(unittest.TestCase): comp_self._at += 1 def _graph_is_configured(self): - self._msg_iter = self._input_port.create_message_iterator() + self._msg_iter = self._create_input_port_message_iterator( + self._input_port + ) src = self._graph.add_component(MySource, 'src') sink = self._graph.add_component(MySink, 'sink') @@ -329,7 +333,9 @@ class GraphTestCase(unittest.TestCase): comp_self._at += 1 def _graph_is_configured(self): - self._msg_iter = self._input_port.create_message_iterator() + self._msg_iter = self._create_input_port_message_iterator( + self._input_port + ) src = self._graph.add_component(MySource, 'src') sink = self._graph.add_component(MySink, 'sink') @@ -386,7 +392,9 @@ class GraphTestCase(unittest.TestCase): comp_self._at += 1 def _graph_is_configured(self): - self._msg_iter = self._input_port.create_message_iterator() + self._msg_iter = self._create_input_port_message_iterator( + self._input_port + ) src = self._graph.add_component(MySource, 'src') sink = self._graph.add_component(MySink, 'sink') diff --git a/tests/bindings/python/bt2/test_message_iterator.py b/tests/bindings/python/bt2/test_message_iterator.py index 0f9182f2..f780cf2c 100644 --- a/tests/bindings/python/bt2/test_message_iterator.py +++ b/tests/bindings/python/bt2/test_message_iterator.py @@ -25,7 +25,7 @@ import bt2 class UserMessageIteratorTestCase(unittest.TestCase): @staticmethod - def _create_graph(src_comp_cls): + def _create_graph(src_comp_cls, flt_comp_cls=None): class MySink(bt2._UserSinkComponent): def __init__(self, params): self._add_input_port('in') @@ -34,12 +34,28 @@ class UserMessageIteratorTestCase(unittest.TestCase): next(self._msg_iter) def _graph_is_configured(self): - self._msg_iter = self._input_ports['in'].create_message_iterator() + self._msg_iter = self._create_input_port_message_iterator( + self._input_ports['in'] + ) graph = bt2.Graph() src_comp = graph.add_component(src_comp_cls, 'src') + + if flt_comp_cls is not None: + flt_comp = graph.add_component(flt_comp_cls, 'flt') + sink_comp = graph.add_component(MySink, 'sink') - graph.connect_ports(src_comp.output_ports['out'], sink_comp.input_ports['in']) + + if flt_comp_cls is not None: + assert flt_comp is not None + graph.connect_ports( + src_comp.output_ports['out'], flt_comp.input_ports['in'] + ) + out_port = flt_comp.output_ports['out'] + else: + out_port = src_comp.output_ports['out'] + + graph.connect_ports(out_port, sink_comp.input_ports['in']) return graph def test_init(self): @@ -67,6 +83,39 @@ class UserMessageIteratorTestCase(unittest.TestCase): ) self.assertEqual(the_output_port_from_iter.user_data, 'user data') + def test_create_from_message_iterator(self): + class MySourceIter(bt2._UserMessageIterator): + def __init__(self, self_port_output): + nonlocal src_iter_initialized + src_iter_initialized = True + + class MySource(bt2._UserSourceComponent, message_iterator_class=MySourceIter): + def __init__(self, params): + self._add_output_port('out') + + class MyFilterIter(bt2._UserMessageIterator): + def __init__(self, self_port_output): + nonlocal flt_iter_initialized + flt_iter_initialized = True + self._up_iter = self._create_input_port_message_iterator( + self._component._input_ports['in'] + ) + + def __next__(self): + return next(self._up_iter) + + class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter): + def __init__(self, params): + self._add_input_port('in') + self._add_output_port('out') + + src_iter_initialized = False + flt_iter_initialized = False + graph = self._create_graph(MySource, MyFilter) + graph.run() + self.assertTrue(src_iter_initialized) + self.assertTrue(flt_iter_initialized) + def test_finalize(self): class MyIter(bt2._UserMessageIterator): def _finalize(self): @@ -208,7 +257,9 @@ class UserMessageIteratorTestCase(unittest.TestCase): class MyFilterIter(bt2._UserMessageIterator): def __init__(self, port): input_port = port.user_data - self._upstream_iter = input_port.create_message_iterator() + self._upstream_iter = self._create_input_port_message_iterator( + input_port + ) def __next__(self): return next(self._upstream_iter)