tests: add tests for discarded events/packets creation
[babeltrace.git] / tests / bindings / python / bt2 / utils.py
index 8602142072a7884e5acb77c627d54a590330c516..a15e777b5c7f766da010cd8cf4835f88e25cead5 100644 (file)
@@ -1,20 +1,7 @@
+# SPDX-License-Identifier: GPL-2.0-only
 #
 # Copyright (C) 2019 EfficiOS Inc.
 #
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; only version 2
-# of the License.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
-#
 
 import bt2
 import collections.abc
@@ -257,9 +244,7 @@ class TestProxySink(bt2._UserSinkComponent):
         self._add_input_port('in')
 
     def _user_graph_is_configured(self):
-        self._msg_iter = self._create_input_port_message_iterator(
-            self._input_ports['in']
-        )
+        self._msg_iter = self._create_message_iterator(self._input_ports['in'])
 
     def _user_consume(self):
         assert self._msg_list[0] is None
@@ -338,3 +323,60 @@ def create_const_field(tc, field_class, field_value_setter_fn):
     packet_beg_msg = next(msg_iter)
 
     return packet_beg_msg.packet.context_field[field_name]
+
+
+# Run `msg_iter_next_func` in a bt2._UserMessageIterator.__next__ context.
+#
+# For convenience, a trace and a stream are created.  To allow the caller to
+# customize the created stream class, the `create_stream_class_func` callback
+# is invoked during the component initialization.  It gets passed a trace class
+# and a clock class, and must return a stream class.
+#
+# The `msg_iter_next_func` callback receives two arguments, the message iterator
+# and the created stream.
+#
+# The value returned by `msg_iter_next_func` is returned by this function.
+def run_in_message_iterator_next(create_stream_class_func, msg_iter_next_func):
+    class MyIter(bt2._UserMessageIterator):
+        def __init__(self, config, port):
+            tc, sc = port.user_data
+            trace = tc()
+            self._stream = trace.create_stream(sc)
+
+        def __next__(self):
+            nonlocal res_bound
+            res_bound = msg_iter_next_func(self, self._stream)
+            raise bt2.Stop
+
+    class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
+        def __init__(self, config, params, obj):
+            tc = self._create_trace_class()
+            cc = self._create_clock_class()
+            sc = create_stream_class_func(tc, cc)
+
+            self._add_output_port('out', (tc, sc))
+
+    class MySink(bt2._UserSinkComponent):
+        def __init__(self, config, params, obj):
+            self._input_port = self._add_input_port('in')
+
+        def _user_graph_is_configured(self):
+            self._input_iter = self._create_message_iterator(self._input_port)
+
+        def _user_consume(self):
+            next(self._input_iter)
+
+    graph = bt2.Graph()
+    res_bound = None
+    src = graph.add_component(MySrc, 'ze source')
+    snk = graph.add_component(MySink, 'ze sink')
+    graph.connect_ports(src.output_ports['out'], snk.input_ports['in'])
+    graph.run()
+
+    # We deliberately use a different variable for returning the result than
+    # the variable bound to the MyIter.__next__ context.  See the big comment
+    # about that in `run_in_component_init`.
+
+    res = res_bound
+    del res_bound
+    return res
This page took 0.024044 seconds and 4 git commands to generate.