conn = self._graph.connect_ports(src.output_ports['out'],
sink.input_ports['in'])
+ def test_connect_ports_cannot_consume_accept(self):
+ class MyIter(bt2._UserNotificationIterator):
+ def __next__(self):
+ raise bt2.Stop
+
+ class MySource(bt2._UserSourceComponent,
+ notification_iterator_class=MyIter):
+ def __init__(self, params):
+ self._add_output_port('out')
+
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params):
+ self._add_input_port('in')
+
+ def _consume(self):
+ raise bt2.Stop
+
+ def _accept_port_connection(self, port, other_port):
+ nonlocal exc
+
+ try:
+ self.graph.run()
+ except Exception as e:
+ exc = e
+
+ return True
+
+ exc = None
+ src = self._graph.add_component(MySource, 'src')
+ sink = self._graph.add_component(MySink, 'sink')
+ self._graph.connect_ports(src.output_ports['out'],
+ sink.input_ports['in'])
+ self.assertIs(type(exc), bt2.CannotConsumeGraph)
+
+ def test_connect_ports_cannot_consume_connected(self):
+ class MyIter(bt2._UserNotificationIterator):
+ def __next__(self):
+ raise bt2.Stop
+
+ class MySource(bt2._UserSourceComponent,
+ notification_iterator_class=MyIter):
+ def __init__(self, params):
+ self._add_output_port('out')
+
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params):
+ self._add_input_port('in')
+
+ def _consume(self):
+ raise bt2.Stop
+
+ def _port_connected(self, port, other_port):
+ nonlocal exc
+
+ try:
+ self.graph.run()
+ except Exception as e:
+ exc = e
+
+ return True
+
+ exc = None
+ src = self._graph.add_component(MySource, 'src')
+ sink = self._graph.add_component(MySink, 'sink')
+ self._graph.connect_ports(src.output_ports['out'],
+ sink.input_ports['in'])
+ self._graph.run()
+ self.assertIs(type(exc), bt2.CannotConsumeGraph)
+
def test_cancel(self):
self.assertFalse(self._graph.is_canceled)
self._graph.cancel()
with self.assertRaises(bt2.Error):
self._graph.run()
+ def test_run_cannot_consume(self):
+ class MyIter(bt2._UserNotificationIterator):
+ pass
+
+ class MySource(bt2._UserSourceComponent,
+ notification_iterator_class=MyIter):
+ def __init__(self, params):
+ self._add_output_port('out')
+
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params):
+ self._add_input_port('in')
+ self._at = 0
+
+ def _consume(comp_self):
+ nonlocal exc
+
+ try:
+ print('going in')
+ comp_self.graph.run()
+ print('going out')
+ except Exception as e:
+ exc = e
+
+ raise bt2.Stop
+
+ exc = None
+ src = self._graph.add_component(MySource, 'src')
+ sink = self._graph.add_component(MySink, 'sink')
+ conn = self._graph.connect_ports(src.output_ports['out'],
+ sink.input_ports['in'])
+ self._graph.run()
+ self.assertIs(type(exc), bt2.CannotConsumeGraph)
+
def test_listeners(self):
class MyIter(bt2._UserNotificationIterator):
def __next__(self):