+ # Test that creating a message iterator from a sink component on a
+ # non-connected input port raises.
+ def test_create_from_sink_component_unconnected_port_raises(self):
+ class MySink(bt2._UserSinkComponent):
+ def __init__(comp_self, config, params, obj):
+ comp_self._input_port = comp_self._add_input_port("in")
+
+ def _user_graph_is_configured(comp_self):
+ with self.assertRaisesRegex(ValueError, "input port is not connected"):
+ comp_self._create_message_iterator(comp_self._input_port)
+
+ nonlocal seen
+ seen = True
+
+ def _user_consume(self):
+ raise bt2.Stop
+
+ seen = False
+ graph = bt2.Graph()
+ graph.add_component(MySink, "snk")
+ graph.run()
+ self.assertTrue(seen)
+
+ # Test that creating a message iterator from a message iterator on a
+ # non-connected input port raises.
+ def test_create_from_message_iterator_unconnected_port_raises(self):
+ class MyFilterIter(bt2._UserMessageIterator):
+ def __init__(iter_self, config, port):
+ input_port = iter_self._component._input_ports["in"]
+
+ with self.assertRaisesRegex(ValueError, "input port is not connected"):
+ iter_self._create_message_iterator(input_port)
+
+ nonlocal seen
+ seen = True
+
+ class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter):
+ def __init__(comp_self, config, params, obj):
+ comp_self._add_input_port("in")
+ comp_self._add_output_port("out")
+
+ class MySink(bt2._UserSinkComponent):
+ def __init__(comp_self, config, params, obj):
+ comp_self._input_port = comp_self._add_input_port("in")
+
+ def _user_graph_is_configured(comp_self):
+ comp_self._input_iter = comp_self._create_message_iterator(
+ comp_self._input_port
+ )
+
+ def _user_consume(self):
+ raise bt2.Stop
+
+ seen = False
+ graph = bt2.Graph()
+ flt = graph.add_component(MyFilter, "flt")
+ snk = graph.add_component(MySink, "snk")
+ graph.connect_ports(flt.output_ports["out"], snk.input_ports["in"])
+ graph.run()
+ self.assertTrue(seen)
+