tests: add tests for discarded events/packets creation
[babeltrace.git] / tests / bindings / python / bt2 / utils.py
index 8d2d08fbf9c5f8e0a474ac68b78d854e0ce605bf..a15e777b5c7f766da010cd8cf4835f88e25cead5 100644 (file)
@@ -323,3 +323,60 @@ def create_const_field(tc, field_class, field_value_setter_fn):
     packet_beg_msg = next(msg_iter)
 
     return packet_beg_msg.packet.context_field[field_name]
+
+
+# Run `msg_iter_next_func` in a bt2._UserMessageIterator.__next__ context.
+#
+# For convenience, a trace and a stream are created.  To allow the caller to
+# customize the created stream class, the `create_stream_class_func` callback
+# is invoked during the component initialization.  It gets passed a trace class
+# and a clock class, and must return a stream class.
+#
+# The `msg_iter_next_func` callback receives two arguments, the message iterator
+# and the created stream.
+#
+# The value returned by `msg_iter_next_func` is returned by this function.
+def run_in_message_iterator_next(create_stream_class_func, msg_iter_next_func):
+    class MyIter(bt2._UserMessageIterator):
+        def __init__(self, config, port):
+            tc, sc = port.user_data
+            trace = tc()
+            self._stream = trace.create_stream(sc)
+
+        def __next__(self):
+            nonlocal res_bound
+            res_bound = msg_iter_next_func(self, self._stream)
+            raise bt2.Stop
+
+    class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
+        def __init__(self, config, params, obj):
+            tc = self._create_trace_class()
+            cc = self._create_clock_class()
+            sc = create_stream_class_func(tc, cc)
+
+            self._add_output_port('out', (tc, sc))
+
+    class MySink(bt2._UserSinkComponent):
+        def __init__(self, config, params, obj):
+            self._input_port = self._add_input_port('in')
+
+        def _user_graph_is_configured(self):
+            self._input_iter = self._create_message_iterator(self._input_port)
+
+        def _user_consume(self):
+            next(self._input_iter)
+
+    graph = bt2.Graph()
+    res_bound = None
+    src = graph.add_component(MySrc, 'ze source')
+    snk = graph.add_component(MySink, 'ze sink')
+    graph.connect_ports(src.output_ports['out'], snk.input_ports['in'])
+    graph.run()
+
+    # We deliberately use a different variable for returning the result than
+    # the variable bound to the MyIter.__next__ context.  See the big comment
+    # about that in `run_in_component_init`.
+
+    res = res_bound
+    del res_bound
+    return res
This page took 0.032956 seconds and 4 git commands to generate.