+
+
+# Proxy sink component class.
+#
+# This sink accepts a list of a single item as its initialization
+# object. This sink creates a single input port `in`. When it consumes
+# from this port, it puts the returned message in the initialization
+# list as the first item.
+class TestProxySink(bt2._UserSinkComponent):
+ def __init__(self, params, msg_list):
+ assert msg_list is not None
+ self._msg_list = msg_list
+ self._add_input_port('in')
+
+ def _user_graph_is_configured(self):
+ self._msg_iter = self._create_input_port_message_iterator(
+ self._input_ports['in']
+ )
+
+ def _user_consume(self):
+ assert self._msg_list[0] is None
+ self._msg_list[0] = next(self._msg_iter)
+
+
+# This is a helper message iterator for tests.
+#
+# The constructor accepts a graph and an output port.
+#
+# Internally, it adds a proxy sink to the graph and connects the
+# received output port to the proxy sink's input port. Its __next__()
+# method then uses the proxy sink to transfer the consumed message to
+# the output port message iterator's user.
+#
+# This message iterator cannot seek.
+class TestOutputPortMessageIterator(collections.abc.Iterator):
+ def __init__(self, graph, output_port):
+ self._graph = graph
+ self._msg_list = [None]
+ sink = graph.add_component(TestProxySink, 'test-proxy-sink', obj=self._msg_list)
+ graph.connect_ports(output_port, sink.input_ports['in'])
+
+ def __next__(self):
+ assert self._msg_list[0] is None
+ self._graph.run_once()
+ msg = self._msg_list[0]
+ assert msg is not None
+ self._msg_list[0] = None
+ return msg