class _MyIter(bt2._UserMessageIterator):
- def __init__(self, self_output_port):
+ def __init__(self, config, self_output_port):
self._build_meta()
self._at = 0
comp_params = None
class MySink(bt2._UserSinkComponent):
- def __init__(self, params, obj):
+ def __init__(self, config, params, obj):
nonlocal comp_params
comp_params = params
comp_obj = None
class MySink(bt2._UserSinkComponent):
- def __init__(self, params, obj):
+ def __init__(self, config, params, obj):
nonlocal comp_obj
comp_obj = obj
comp_obj = None
class MySink(bt2._UserSinkComponent):
- def __init__(self, params, obj):
+ def __init__(self, config, params, obj):
nonlocal comp_obj
comp_obj = obj
raise bt2.Stop
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params, obj):
+ def __init__(self, config, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params, obj):
+ def __init__(self, config, params, obj):
self._add_input_port('in')
def _user_consume(self):
raise bt2.Stop
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params, obj):
+ def __init__(self, config, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params, obj):
+ def __init__(self, config, params, obj):
self._add_input_port('in')
def _user_consume(self):
raise TypeError
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params, obj):
+ def __init__(self, config, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params, obj):
+ def __init__(self, config, params, obj):
self._add_input_port('in')
def _user_consume(self):
return self._create_stream_beginning_message(self._stream)
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params, obj):
+ def __init__(self, config, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params, obj):
+ def __init__(self, config, params, obj):
self._add_input_port('in')
def _user_consume(self):
return msg
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params, obj):
+ def __init__(self, config, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params, obj):
+ def __init__(self, config, params, obj):
self._input_port = self._add_input_port('in')
self._at = 0
pass
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params, obj):
+ def __init__(self, config, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params, obj):
+ def __init__(self, config, params, obj):
self._input_port = self._add_input_port('in')
def _user_consume(comp_self):
pass
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params, obj):
+ def __init__(self, config, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params, obj):
+ def __init__(self, config, params, obj):
self._input_port = self._add_input_port('in')
def _user_consume(comp_self):
return msg
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params, obj):
+ def __init__(self, config, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params, obj):
+ def __init__(self, config, params, obj):
self._input_port = self._add_input_port('in')
self._at = 0
return msg
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params, obj):
+ def __init__(self, config, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params, obj):
+ def __init__(self, config, params, obj):
self._input_port = self._add_input_port('in')
self._at = 0
raise bt2.Stop
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params, obj):
+ def __init__(self, config, params, obj):
self._add_output_port('out')
self._add_output_port('zero')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params, obj):
+ def __init__(self, config, params, obj):
self._add_input_port('in')
def _user_consume(self):
raise bt2.Stop
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params, obj):
+ def __init__(self, config, params, obj):
self._add_output_port('out')
self._add_output_port('zero')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params, obj):
+ def __init__(self, config, params, obj):
self._add_input_port('in')
def _user_consume(self):
def test_raise_in_component_init(self):
class MySink(bt2._UserSinkComponent):
- def __init__(self, params, obj):
+ def __init__(self, config, params, obj):
raise ValueError('oops!')
def _user_consume(self):
def test_raise_in_port_added_listener(self):
class MySink(bt2._UserSinkComponent):
- def __init__(self, params, obj):
+ def __init__(self, config, params, obj):
self._add_input_port('in')
def _user_consume(self):
raise bt2.Stop
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params, obj):
+ def __init__(self, config, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params, obj):
+ def __init__(self, config, params, obj):
self._add_input_port('in')
def _user_consume(self):
with self.assertRaises(bt2._Error):
graph.connect_ports(up.output_ports['out'], down.input_ports['in'])
+
+
+if __name__ == '__main__':
+ unittest.main()