packet_beg_msg = next(msg_iter)
return packet_beg_msg.packet.context_field[field_name]
+
+
+# Run `msg_iter_next_func` in a bt2._UserMessageIterator.__next__ context.
+#
+# For convenience, a trace and a stream are created. To allow the caller to
+# customize the created stream class, the `create_stream_class_func` callback
+# is invoked during the component initialization. It gets passed a trace class
+# and a clock class, and must return a stream class.
+#
+# The `msg_iter_next_func` callback receives two arguments, the message iterator
+# and the created stream.
+#
+# The value returned by `msg_iter_next_func` is returned by this function.
+def run_in_message_iterator_next(create_stream_class_func, msg_iter_next_func):
+ class MyIter(bt2._UserMessageIterator):
+ def __init__(self, config, port):
+ tc, sc = port.user_data
+ trace = tc()
+ self._stream = trace.create_stream(sc)
+
+ def __next__(self):
+ nonlocal res_bound
+ res_bound = msg_iter_next_func(self, self._stream)
+ raise bt2.Stop
+
+ class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ def __init__(self, config, params, obj):
+ tc = self._create_trace_class()
+ cc = self._create_clock_class()
+ sc = create_stream_class_func(tc, cc)
+
+ self._add_output_port('out', (tc, sc))
+
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, config, params, obj):
+ self._input_port = self._add_input_port('in')
+
+ def _user_graph_is_configured(self):
+ self._input_iter = self._create_message_iterator(self._input_port)
+
+ def _user_consume(self):
+ next(self._input_iter)
+
+ graph = bt2.Graph()
+ res_bound = None
+ src = graph.add_component(MySrc, 'ze source')
+ snk = graph.add_component(MySink, 'ze sink')
+ graph.connect_ports(src.output_ports['out'], snk.input_ports['in'])
+ graph.run()
+
+ # We deliberately use a different variable for returning the result than
+ # the variable bound to the MyIter.__next__ context. See the big comment
+ # about that in `run_in_component_init`.
+
+ res = res_bound
+ del res_bound
+ return res