- graph.add_component(MySink, 'comp')
-
- def test_raise_in_ports_connected_listener(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, config, params, obj):
- self._add_output_port('out')
-
- class MySink(bt2._UserSinkComponent):
- def __init__(self, config, params, obj):
- self._add_input_port('in')
-
- def _user_consume(self):
- raise bt2.Stop
-
- def ports_connected_listener(
- upstream_component, upstream_port, downstream_component, downstream_port
- ):
- raise ValueError('oh noes!')
-
- graph = bt2.Graph()
- graph.add_ports_connected_listener(ports_connected_listener)
- up = graph.add_component(MySource, 'down')
- down = graph.add_component(MySink, 'up')
-
- with self.assertRaises(bt2._Error):
- graph.connect_ports(up.output_ports['out'], down.input_ports['in'])