+
+
+# Create a const field of the given field class.
+#
+# The field is part of a dummy stream, itself part of a dummy trace created
+# from trace class `tc`.
+def create_const_field(tc, field_class, field_value_setter_fn):
+ field_name = "const field"
+
+ class MyIter(bt2._UserMessageIterator):
+ def __init__(self, config, self_port_output):
+ nonlocal field_class
+ nonlocal field_value_setter_fn
+ trace = tc()
+ packet_context_fc = tc.create_structure_field_class()
+ packet_context_fc.append_member(field_name, field_class)
+ sc = tc.create_stream_class(
+ packet_context_field_class=packet_context_fc, supports_packets=True
+ )
+ stream = trace.create_stream(sc)
+ packet = stream.create_packet()
+
+ field_value_setter_fn(packet.context_field[field_name])
+
+ self._msgs = [
+ self._create_stream_beginning_message(stream),
+ self._create_packet_beginning_message(packet),
+ ]
+
+ def __next__(self):
+ if len(self._msgs) == 0:
+ raise StopIteration
+
+ return self._msgs.pop(0)
+
+ class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ def __init__(self, config, params, obj):
+ self._add_output_port("out", params)
+
+ graph = bt2.Graph()
+ src_comp = graph.add_component(MySrc, "my_source", None)
+ msg_iter = TestOutputPortMessageIterator(graph, src_comp.output_ports["out"])
+
+ # Ignore first message, stream beginning
+ _ = next(msg_iter)
+ packet_beg_msg = next(msg_iter)
+
+ return packet_beg_msg.packet.context_field[field_name]
+
+
+# Run `msg_iter_next_func` in a bt2._UserMessageIterator.__next__ context.
+#
+# For convenience, a trace and a stream are created. To allow the caller to
+# customize the created stream class, the `create_stream_class_func` callback
+# is invoked during the component initialization. It gets passed a trace class
+# and a clock class, and must return a stream class.
+#
+# The `msg_iter_next_func` callback receives two arguments, the message iterator
+# and the created stream.
+#
+# The value returned by `msg_iter_next_func` is returned by this function.
+def run_in_message_iterator_next(create_stream_class_func, msg_iter_next_func):
+ class MyIter(bt2._UserMessageIterator):
+ def __init__(self, config, port):
+ tc, sc = port.user_data
+ trace = tc()
+ self._stream = trace.create_stream(sc)
+
+ def __next__(self):
+ nonlocal res_bound
+ res_bound = msg_iter_next_func(self, self._stream)
+ raise bt2.Stop
+
+ class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ def __init__(self, config, params, obj):
+ tc = self._create_trace_class()
+ cc = self._create_clock_class()
+ sc = create_stream_class_func(tc, cc)
+
+ self._add_output_port("out", (tc, sc))
+
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, config, params, obj):
+ self._input_port = self._add_input_port("in")
+
+ def _user_graph_is_configured(self):
+ self._input_iter = self._create_message_iterator(self._input_port)
+
+ def _user_consume(self):
+ next(self._input_iter)
+
+ graph = bt2.Graph()
+ res_bound = None
+ src = graph.add_component(MySrc, "ze source")
+ snk = graph.add_component(MySink, "ze sink")
+ graph.connect_ports(src.output_ports["out"], snk.input_ports["in"])
+ graph.run()
+
+ # We deliberately use a different variable for returning the result than
+ # the variable bound to the MyIter.__next__ context. See the big comment
+ # about that in `run_in_component_init`.
+
+ res = res_bound
+ del res_bound
+ return res