- def test_connect_ports_refused(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
- self._add_output_port('out')
-
- class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
- self._add_input_port('in')
-
- def _consume(self):
- raise bt2.Stop
-
- def _accept_port_connection(self, port, other_port):
- return False
-
- src = self._graph.add_component(MySource, 'src')
- sink = self._graph.add_component(MySink, 'sink')
-
- with self.assertRaises(bt2.PortConnectionRefused):
- conn = self._graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
-