sink.input_ports['in'], src.output_ports['out']
)
- def test_cancel(self):
- self.assertFalse(self._graph.is_canceled)
- self._graph.cancel()
- self.assertTrue(self._graph.is_canceled)
-
- # Test that Graph.run() raises bt2.Canceled if the graph gets canceled
- # during execution.
- def test_cancel_while_running(self):
+ def test_add_interrupter(self):
+ class MyIter(bt2._UserMessageIterator):
+ def __next__(self):
+ raise TypeError
+
+ class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ def __init__(self, params):
+ self._add_output_port('out')
+
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params):
+ self._add_input_port('in')
+
+ def _consume(self):
+ next(self._msg_iter)
+
+ def _graph_is_configured(self):
+ self._msg_iter = self._create_input_port_message_iterator(
+ self._input_ports['in']
+ )
+
+ # add two interrupters, set one of them
+ interrupter1 = bt2.Interrupter()
+ interrupter2 = bt2.Interrupter()
+ self._graph.add_interrupter(interrupter1)
+ src = self._graph.add_component(MySource, 'src')
+ sink = self._graph.add_component(MySink, 'sink')
+ self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
+ self._graph.add_interrupter(interrupter2)
+
+ with self.assertRaises(bt2._Error):
+ self._graph.run()
+
+ interrupter2.set()
+
+ with self.assertRaises(bt2.TryAgain):
+ self._graph.run()
+
+ interrupter2.reset()
+
+ with self.assertRaises(bt2._Error):
+ self._graph.run()
+
+ # Test that Graph.run() raises bt2.Interrupted if the graph gets
+ # interrupted during execution.
+ def test_interrupt_while_running(self):
class MyIter(_MyIter):
def __next__(self):
return self._create_stream_beginning_message(self._stream)
self._add_input_port('in')
def _consume(self):
- # Pretend that somebody asynchronously cancelled the graph.
+ # Pretend that somebody asynchronously interrupted the graph.
nonlocal graph
- graph.cancel()
-
+ graph.interrupt()
return next(self._msg_iter)
def _graph_is_configured(self):
- self._msg_iter = self._input_ports['in'].create_message_iterator()
+ self._msg_iter = self._create_input_port_message_iterator(
+ self._input_ports['in']
+ )
- graph = bt2.Graph()
- up = graph.add_component(MySource, 'down')
- down = graph.add_component(MySink, 'up')
- graph.connect_ports(up.output_ports['out'], down.input_ports['in'])
- with self.assertRaises(bt2.Canceled):
- graph.run()
+ graph = self._graph
+ up = self._graph.add_component(MySource, 'down')
+ down = self._graph.add_component(MySink, 'up')
+ self._graph.connect_ports(up.output_ports['out'], down.input_ports['in'])
+
+ with self.assertRaises(bt2.TryAgain):
+ self._graph.run()
def test_run(self):
class MyIter(_MyIter):
msg = next(comp_self._msg_iter)
if comp_self._at == 0:
- self.assertIsInstance(msg, bt2.message._StreamBeginningMessage)
+ self.assertIsInstance(msg, bt2._StreamBeginningMessage)
elif comp_self._at == 1:
- self.assertIsInstance(msg, bt2.message._PacketBeginningMessage)
+ self.assertIsInstance(msg, bt2._PacketBeginningMessage)
elif comp_self._at >= 2 and comp_self._at <= 6:
- self.assertIsInstance(msg, bt2.message._EventMessage)
+ self.assertIsInstance(msg, bt2._EventMessage)
self.assertEqual(msg.event.cls.name, 'salut')
elif comp_self._at == 7:
- self.assertIsInstance(msg, bt2.message._PacketEndMessage)
+ self.assertIsInstance(msg, bt2._PacketEndMessage)
elif comp_self._at == 8:
- self.assertIsInstance(msg, bt2.message._StreamEndMessage)
+ self.assertIsInstance(msg, bt2._StreamEndMessage)
comp_self._at += 1
def _graph_is_configured(self):
- self._msg_iter = self._input_port.create_message_iterator()
+ self._msg_iter = self._create_input_port_message_iterator(
+ self._input_port
+ )
src = self._graph.add_component(MySource, 'src')
sink = self._graph.add_component(MySink, 'sink')
def _consume(comp_self):
msg = next(comp_self._msg_iter)
if comp_self._at == 0:
- self.assertIsInstance(msg, bt2.message._StreamBeginningMessage)
+ self.assertIsInstance(msg, bt2._StreamBeginningMessage)
elif comp_self._at == 1:
- self.assertIsInstance(msg, bt2.message._PacketBeginningMessage)
+ self.assertIsInstance(msg, bt2._PacketBeginningMessage)
elif comp_self._at == 2:
- self.assertIsInstance(msg, bt2.message._EventMessage)
+ self.assertIsInstance(msg, bt2._EventMessage)
raise bt2.TryAgain
else:
pass
comp_self._at += 1
def _graph_is_configured(self):
- self._msg_iter = self._input_port.create_message_iterator()
+ self._msg_iter = self._create_input_port_message_iterator(
+ self._input_port
+ )
src = self._graph.add_component(MySource, 'src')
sink = self._graph.add_component(MySink, 'sink')
def _consume(comp_self):
msg = next(comp_self._msg_iter)
if comp_self._at == 0:
- self.assertIsInstance(msg, bt2.message._StreamBeginningMessage)
+ self.assertIsInstance(msg, bt2._StreamBeginningMessage)
elif comp_self._at == 1:
- self.assertIsInstance(msg, bt2.message._PacketBeginningMessage)
+ self.assertIsInstance(msg, bt2._PacketBeginningMessage)
elif comp_self._at == 2:
- self.assertIsInstance(msg, bt2.message._EventMessage)
+ self.assertIsInstance(msg, bt2._EventMessage)
elif comp_self._at == 3:
nonlocal raised_in_sink
raised_in_sink = True
comp_self._at += 1
def _graph_is_configured(self):
- self._msg_iter = self._input_port.create_message_iterator()
+ self._msg_iter = self._create_input_port_message_iterator(
+ self._input_port
+ )
src = self._graph.add_component(MySource, 'src')
sink = self._graph.add_component(MySink, 'sink')
src.output_ports['out'], sink.input_ports['in']
)
- with self.assertRaises(bt2.Error):
+ with self.assertRaises(bt2._Error):
self._graph.run()
def test_listeners(self):
graph = bt2.Graph()
- with self.assertRaises(bt2.Error):
+ with self.assertRaises(bt2._Error):
graph.add_component(MySink, 'comp')
def test_raise_in_port_added_listener(self):
graph = bt2.Graph()
graph.add_port_added_listener(port_added_listener)
- with self.assertRaises(bt2.Error):
+ with self.assertRaises(bt2._Error):
graph.add_component(MySink, 'comp')
def test_raise_in_ports_connected_listener(self):
up = graph.add_component(MySource, 'down')
down = graph.add_component(MySink, 'up')
- with self.assertRaises(bt2.Error):
+ with self.assertRaises(bt2._Error):
graph.connect_ports(up.output_ports['out'], down.input_ports['in'])