bt2: check that port is connected when creating message iterator
[babeltrace.git] / tests / bindings / python / bt2 / test_message_iterator.py
index 54fd232907dadc4c7a51f122844b1d31bb54bbe7..a40283b34f5ffcaa75b5e2d6245e03f3748385e7 100644 (file)
@@ -100,6 +100,67 @@ class UserMessageIteratorTestCase(unittest.TestCase):
         self.assertTrue(src_iter_initialized)
         self.assertTrue(flt_iter_initialized)
 
+    # Test that creating a message iterator from a sink component on a
+    # non-connected inport port raises.
+    def test_create_from_sink_component_unconnected_port_raises(self):
+        class MySink(bt2._UserSinkComponent):
+            def __init__(comp_self, config, params, obj):
+                comp_self._input_port = comp_self._add_input_port('in')
+
+            def _user_graph_is_configured(comp_self):
+                with self.assertRaisesRegex(ValueError, 'input port is not connected'):
+                    comp_self._create_message_iterator(comp_self._input_port)
+
+                nonlocal seen
+                seen = True
+
+            def _user_consume(self):
+                raise bt2.Stop
+
+        seen = False
+        graph = bt2.Graph()
+        graph.add_component(MySink, 'snk')
+        graph.run()
+        self.assertTrue(seen)
+
+    # Test that creating a message iterator from a message iteartor on a
+    # non-connected inport port raises.
+    def test_create_from_message_iterator_unconnected_port_raises(self):
+        class MyFilterIter(bt2._UserMessageIterator):
+            def __init__(iter_self, config, port):
+                input_port = iter_self._component._input_ports['in']
+
+                with self.assertRaisesRegex(ValueError, 'input port is not connected'):
+                    iter_self._create_message_iterator(input_port)
+
+                nonlocal seen
+                seen = True
+
+        class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter):
+            def __init__(comp_self, config, params, obj):
+                comp_self._add_input_port('in')
+                comp_self._add_output_port('out')
+
+        class MySink(bt2._UserSinkComponent):
+            def __init__(comp_self, config, params, obj):
+                comp_self._input_port = comp_self._add_input_port('in')
+
+            def _user_graph_is_configured(comp_self):
+                comp_self._input_iter = comp_self._create_message_iterator(
+                    comp_self._input_port
+                )
+
+            def _user_consume(self):
+                raise bt2.Stop
+
+        seen = False
+        graph = bt2.Graph()
+        flt = graph.add_component(MyFilter, 'flt')
+        snk = graph.add_component(MySink, 'snk')
+        graph.connect_ports(flt.output_ports['out'], snk.input_ports['in'])
+        graph.run()
+        self.assertTrue(seen)
+
     def test_create_user_error(self):
         # This tests both error handling by
         # _UserSinkComponent._create_message_iterator
This page took 0.023736 seconds and 4 git commands to generate.