- calls = []
- self._graph.add_listener(bt2.GraphListenerType.PORT_ADDED,
- port_added_listener)
- self._graph.add_listener(bt2.GraphListenerType.PORT_REMOVED,
- port_removed_listener)
- self._graph.add_listener(bt2.GraphListenerType.PORTS_CONNECTED,
- ports_connected_listener)
- self._graph.add_listener(bt2.GraphListenerType.PORTS_DISCONNECTED,
- ports_disconnected_listener)
- src = self._graph.add_component(MySource, 'src')
- sink = self._graph.add_component(MySink, 'sink')
- self._graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
- sink.input_ports['in'].disconnect()
- self.assertIs(calls[0][0], port_added_listener)
- self.assertEqual(calls[0][1].name, 'out')
- self.assertIs(calls[1][0], port_added_listener)
- self.assertEqual(calls[1][1].name, 'zero')
- self.assertIs(calls[2][0], port_added_listener)
- self.assertEqual(calls[2][1].name, 'in')
- self.assertIs(calls[3][0], port_removed_listener)
- self.assertEqual(calls[3][1].name, 'zero')
- self.assertIs(calls[4][0], port_added_listener)
- self.assertEqual(calls[4][1].name, 'taste')
- self.assertIs(calls[5][0], ports_connected_listener)
- self.assertEqual(calls[5][1].name, 'out')
- self.assertEqual(calls[5][2].name, 'in')
- self.assertIs(calls[6][0], port_removed_listener)
- self.assertEqual(calls[6][1].name, 'in')
- self.assertIs(calls[7][0], ports_disconnected_listener)
- self.assertEqual(calls[7][1].name, 'src')
- self.assertEqual(calls[7][2].name, 'sink')
- self.assertEqual(calls[7][3].name, 'out')
- self.assertEqual(calls[7][4].name, 'in')
- del calls
+ def test_raise_in_port_added_listener(self):
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params):
+ self._add_input_port('in')
+
+ def _consume(self):
+ raise bt2.Stop
+
+ def port_added_listener(component, port):
+ raise ValueError('oh noes!')
+
+ graph = bt2.Graph()
+ graph.add_port_added_listener(port_added_listener)
+
+ with self.assertRaises(bt2.Error):
+ graph.add_component(MySink, 'comp')
+
+ def test_raise_in_ports_connected_listener(self):
+ class MyIter(bt2._UserMessageIterator):
+ def __next__(self):
+ raise bt2.Stop
+
+ class MySource(bt2._UserSourceComponent,
+ message_iterator_class=MyIter):
+ def __init__(self, params):
+ self._add_output_port('out')
+
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params):
+ self._add_input_port('in')
+
+ def _consume(self):
+ raise bt2.Stop
+
+ def ports_connected_listener(upstream_component, upstream_port,
+ downstream_component, downstream_port):
+ raise ValueError('oh noes!')
+
+ graph = bt2.Graph()
+ graph.add_ports_connected_listener(ports_connected_listener)
+ up = graph.add_component(MySource, 'down')
+ down = graph.add_component(MySink, 'up')
+
+ with self.assertRaises(bt2.Error):
+ graph.connect_ports(up.output_ports['out'], down.input_ports['in'])