return graph.add_component(comp_cls, name)
def test_src_add_output_port(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ class MySource(
+ bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator
+ ):
def __init__(comp_self, config, params, obj):
port = comp_self._add_output_port('out')
self.assertEqual(port.name, 'out')
self.assertIs(type(comp.output_ports['out']), bt2_port._OutputPortConst)
def test_flt_add_output_port(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyIter):
+ class MyFilter(
+ bt2._UserFilterComponent, message_iterator_class=bt2._UserMessageIterator
+ ):
def __init__(comp_self, config, params, obj):
port = comp_self._add_output_port('out')
self.assertEqual(port.name, 'out')
self.assertEqual(len(comp.output_ports), 1)
def test_flt_add_input_port(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyIter):
+ class MyFilter(
+ bt2._UserFilterComponent, message_iterator_class=bt2._UserMessageIterator
+ ):
def __init__(comp_self, config, params, obj):
port = comp_self._add_input_port('in')
self.assertEqual(port.name, 'in')
self.assertEqual(len(comp.input_ports), 1)
def test_user_src_output_ports_getitem(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ class MySource(
+ bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator
+ ):
def __init__(comp_self, config, params, obj):
port1 = comp_self._add_output_port('clear')
port2 = comp_self._add_output_port('print')
self._create_comp(MySource)
def test_user_flt_output_ports_getitem(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyIter):
+ class MyFilter(
+ bt2._UserFilterComponent, message_iterator_class=bt2._UserMessageIterator
+ ):
def __init__(comp_self, config, params, obj):
port1 = comp_self._add_output_port('clear')
port2 = comp_self._add_output_port('print')
self._create_comp(MyFilter)
def test_user_flt_input_ports_getitem(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyIter):
+ class MyFilter(
+ bt2._UserFilterComponent, message_iterator_class=bt2._UserMessageIterator
+ ):
def __init__(comp_self, config, params, obj):
port1 = comp_self._add_input_port('clear')
port2 = comp_self._add_input_port('print')
self._create_comp(MySink)
def test_user_src_output_ports_getitem_invalid_key(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ class MySource(
+ bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator
+ ):
def __init__(comp_self, config, params, obj):
comp_self._add_output_port('clear')
comp_self._add_output_port('print')
self._create_comp(MySource)
def test_user_flt_output_ports_getitem_invalid_key(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyIter):
+ class MyFilter(
+ bt2._UserFilterComponent, message_iterator_class=bt2._UserMessageIterator
+ ):
def __init__(comp_self, config, params, obj):
comp_self._add_output_port('clear')
comp_self._add_output_port('print')
self._create_comp(MyFilter)
def test_user_flt_input_ports_getitem_invalid_key(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyIter):
+ class MyFilter(
+ bt2._UserFilterComponent, message_iterator_class=bt2._UserMessageIterator
+ ):
def __init__(comp_self, config, params, obj):
comp_self._add_input_port('clear')
comp_self._add_input_port('print')
self._create_comp(MySink)
def test_user_src_output_ports_len(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ class MySource(
+ bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator
+ ):
def __init__(comp_self, config, params, obj):
comp_self._add_output_port('clear')
comp_self._add_output_port('print')
self._create_comp(MySource)
def test_user_flt_output_ports_len(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyIter):
+ class MyFilter(
+ bt2._UserFilterComponent, message_iterator_class=bt2._UserMessageIterator
+ ):
def __init__(comp_self, config, params, obj):
comp_self._add_output_port('clear')
comp_self._add_output_port('print')
self._create_comp(MyFilter)
def test_user_flt_input_ports_len(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyIter):
+ class MyFilter(
+ bt2._UserFilterComponent, message_iterator_class=bt2._UserMessageIterator
+ ):
def __init__(comp_self, config, params, obj):
comp_self._add_input_port('clear')
comp_self._add_input_port('print')
self._create_comp(MySink)
def test_user_src_output_ports_iter(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ class MySource(
+ bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator
+ ):
def __init__(comp_self, config, params, obj):
port1 = comp_self._add_output_port('clear')
port2 = comp_self._add_output_port('print')
self._create_comp(MySource)
def test_user_flt_output_ports_iter(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyIter):
+ class MyFilter(
+ bt2._UserFilterComponent, message_iterator_class=bt2._UserMessageIterator
+ ):
def __init__(comp_self, config, params, obj):
port1 = comp_self._add_output_port('clear')
port2 = comp_self._add_output_port('print')
self._create_comp(MyFilter)
def test_user_flt_input_ports_iter(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyIter):
+ class MyFilter(
+ bt2._UserFilterComponent, message_iterator_class=bt2._UserMessageIterator
+ ):
def __init__(comp_self, config, params, obj):
port1 = comp_self._add_input_port('clear')
port2 = comp_self._add_input_port('print')
self._create_comp(MySink)
def test_gen_src_output_ports_getitem(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
port1 = None
port2 = None
port3 = None
- class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ class MySource(
+ bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator
+ ):
def __init__(comp_self, config, params, obj):
nonlocal port1, port2, port3
port1 = comp_self._add_output_port('clear')
del port3
def test_gen_flt_output_ports_getitem(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
port1 = None
port2 = None
port3 = None
- class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyIter):
+ class MyFilter(
+ bt2._UserFilterComponent, message_iterator_class=bt2._UserMessageIterator
+ ):
def __init__(comp_self, config, params, obj):
nonlocal port1, port2, port3
port1 = comp_self._add_output_port('clear')
del port3
def test_gen_flt_input_ports_getitem(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
port1 = None
port2 = None
port3 = None
- class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyIter):
+ class MyFilter(
+ bt2._UserFilterComponent, message_iterator_class=bt2._UserMessageIterator
+ ):
def __init__(comp_self, config, params, obj):
nonlocal port1, port2, port3
port1 = comp_self._add_input_port('clear')
del port3
def test_gen_src_output_ports_getitem_invalid_key(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ class MySource(
+ bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator
+ ):
def __init__(comp_self, config, params, obj):
comp_self._add_output_port('clear')
comp_self._add_output_port('print')
comp.output_ports['hello']
def test_gen_flt_output_ports_getitem_invalid_key(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyIter):
+ class MyFilter(
+ bt2._UserFilterComponent, message_iterator_class=bt2._UserMessageIterator
+ ):
def __init__(comp_self, config, params, obj):
comp_self._add_output_port('clear')
comp_self._add_output_port('print')
comp.output_ports['hello']
def test_gen_flt_input_ports_getitem_invalid_key(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyIter):
+ class MyFilter(
+ bt2._UserFilterComponent, message_iterator_class=bt2._UserMessageIterator
+ ):
def __init__(comp_self, config, params, obj):
comp_self._add_input_port('clear')
comp_self._add_input_port('print')
comp.input_ports['hello']
def test_gen_src_output_ports_len(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ class MySource(
+ bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator
+ ):
def __init__(comp_self, config, params, obj):
comp_self._add_output_port('clear')
comp_self._add_output_port('print')
self.assertEqual(len(comp.output_ports), 3)
def test_gen_flt_output_ports_len(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyIter):
+ class MyFilter(
+ bt2._UserFilterComponent, message_iterator_class=bt2._UserMessageIterator
+ ):
def __init__(comp_self, config, params, obj):
comp_self._add_output_port('clear')
comp_self._add_output_port('print')
self.assertEqual(len(comp.output_ports), 3)
def test_gen_flt_input_ports_len(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyIter):
+ class MyFilter(
+ bt2._UserFilterComponent, message_iterator_class=bt2._UserMessageIterator
+ ):
def __init__(comp_self, config, params, obj):
comp_self._add_input_port('clear')
comp_self._add_input_port('print')
self.assertEqual(len(comp.input_ports), 3)
def test_gen_src_output_ports_iter(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
port1 = None
port2 = None
port3 = None
- class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ class MySource(
+ bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator
+ ):
def __init__(comp_self, config, params, obj):
nonlocal port1, port2, port3
port1 = comp_self._add_output_port('clear')
del port3
def test_gen_flt_output_ports_iter(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
port1 = None
port2 = None
port3 = None
- class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyIter):
+ class MyFilter(
+ bt2._UserFilterComponent, message_iterator_class=bt2._UserMessageIterator
+ ):
def __init__(comp_self, config, params, obj):
nonlocal port1, port2, port3
port1 = comp_self._add_output_port('clear')
del port3
def test_gen_flt_input_ports_iter(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
port1 = None
port2 = None
port3 = None
- class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyIter):
+ class MyFilter(
+ bt2._UserFilterComponent, message_iterator_class=bt2._UserMessageIterator
+ ):
def __init__(comp_self, config, params, obj):
nonlocal port1, port2, port3
port1 = comp_self._add_input_port('clear')
self._create_comp(MySink)
def test_source_self_port_user_data(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MySource(bt2._UserFilterComponent, message_iterator_class=MyIter):
+ class MySource(
+ bt2._UserFilterComponent, message_iterator_class=bt2._UserMessageIterator
+ ):
def __init__(comp_self, config, params, obj):
nonlocal user_datas
self.assertEqual(user_datas, [None, 2])
def test_filter_self_port_user_data(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyIter):
+ class MyFilter(
+ bt2._UserFilterComponent, message_iterator_class=bt2._UserMessageIterator
+ ):
def __init__(comp_self, config, params, obj):
nonlocal user_datas