+# SPDX-License-Identifier: GPL-2.0-only
#
# Copyright (C) 2019 EfficiOS Inc.
#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; only version 2
-# of the License.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
-#
import bt2
import collections.abc
g = bt2.Graph()
res_bound = None
- g.add_component(MySink, 'comp')
+ g.add_component(MySink, "comp")
# We deliberately use a different variable for returning the result than
# the variable bound to the MySink.__init__ context and delete res_bound.
self._at = 0
self._msgs = [
self._create_stream_beginning_message(
- self_output_port.user_data['stream']
+ self_output_port.user_data["stream"]
)
]
if with_packet:
- assert self_output_port.user_data['packet']
+ assert self_output_port.user_data["packet"]
self._msgs.append(
self._create_packet_beginning_message(
- self_output_port.user_data['packet']
+ self_output_port.user_data["packet"]
)
)
default_clock_snapshot = 789
if with_packet:
- assert self_output_port.user_data['packet']
- ev_parent = self_output_port.user_data['packet']
+ assert self_output_port.user_data["packet"]
+ ev_parent = self_output_port.user_data["packet"]
else:
- assert self_output_port.user_data['stream']
- ev_parent = self_output_port.user_data['stream']
+ assert self_output_port.user_data["stream"]
+ ev_parent = self_output_port.user_data["stream"]
msg = self._create_event_message(
- self_output_port.user_data['event_class'],
+ self_output_port.user_data["event_class"],
ev_parent,
default_clock_snapshot,
)
- msg.event.payload_field['giraffe'] = 1
- msg.event.specific_context_field['ant'] = -1
- msg.event.common_context_field['cpu_id'] = 1
+ msg.event.payload_field["giraffe"] = 1
+ msg.event.specific_context_field["ant"] = -1
+ msg.event.common_context_field["cpu_id"] = 1
self._msgs.append(msg)
if with_packet:
self._msgs.append(
self._create_packet_end_message(
- self_output_port.user_data['packet']
+ self_output_port.user_data["packet"]
)
)
self._msgs.append(
- self._create_stream_end_message(self_output_port.user_data['stream'])
+ self._create_stream_end_message(self_output_port.user_data["stream"])
)
_msgs = self._msgs
# event common context (stream-class-defined)
cc = tc.create_structure_field_class()
- cc += [('cpu_id', tc.create_signed_integer_field_class(8))]
+ cc += [("cpu_id", tc.create_signed_integer_field_class(8))]
# packet context (stream-class-defined)
pc = None
if with_packet:
pc = tc.create_structure_field_class()
- pc += [('something', tc.create_unsigned_integer_field_class(8))]
+ pc += [("something", tc.create_unsigned_integer_field_class(8))]
stream_class = tc.create_stream_class(
default_clock_class=clock_class,
# specific context (event-class-defined)
sc = tc.create_structure_field_class()
- sc += [('ant', tc.create_signed_integer_field_class(16))]
+ sc += [("ant", tc.create_signed_integer_field_class(16))]
# event payload
ep = tc.create_structure_field_class()
- ep += [('giraffe', tc.create_signed_integer_field_class(32))]
+ ep += [("giraffe", tc.create_signed_integer_field_class(32))]
event_class = stream_class.create_event_class(
- name='garou', specific_context_field_class=sc, payload_field_class=ep
+ name="garou", specific_context_field_class=sc, payload_field_class=ep
)
- trace = tc(environment={'patate': 12})
- stream = trace.create_stream(stream_class, user_attributes={'salut': 23})
+ trace = tc(environment={"patate": 12})
+ stream = trace.create_stream(stream_class, user_attributes={"salut": 23})
if with_packet:
packet = stream.create_packet()
- packet.context_field['something'] = 154
+ packet.context_field["something"] = 154
else:
packet = None
self._add_output_port(
- 'out',
+ "out",
{
- 'tc': tc,
- 'stream': stream,
- 'event_class': event_class,
- 'trace': trace,
- 'packet': packet,
+ "tc": tc,
+ "stream": stream,
+ "event_class": event_class,
+ "trace": trace,
+ "packet": packet,
},
)
_graph = bt2.Graph()
- _src_comp = _graph.add_component(MySrc, 'my_source')
- _msg_iter = TestOutputPortMessageIterator(_graph, _src_comp.output_ports['out'])
+ _src_comp = _graph.add_component(MySrc, "my_source")
+ _msg_iter = TestOutputPortMessageIterator(_graph, _src_comp.output_ports["out"])
const_msgs = list(_msg_iter)
def __init__(self, config, params, msg_list):
assert msg_list is not None
self._msg_list = msg_list
- self._add_input_port('in')
+ self._add_input_port("in")
def _user_graph_is_configured(self):
- self._msg_iter = self._create_input_port_message_iterator(
- self._input_ports['in']
- )
+ self._msg_iter = self._create_message_iterator(self._input_ports["in"])
def _user_consume(self):
assert self._msg_list[0] is None
def __init__(self, graph, output_port):
self._graph = graph
self._msg_list = [None]
- sink = graph.add_component(TestProxySink, 'test-proxy-sink', obj=self._msg_list)
- graph.connect_ports(output_port, sink.input_ports['in'])
+ sink = graph.add_component(TestProxySink, "test-proxy-sink", obj=self._msg_list)
+ graph.connect_ports(output_port, sink.input_ports["in"])
def __next__(self):
assert self._msg_list[0] is None
# The field is part of a dummy stream, itself part of a dummy trace created
# from trace class `tc`.
def create_const_field(tc, field_class, field_value_setter_fn):
- field_name = 'const field'
+ field_name = "const field"
class MyIter(bt2._UserMessageIterator):
def __init__(self, config, self_port_output):
class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
def __init__(self, config, params, obj):
- self._add_output_port('out', params)
+ self._add_output_port("out", params)
graph = bt2.Graph()
- src_comp = graph.add_component(MySrc, 'my_source', None)
- msg_iter = TestOutputPortMessageIterator(graph, src_comp.output_ports['out'])
+ src_comp = graph.add_component(MySrc, "my_source", None)
+ msg_iter = TestOutputPortMessageIterator(graph, src_comp.output_ports["out"])
# Ignore first message, stream beginning
_ = next(msg_iter)
packet_beg_msg = next(msg_iter)
return packet_beg_msg.packet.context_field[field_name]
+
+
+# Run `msg_iter_next_func` in a bt2._UserMessageIterator.__next__ context.
+#
+# For convenience, a trace and a stream are created. To allow the caller to
+# customize the created stream class, the `create_stream_class_func` callback
+# is invoked during the component initialization. It gets passed a trace class
+# and a clock class, and must return a stream class.
+#
+# The `msg_iter_next_func` callback receives two arguments, the message iterator
+# and the created stream.
+#
+# The value returned by `msg_iter_next_func` is returned by this function.
+def run_in_message_iterator_next(create_stream_class_func, msg_iter_next_func):
+ class MyIter(bt2._UserMessageIterator):
+ def __init__(self, config, port):
+ tc, sc = port.user_data
+ trace = tc()
+ self._stream = trace.create_stream(sc)
+
+ def __next__(self):
+ nonlocal res_bound
+ res_bound = msg_iter_next_func(self, self._stream)
+ raise bt2.Stop
+
+ class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ def __init__(self, config, params, obj):
+ tc = self._create_trace_class()
+ cc = self._create_clock_class()
+ sc = create_stream_class_func(tc, cc)
+
+ self._add_output_port("out", (tc, sc))
+
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, config, params, obj):
+ self._input_port = self._add_input_port("in")
+
+ def _user_graph_is_configured(self):
+ self._input_iter = self._create_message_iterator(self._input_port)
+
+ def _user_consume(self):
+ next(self._input_iter)
+
+ graph = bt2.Graph()
+ res_bound = None
+ src = graph.add_component(MySrc, "ze source")
+ snk = graph.add_component(MySink, "ze sink")
+ graph.connect_ports(src.output_ports["out"], snk.input_ports["in"])
+ graph.run()
+
+ # We deliberately use a different variable for returning the result than
+ # the variable bound to the MyIter.__next__ context. See the big comment
+ # about that in `run_in_component_init`.
+
+ res = res_bound
+ del res_bound
+ return res