comp_params = None
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, params, obj):
nonlocal comp_params
comp_params = params
self.assertEqual(params, comp_params)
del comp_params
+ def test_add_component_obj_python_comp_cls(self):
+ comp_obj = None
+
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params, obj):
+ nonlocal comp_obj
+ comp_obj = obj
+
+ def _user_consume(self):
+ pass
+
+ obj = object()
+ comp = self._graph.add_component(MySink, 'salut', obj=obj)
+ self.assertIs(comp_obj, obj)
+ del comp_obj
+
+ def test_add_component_obj_none_python_comp_cls(self):
+ comp_obj = None
+
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params, obj):
+ nonlocal comp_obj
+ comp_obj = obj
+
+ def _user_consume(self):
+ pass
+
+ comp = self._graph.add_component(MySink, 'salut')
+ self.assertIsNone(comp_obj)
+ del comp_obj
+
+ def test_add_component_obj_non_python_comp_cls(self):
+ comp_obj = None
+
+ plugin = bt2.find_plugin('text', find_in_user_dir=False, find_in_sys_dir=False)
+ assert plugin is not None
+ cc = plugin.source_component_classes['dmesg']
+ assert cc is not None
+
+ with self.assertRaises(ValueError):
+ comp = self._graph.add_component(cc, 'salut', obj=57)
+
def test_add_component_invalid_cls_type(self):
with self.assertRaises(TypeError):
self._graph.add_component(int, 'salut')
raise bt2.Stop
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_input_port('in')
def _user_consume(self):
raise bt2.Stop
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_input_port('in')
def _user_consume(self):
raise TypeError
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_input_port('in')
def _user_consume(self):
return self._create_stream_beginning_message(self._stream)
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_input_port('in')
def _user_consume(self):
return msg
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._input_port = self._add_input_port('in')
self._at = 0
)
self._graph.run()
+ def test_run_once(self):
+ class MyIter(_MyIter):
+ pass
+
+ class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ def __init__(self, params, obj):
+ self._add_output_port('out')
+
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params, obj):
+ self._input_port = self._add_input_port('in')
+
+ def _user_consume(comp_self):
+ nonlocal run_count
+ run_count += 1
+ raise bt2.TryAgain
+
+ run_count = 0
+ src = self._graph.add_component(MySource, 'src')
+ sink = self._graph.add_component(MySink, 'sink')
+ conn = self._graph.connect_ports(
+ src.output_ports['out'], sink.input_ports['in']
+ )
+
+ with self.assertRaises(bt2.TryAgain):
+ self._graph.run_once()
+
+ self.assertEqual(run_count, 1)
+
+ def test_run_once_stops(self):
+ class MyIter(_MyIter):
+ pass
+
+ class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ def __init__(self, params, obj):
+ self._add_output_port('out')
+
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params, obj):
+ self._input_port = self._add_input_port('in')
+
+ def _user_consume(comp_self):
+ raise bt2.Stop
+
+ src = self._graph.add_component(MySource, 'src')
+ sink = self._graph.add_component(MySink, 'sink')
+ conn = self._graph.connect_ports(
+ src.output_ports['out'], sink.input_ports['in']
+ )
+
+ with self.assertRaises(bt2.Stop):
+ self._graph.run_once()
+
def test_run_again(self):
class MyIter(_MyIter):
def __next__(self):
return msg
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._input_port = self._add_input_port('in')
self._at = 0
return msg
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._input_port = self._add_input_port('in')
self._at = 0
raise bt2.Stop
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_output_port('out')
self._add_output_port('zero')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_input_port('in')
def _user_consume(self):
raise bt2.Stop
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_output_port('out')
self._add_output_port('zero')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_input_port('in')
def _user_consume(self):
def test_raise_in_component_init(self):
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, params, obj):
raise ValueError('oops!')
def _user_consume(self):
def test_raise_in_port_added_listener(self):
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_input_port('in')
def _user_consume(self):
raise bt2.Stop
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_input_port('in')
def _user_consume(self):