# We deliberately use a different variable for returning the result than
# the variable bound to the MySink.__init__ context and delete res_bound.
# We deliberately use a different variable for returning the result than
# the variable bound to the MySink.__init__ context and delete res_bound.
- msg.event.payload_field['giraffe'] = 1
- msg.event.specific_context_field['ant'] = -1
- msg.event.common_context_field['cpu_id'] = 1
+ msg.event.payload_field["giraffe"] = 1
+ msg.event.specific_context_field["ant"] = -1
+ msg.event.common_context_field["cpu_id"] = 1
stream_class = tc.create_stream_class(
default_clock_class=clock_class,
stream_class = tc.create_stream_class(
default_clock_class=clock_class,
- trace = tc(environment={'patate': 12})
- stream = trace.create_stream(stream_class, user_attributes={'salut': 23})
+ trace = tc(environment={"patate": 12})
+ stream = trace.create_stream(stream_class, user_attributes={"salut": 23})
- _src_comp = _graph.add_component(MySrc, 'my_source')
- _msg_iter = TestOutputPortMessageIterator(_graph, _src_comp.output_ports['out'])
+ _src_comp = _graph.add_component(MySrc, "my_source")
+ _msg_iter = TestOutputPortMessageIterator(_graph, _src_comp.output_ports["out"])
def __init__(self, config, params, msg_list):
assert msg_list is not None
self._msg_list = msg_list
def __init__(self, config, params, msg_list):
assert msg_list is not None
self._msg_list = msg_list
- sink = graph.add_component(TestProxySink, 'test-proxy-sink', obj=self._msg_list)
- graph.connect_ports(output_port, sink.input_ports['in'])
+ sink = graph.add_component(TestProxySink, "test-proxy-sink", obj=self._msg_list)
+ graph.connect_ports(output_port, sink.input_ports["in"])
# The field is part of a dummy stream, itself part of a dummy trace created
# from trace class `tc`.
def create_const_field(tc, field_class, field_value_setter_fn):
# The field is part of a dummy stream, itself part of a dummy trace created
# from trace class `tc`.
def create_const_field(tc, field_class, field_value_setter_fn):
class MyIter(bt2._UserMessageIterator):
def __init__(self, config, self_port_output):
class MyIter(bt2._UserMessageIterator):
def __init__(self, config, self_port_output):
class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
def __init__(self, config, params, obj):
class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
def __init__(self, config, params, obj):
- src_comp = graph.add_component(MySrc, 'my_source', None)
- msg_iter = TestOutputPortMessageIterator(graph, src_comp.output_ports['out'])
+ src_comp = graph.add_component(MySrc, "my_source", None)
+ msg_iter = TestOutputPortMessageIterator(graph, src_comp.output_ports["out"])
cc = self._create_clock_class()
sc = create_stream_class_func(tc, cc)
cc = self._create_clock_class()
sc = create_stream_class_func(tc, cc)
class MySink(bt2._UserSinkComponent):
def __init__(self, config, params, obj):
class MySink(bt2._UserSinkComponent):
def __init__(self, config, params, obj):
def _user_graph_is_configured(self):
self._input_iter = self._create_message_iterator(self._input_port)
def _user_graph_is_configured(self):
self._input_iter = self._create_message_iterator(self._input_port)
- src = graph.add_component(MySrc, 'ze source')
- snk = graph.add_component(MySink, 'ze sink')
- graph.connect_ports(src.output_ports['out'], snk.input_ports['in'])
+ src = graph.add_component(MySrc, "ze source")
+ snk = graph.add_component(MySink, "ze sink")
+ graph.connect_ports(src.output_ports["out"], snk.input_ports["in"])