)
self._graph.run()
+ def test_run_once(self):
+ class MyIter(_MyIter):
+ pass
+
+ class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ def __init__(self, params, obj):
+ self._add_output_port('out')
+
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params, obj):
+ self._input_port = self._add_input_port('in')
+
+ def _user_consume(comp_self):
+ nonlocal run_count
+ run_count += 1
+ raise bt2.TryAgain
+
+ run_count = 0
+ src = self._graph.add_component(MySource, 'src')
+ sink = self._graph.add_component(MySink, 'sink')
+ conn = self._graph.connect_ports(
+ src.output_ports['out'], sink.input_ports['in']
+ )
+
+ with self.assertRaises(bt2.TryAgain):
+ self._graph.run_once()
+
+ self.assertEqual(run_count, 1)
+
+ def test_run_once_stops(self):
+ class MyIter(_MyIter):
+ pass
+
+ class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ def __init__(self, params, obj):
+ self._add_output_port('out')
+
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params, obj):
+ self._input_port = self._add_input_port('in')
+
+ def _user_consume(comp_self):
+ raise bt2.Stop
+
+ src = self._graph.add_component(MySource, 'src')
+ sink = self._graph.add_component(MySink, 'sink')
+ conn = self._graph.connect_ports(
+ src.output_ports['out'], sink.input_ports['in']
+ )
+
+ with self.assertRaises(bt2.Stop):
+ self._graph.run_once()
+
def test_run_again(self):
class MyIter(_MyIter):
def __next__(self):