@staticmethod
def _create_graph(src_comp_cls, flt_comp_cls=None):
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_input_port('in')
def _user_consume(self):
the_output_port_from_iter = self_port_output
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params):
+ def __init__(self, params, obj):
nonlocal the_output_port_from_source
the_output_port_from_source = self._add_output_port('out', 'user data')
src_iter_initialized = True
class MySource(bt2._UserSourceComponent, message_iterator_class=MySourceIter):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_output_port('out')
class MyFilterIter(bt2._UserMessageIterator):
return next(self._up_iter)
class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_input_port('in')
self._add_output_port('out')
finalized = True
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_output_port('out')
finalized = False
salut = self._component._salut
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_output_port('out')
self._salut = 23
addr = self.addr
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_output_port('out')
addr = None
return self._msgs.pop(0)
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params):
+ def __init__(self, params, obj):
tc = self._create_trace_class()
sc = tc.create_stream_class(supports_packets=True)
ec = sc.create_event_class()
raise StopIteration
class MySource(bt2._UserSourceComponent, message_iterator_class=MySourceIter):
- def __init__(self, params):
+ def __init__(self, params, obj):
tc = self._create_trace_class()
sc = tc.create_stream_class(supports_packets=True)
ec = sc.create_event_class()
return self._upstream_iter.can_seek_beginning
class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter):
- def __init__(self, params):
+ def __init__(self, params, obj):
input_port = self._add_input_port('in')
self._add_output_port('out', input_port)
raise bt2.TryAgain
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_output_port('out')
graph = bt2.Graph()
return msg
class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params):
+ def __init__(self, params, obj):
self._add_output_port('out')
trace_class = self._create_trace_class()