lib: remove output port message iterator
[babeltrace.git] / tests / bindings / python / bt2 / utils.py
1 #
2 # Copyright (C) 2019 EfficiOS Inc.
3 #
4 # This program is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU General Public License
6 # as published by the Free Software Foundation; only version 2
7 # of the License.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 #
18
19 import bt2
20 import collections.abc
21
# Call `func` from within the __init__ method of a throwaway sink
# component, passing it the component being instantiated.
#
# Whatever `func` returns is handed back to the caller of
# run_in_component_init.
def run_in_component_init(func):
    # Variable shared with MySink.__init__() below: its only purpose is
    # to carry the value returned by `func` out of the initialization.
    init_result = None

    class MySink(bt2._UserSinkComponent):
        def __init__(self, params, obj):
            nonlocal init_result
            init_result = func(self)

        def _user_consume(self):
            pass

    # Adding the component to a graph is what instantiates MySink and
    # therefore runs `func`.
    graph = bt2.Graph()
    graph.add_component(MySink, 'comp')

    # The result is returned through a separate local variable, and the
    # variable bound to the MySink.__init__ context is deleted, on
    # purpose.  That context stays alive until the end of the program:
    # if the shared variable kept pointing to the result, it would add
    # an unexpected reference to the result's refcount from the point of
    # view of this function's user.  This would disturb destruction
    # tests, for example, which need to observe what happens when the
    # refcount of a Python object reaches 0.
    returned = init_result
    del init_result
    return returned
51
52
# Build and return an empty trace class, leaving every creation option
# at its default value.
def get_default_trace_class():
    # A trace class is created through a component's
    # _create_trace_class() method, hence the detour through a
    # component's initialization.
    return run_in_component_init(lambda comp_self: comp_self._create_trace_class())
59
60
# Sink component class acting as a proxy for tests.
#
# Its initialization object must be a list holding a single item. The
# sink adds one input port named `in`; each time it consumes, it takes
# the next message from that port and stores it as the first item of
# the initialization list.
class TestProxySink(bt2._UserSinkComponent):
    def __init__(self, params, msg_list):
        # The list is the only channel through which consumed messages
        # are handed back, so it is mandatory.
        assert msg_list is not None
        self._msg_list = msg_list
        self._add_input_port('in')

    def _user_graph_is_configured(self):
        # Create the message iterator on the `in` port; _user_consume()
        # pulls from it.
        in_port = self._input_ports['in']
        self._msg_iter = self._create_input_port_message_iterator(in_port)

    def _user_consume(self):
        # The previously stored message must have been taken (slot reset
        # to None) before another one is consumed.
        slot = self._msg_list
        assert slot[0] is None
        slot[0] = next(self._msg_iter)
81
82
# Helper message iterator for tests.
#
# Build it with a graph and an output port.
#
# On construction, a proxy sink (TestProxySink) is added to the graph
# and the given output port is connected to the proxy sink's `in` input
# port. Each call to __next__() runs the graph once and returns the
# message which the proxy sink left in the list shared with this
# object.
#
# Seeking is not supported by this message iterator.
class TestOutputPortMessageIterator(collections.abc.Iterator):
    def __init__(self, graph, output_port):
        # One-item list shared with the proxy sink: the sink writes the
        # consumed message into it, __next__() reads it back.
        mailbox = [None]
        self._graph = graph
        self._msg_list = mailbox
        proxy_sink = graph.add_component(
            TestProxySink, 'test-proxy-sink', obj=mailbox
        )
        graph.connect_ports(output_port, proxy_sink.input_ports['in'])

    def __next__(self):
        mailbox = self._msg_list

        # No message may be left over from a previous call.
        assert mailbox[0] is None

        # Running the graph once is expected to fill the slot.
        self._graph.run_once()
        received = mailbox[0]
        assert received is not None

        # Empty the slot again so that the next call starts clean.
        mailbox[0] = None
        return received
This page took 0.030842 seconds and 4 git commands to generate.