# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
-from bt2 import value
-import collections
import unittest
-import copy
import bt2
def tearDown(self):
del self._graph
- def test_create_empty(self):
- graph = bt2.Graph()
+ def test_create_default(self):
+ bt2.Graph()
+
+ def test_create_known_mip_version(self):
+ bt2.Graph(0)
+
+ def test_create_invalid_mip_version_type(self):
+ with self.assertRaises(TypeError):
+ bt2.Graph('')
+
+ def test_create_unknown_mip_version(self):
+ with self.assertRaisesRegex(ValueError, 'unknown MIP version'):
+ bt2.Graph(1)
def test_add_component_user_cls(self):
class MySink(bt2._UserSinkComponent):
- def _consume(self):
- pass
-
- def _graph_is_configured(self):
+ def _user_consume(self):
pass
comp = self._graph.add_component(MySink, 'salut')
def test_add_component_gen_cls(self):
class MySink(bt2._UserSinkComponent):
- def _consume(self):
- pass
-
- def _graph_is_configured(self):
+ def _user_consume(self):
pass
comp = self._graph.add_component(MySink, 'salut')
comp_params = None
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, params, obj):
nonlocal comp_params
comp_params = params
- def _consume(self):
- pass
-
- def _graph_is_configured(self):
+ def _user_consume(self):
pass
params = {'hello': 23, 'path': '/path/to/stuff'}
- comp = self._graph.add_component(MySink, 'salut', params)
+ self._graph.add_component(MySink, 'salut', params)
self.assertEqual(params, comp_params)
del comp_params
+ def test_add_component_obj_python_comp_cls(self):
+ comp_obj = None
+
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params, obj):
+ nonlocal comp_obj
+ comp_obj = obj
+
+ def _user_consume(self):
+ pass
+
+ obj = object()
+ self._graph.add_component(MySink, 'salut', obj=obj)
+ self.assertIs(comp_obj, obj)
+ del comp_obj
+
+ def test_add_component_obj_none_python_comp_cls(self):
+ comp_obj = None
+
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params, obj):
+ nonlocal comp_obj
+ comp_obj = obj
+
+ def _user_consume(self):
+ pass
+
+ self._graph.add_component(MySink, 'salut')
+ self.assertIsNone(comp_obj)
+ del comp_obj
+
+ def test_add_component_obj_non_python_comp_cls(self):
+ plugin = bt2.find_plugin('text', find_in_user_dir=False, find_in_sys_dir=False)
+ assert plugin is not None
+ cc = plugin.source_component_classes['dmesg']
+ assert cc is not None
+
+ with self.assertRaises(ValueError):
+ self._graph.add_component(cc, 'salut', obj=57)
+
def test_add_component_invalid_cls_type(self):
with self.assertRaises(TypeError):
self._graph.add_component(int, 'salut')
def test_add_component_invalid_logging_level_type(self):
class MySink(bt2._UserSinkComponent):
- def _consume(self):
- pass
-
- def _graph_is_configured(self):
+ def _user_consume(self):
pass
with self.assertRaises(TypeError):
def test_add_component_invalid_logging_level_value(self):
class MySink(bt2._UserSinkComponent):
- def _consume(self):
- pass
-
- def _graph_is_configured(self):
+ def _user_consume(self):
pass
with self.assertRaises(ValueError):
def test_add_component_logging_level(self):
class MySink(bt2._UserSinkComponent):
- def _consume(self):
- pass
-
- def _graph_is_configured(self):
+ def _user_consume(self):
pass
comp = self._graph.add_component(
raise bt2.Stop
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_input_port('in')
- def _consume(self):
+ def _user_consume(self):
raise bt2.Stop
- def _graph_is_configured(self):
- pass
-
src = self._graph.add_component(MySource, 'src')
sink = self._graph.add_component(MySink, 'sink')
raise bt2.Stop
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_input_port('in')
- def _consume(self):
+ def _user_consume(self):
raise bt2.Stop
- def _graph_is_configured(self):
- pass
-
src = self._graph.add_component(MySource, 'src')
sink = self._graph.add_component(MySink, 'sink')
with self.assertRaises(TypeError):
- conn = self._graph.connect_ports(
- sink.input_ports['in'], src.output_ports['out']
- )
+ self._graph.connect_ports(sink.input_ports['in'], src.output_ports['out'])
+
+ def test_add_interrupter(self):
+ class MyIter(bt2._UserMessageIterator):
+ def __next__(self):
+ raise TypeError
+
+ class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ def __init__(self, params, obj):
+ self._add_output_port('out')
+
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params, obj):
+ self._add_input_port('in')
+
+ def _user_consume(self):
+ next(self._msg_iter)
+
+ def _user_graph_is_configured(self):
+ self._msg_iter = self._create_input_port_message_iterator(
+ self._input_ports['in']
+ )
- def test_cancel(self):
- self.assertFalse(self._graph.is_canceled)
- self._graph.cancel()
- self.assertTrue(self._graph.is_canceled)
+ # add two interrupters, set one of them
+ interrupter1 = bt2.Interrupter()
+ interrupter2 = bt2.Interrupter()
+ self._graph.add_interrupter(interrupter1)
+ src = self._graph.add_component(MySource, 'src')
+ sink = self._graph.add_component(MySink, 'sink')
+ self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
+ self._graph.add_interrupter(interrupter2)
+
+ with self.assertRaises(bt2._Error):
+ self._graph.run()
+
+ interrupter2.set()
+
+ with self.assertRaises(bt2.TryAgain):
+ self._graph.run()
+
+ interrupter2.reset()
+
+ with self.assertRaises(bt2._Error):
+ self._graph.run()
- # Test that Graph.run() raises bt2.Canceled if the graph gets canceled
- # during execution.
- def test_cancel_while_running(self):
+ # Test that Graph.run() raises bt2.Interrupted if the graph gets
+ # interrupted during execution.
+ def test_interrupt_while_running(self):
class MyIter(_MyIter):
def __next__(self):
return self._create_stream_beginning_message(self._stream)
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_input_port('in')
- def _consume(self):
- # Pretend that somebody asynchronously cancelled the graph.
+ def _user_consume(self):
+ # Pretend that somebody asynchronously interrupted the graph.
nonlocal graph
- graph.cancel()
-
+ graph.interrupt()
return next(self._msg_iter)
- def _graph_is_configured(self):
- self._msg_iter = self._input_ports['in'].create_message_iterator()
+ def _user_graph_is_configured(self):
+ self._msg_iter = self._create_input_port_message_iterator(
+ self._input_ports['in']
+ )
- graph = bt2.Graph()
- up = graph.add_component(MySource, 'down')
- down = graph.add_component(MySink, 'up')
- graph.connect_ports(up.output_ports['out'], down.input_ports['in'])
- with self.assertRaises(bt2.Canceled):
- graph.run()
+ graph = self._graph
+ up = self._graph.add_component(MySource, 'down')
+ down = self._graph.add_component(MySink, 'up')
+ self._graph.connect_ports(up.output_ports['out'], down.input_ports['in'])
+
+ with self.assertRaises(bt2.TryAgain):
+ self._graph.run()
def test_run(self):
class MyIter(_MyIter):
return msg
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._input_port = self._add_input_port('in')
self._at = 0
- def _consume(comp_self):
+ def _user_consume(comp_self):
msg = next(comp_self._msg_iter)
if comp_self._at == 0:
- self.assertIsInstance(msg, bt2.message._StreamBeginningMessage)
+ self.assertIs(type(msg), bt2._StreamBeginningMessageConst)
elif comp_self._at == 1:
- self.assertIsInstance(msg, bt2.message._PacketBeginningMessage)
+ self.assertIs(type(msg), bt2._PacketBeginningMessageConst)
elif comp_self._at >= 2 and comp_self._at <= 6:
- self.assertIsInstance(msg, bt2.message._EventMessage)
+ self.assertIs(type(msg), bt2._EventMessageConst)
self.assertEqual(msg.event.cls.name, 'salut')
elif comp_self._at == 7:
- self.assertIsInstance(msg, bt2.message._PacketEndMessage)
+ self.assertIs(type(msg), bt2._PacketEndMessageConst)
elif comp_self._at == 8:
- self.assertIsInstance(msg, bt2.message._StreamEndMessage)
+ self.assertIs(type(msg), bt2._StreamEndMessageConst)
comp_self._at += 1
- def _graph_is_configured(self):
- self._msg_iter = self._input_port.create_message_iterator()
+ def _user_graph_is_configured(self):
+ self._msg_iter = self._create_input_port_message_iterator(
+ self._input_port
+ )
src = self._graph.add_component(MySource, 'src')
sink = self._graph.add_component(MySink, 'sink')
- conn = self._graph.connect_ports(
- src.output_ports['out'], sink.input_ports['in']
- )
+ self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
self._graph.run()
+ def test_run_once(self):
+ class MyIter(_MyIter):
+ pass
+
+ class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ def __init__(self, params, obj):
+ self._add_output_port('out')
+
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params, obj):
+ self._input_port = self._add_input_port('in')
+
+ def _user_consume(comp_self):
+ nonlocal run_count
+ run_count += 1
+ raise bt2.TryAgain
+
+ run_count = 0
+ src = self._graph.add_component(MySource, 'src')
+ sink = self._graph.add_component(MySink, 'sink')
+ self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
+
+ with self.assertRaises(bt2.TryAgain):
+ self._graph.run_once()
+
+ self.assertEqual(run_count, 1)
+
+ def test_run_once_stops(self):
+ class MyIter(_MyIter):
+ pass
+
+ class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ def __init__(self, params, obj):
+ self._add_output_port('out')
+
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params, obj):
+ self._input_port = self._add_input_port('in')
+
+ def _user_consume(comp_self):
+ raise bt2.Stop
+
+ src = self._graph.add_component(MySource, 'src')
+ sink = self._graph.add_component(MySink, 'sink')
+ self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
+
+ with self.assertRaises(bt2.Stop):
+ self._graph.run_once()
+
def test_run_again(self):
class MyIter(_MyIter):
def __next__(self):
return msg
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._input_port = self._add_input_port('in')
self._at = 0
- def _consume(comp_self):
+ def _user_consume(comp_self):
msg = next(comp_self._msg_iter)
if comp_self._at == 0:
- self.assertIsInstance(msg, bt2.message._StreamBeginningMessage)
+ self.assertIs(type(msg), bt2._StreamBeginningMessageConst)
elif comp_self._at == 1:
- self.assertIsInstance(msg, bt2.message._PacketBeginningMessage)
+ self.assertIs(type(msg), bt2._PacketBeginningMessageConst)
elif comp_self._at == 2:
- self.assertIsInstance(msg, bt2.message._EventMessage)
+ self.assertIs(type(msg), bt2._EventMessageConst)
raise bt2.TryAgain
else:
pass
comp_self._at += 1
- def _graph_is_configured(self):
- self._msg_iter = self._input_port.create_message_iterator()
+ def _user_graph_is_configured(self):
+ self._msg_iter = self._create_input_port_message_iterator(
+ self._input_port
+ )
src = self._graph.add_component(MySource, 'src')
sink = self._graph.add_component(MySink, 'sink')
- conn = self._graph.connect_ports(
- src.output_ports['out'], sink.input_ports['in']
- )
+ self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
with self.assertRaises(bt2.TryAgain):
self._graph.run()
return msg
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._input_port = self._add_input_port('in')
self._at = 0
- def _consume(comp_self):
+ def _user_consume(comp_self):
msg = next(comp_self._msg_iter)
if comp_self._at == 0:
- self.assertIsInstance(msg, bt2.message._StreamBeginningMessage)
+ self.assertIs(type(msg), bt2._StreamBeginningMessageConst)
elif comp_self._at == 1:
- self.assertIsInstance(msg, bt2.message._PacketBeginningMessage)
+ self.assertIs(type(msg), bt2._PacketBeginningMessageConst)
elif comp_self._at == 2:
- self.assertIsInstance(msg, bt2.message._EventMessage)
+ self.assertIs(type(msg), bt2._EventMessageConst)
elif comp_self._at == 3:
nonlocal raised_in_sink
raised_in_sink = True
comp_self._at += 1
- def _graph_is_configured(self):
- self._msg_iter = self._input_port.create_message_iterator()
+ def _user_graph_is_configured(self):
+ self._msg_iter = self._create_input_port_message_iterator(
+ self._input_port
+ )
src = self._graph.add_component(MySource, 'src')
sink = self._graph.add_component(MySink, 'sink')
- conn = self._graph.connect_ports(
- src.output_ports['out'], sink.input_ports['in']
- )
+ self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
with self.assertRaises(bt2._Error):
self._graph.run()
raise bt2.Stop
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_output_port('out')
self._add_output_port('zero')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_input_port('in')
- def _consume(self):
+ def _user_consume(self):
raise bt2.Stop
- def _graph_is_configured(self):
- pass
-
- def _port_connected(self, port, other_port):
+ def _user_port_connected(self, port, other_port):
self._add_input_port('taste')
def port_added_listener(component, port):
raise bt2.Stop
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_output_port('out')
self._add_output_port('zero')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_input_port('in')
- def _consume(self):
+ def _user_consume(self):
raise bt2.Stop
- def _graph_is_configured(self):
- pass
-
- def _port_connected(self, port, other_port):
+ def _user_port_connected(self, port, other_port):
self._add_input_port('taste')
with self.assertRaises(TypeError):
def test_raise_in_component_init(self):
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, params, obj):
raise ValueError('oops!')
- def _consume(self):
+ def _user_consume(self):
raise bt2.Stop
- def _graph_is_configured(self):
- pass
-
graph = bt2.Graph()
with self.assertRaises(bt2._Error):
def test_raise_in_port_added_listener(self):
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_input_port('in')
- def _consume(self):
+ def _user_consume(self):
raise bt2.Stop
- def _graph_is_configured(self):
- pass
-
def port_added_listener(component, port):
raise ValueError('oh noes!')
raise bt2.Stop
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_input_port('in')
- def _consume(self):
+ def _user_consume(self):
raise bt2.Stop
- def _graph_is_configured(self):
- pass
-
def ports_connected_listener(
upstream_component, upstream_port, downstream_component, downstream_port
):
with self.assertRaises(bt2._Error):
graph.connect_ports(up.output_ports['out'], down.input_ports['in'])
+
+
+if __name__ == '__main__':
+ unittest.main()