lib: remove output port message iterator
[babeltrace.git] / tests / bindings / python / bt2 / utils.py
index 1f9a6c2768ae2e0635d55c136425b5250ac8e030..7707a531d1a7576c9d32328dda43cf2ed831eea1 100644 (file)
 #
 
 import bt2
+import collections.abc
 
 # Run callable `func` in the context of a component's __init__ method.  The
 # callable is passed the Component being instantiated.
 #
 # The value returned by the callable is returned by run_in_component_init.
-
-
 def run_in_component_init(func):
     class MySink(bt2._UserSinkComponent):
         def __init__(self, params, obj):
@@ -52,10 +51,56 @@ def run_in_component_init(func):
 
 
 # Create an empty trace class with default values.
-
-
 def get_default_trace_class():
     def f(comp_self):
         return comp_self._create_trace_class()
 
     return run_in_component_init(f)
+
+
+# Proxy sink component class.
+#
+# This sink accepts a list of a single item as its initialization
+# object. This sink creates a single input port `in`. When it consumes
+# from this port, it puts the returned message in the initialization
+# list as the first item.
+class TestProxySink(bt2._UserSinkComponent):
+    def __init__(self, params, msg_list):
+        assert msg_list is not None
+        self._msg_list = msg_list
+        self._add_input_port('in')
+
+    def _user_graph_is_configured(self):
+        self._msg_iter = self._create_input_port_message_iterator(
+            self._input_ports['in']
+        )
+
+    def _user_consume(self):
+        assert self._msg_list[0] is None
+        self._msg_list[0] = next(self._msg_iter)
+
+
+# This is a helper message iterator for tests.
+#
+# The constructor accepts a graph and an output port.
+#
+# Internally, it adds a proxy sink to the graph and connects the
+# received output port to the proxy sink's input port. Its __next__()
+# method then uses the proxy sink to transfer the consumed message to
+# the output port message iterator's user.
+#
+# This message iterator cannot seek.
+class TestOutputPortMessageIterator(collections.abc.Iterator):
+    def __init__(self, graph, output_port):
+        self._graph = graph
+        self._msg_list = [None]
+        sink = graph.add_component(TestProxySink, 'test-proxy-sink', obj=self._msg_list)
+        graph.connect_ports(output_port, sink.input_ports['in'])
+
+    def __next__(self):
+        assert self._msg_list[0] is None
+        self._graph.run_once()
+        msg = self._msg_list[0]
+        assert msg is not None
+        self._msg_list[0] = None
+        return msg
This page took 0.02472 seconds and 4 git commands to generate.