fix: test_message_iterator.py hangs on Python 3.12
[babeltrace.git] / tests / bindings / python / bt2 / utils.py
CommitLineData
32d2d479
MJ
#
# Copyright (C) 2019 EfficiOS Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; only version 2
# of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
18
0bee8ea9 19import bt2
fac7b25a 20import collections.abc
0bee8ea9 21
886890ce 22
0bee8ea9
SM
# Call `func` from within the __init__ method of a sink component that is
# being added to a fresh graph. `func` receives the component instance
# being initialized as its only argument.
#
# Whatever `func` returns is returned by run_in_component_init.
def run_in_component_init(func):
    class MySink(bt2._UserSinkComponent):
        def __init__(self, config, params, obj):
            nonlocal bound_result
            bound_result = func(self)

        def _user_consume(self):
            pass

    bound_result = None
    graph = bt2.Graph()
    graph.add_component(MySink, 'comp')

    # The result is handed back through a second name and `bound_result` is
    # deleted on purpose. MySink.__init__'s context outlives this call (it
    # stays around until the program ends), so leaving `bound_result`
    # pointing at the result would add a hidden reference to it. Callers
    # that test object destruction (what happens when the refcount of a
    # Python object drops to 0) would then see an unexpected extra
    # reference.
    result = bound_result
    del bound_result
    return result
52
61d96b89 53
# Create an empty trace class with default values.
#
# The trace class is created from within a component's __init__ method
# through run_in_component_init().
def get_default_trace_class():
    return run_in_component_init(lambda comp_self: comp_self._create_trace_class())
fac7b25a
PP
60
61
9cbe0c59
FD
# Create a pair of lists describing the same message sequence: the first
# one contains the non-const messages, as created by the user message
# iterator, and the second one contains the const messages, as obtained by
# iterating on the source's output port.
#
# When `with_packet` is true, the stream class supports packets and the
# sequence also contains packet beginning and packet end messages.
def _get_all_message_types(with_packet=True):
    created_msgs = None

    class MyIter(bt2._UserMessageIterator):
        def __init__(self, config, self_output_port):
            nonlocal created_msgs

            # Objects prepared by MySrc.__init__() below.
            port_data = self_output_port.user_data
            stream = port_data['stream']
            packet = port_data['packet']

            self._at = 0
            self._msgs = [self._create_stream_beginning_message(stream)]

            # The event belongs to the packet when there is one, and
            # directly to the stream otherwise.
            if with_packet:
                assert packet
                self._msgs.append(self._create_packet_beginning_message(packet))
                event_parent = packet
            else:
                assert stream
                event_parent = stream

            default_clock_snapshot = 789
            event_msg = self._create_event_message(
                port_data['event_class'], event_parent, default_clock_snapshot
            )
            event_msg.event.payload_field['giraffe'] = 1
            event_msg.event.specific_context_field['ant'] = -1
            event_msg.event.common_context_field['cpu_id'] = 1
            self._msgs.append(event_msg)

            if with_packet:
                self._msgs.append(self._create_packet_end_message(packet))

            self._msgs.append(self._create_stream_end_message(stream))

            # Expose the non-const messages to the enclosing function.
            created_msgs = self._msgs

        def __next__(self):
            index = self._at

            if index == len(self._msgs):
                raise bt2.Stop

            self._at = index + 1
            return self._msgs[index]

    class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
        def __init__(self, config, params, obj):
            tc = self._create_trace_class()
            clock_class = self._create_clock_class(frequency=1000)

            # event common context (stream-class-defined)
            common_ctx_fc = tc.create_structure_field_class()
            common_ctx_fc += [('cpu_id', tc.create_signed_integer_field_class(8))]

            # packet context (stream-class-defined), only with packets
            packet_ctx_fc = None

            if with_packet:
                packet_ctx_fc = tc.create_structure_field_class()
                packet_ctx_fc += [
                    ('something', tc.create_unsigned_integer_field_class(8))
                ]

            stream_class = tc.create_stream_class(
                default_clock_class=clock_class,
                event_common_context_field_class=common_ctx_fc,
                packet_context_field_class=packet_ctx_fc,
                supports_packets=with_packet,
            )

            # specific context (event-class-defined)
            specific_ctx_fc = tc.create_structure_field_class()
            specific_ctx_fc += [('ant', tc.create_signed_integer_field_class(16))]

            # event payload
            payload_fc = tc.create_structure_field_class()
            payload_fc += [('giraffe', tc.create_signed_integer_field_class(32))]

            event_class = stream_class.create_event_class(
                name='garou',
                specific_context_field_class=specific_ctx_fc,
                payload_field_class=payload_fc,
            )

            trace = tc(environment={'patate': 12})
            stream = trace.create_stream(stream_class, user_attributes={'salut': 23})

            packet = None

            if with_packet:
                packet = stream.create_packet()
                packet.context_field['something'] = 154

            # Everything the message iterator needs travels as the output
            # port's user data.
            port_user_data = {
                'tc': tc,
                'stream': stream,
                'event_class': event_class,
                'trace': trace,
                'packet': packet,
            }
            self._add_output_port('out', port_user_data)

    graph = bt2.Graph()
    source = graph.add_component(MySrc, 'my_source')
    const_msg_iter = TestOutputPortMessageIterator(graph, source.output_ports['out'])

    # Draining the iterator runs the graph; this is also what instantiates
    # MyIter and therefore what sets `created_msgs`.
    const_msgs = list(const_msg_iter)

    return created_msgs, const_msgs
189
190
# Return the first non-const message whose type is exactly
# bt2._StreamBeginningMessage, or None if there is no such message.
def get_stream_beginning_message():
    created_msgs, _ = _get_all_message_types()
    return next(
        (msg for msg in created_msgs if type(msg) is bt2._StreamBeginningMessage),
        None,
    )
196
197
# Return the first const message whose type is exactly
# bt2._StreamBeginningMessageConst, or None if there is no such message.
def get_const_stream_beginning_message():
    _, received_msgs = _get_all_message_types()
    return next(
        (
            msg
            for msg in received_msgs
            if type(msg) is bt2._StreamBeginningMessageConst
        ),
        None,
    )
203
204
# Return the first non-const message whose type is exactly
# bt2._StreamEndMessage, or None if there is no such message.
def get_stream_end_message():
    created_msgs, _ = _get_all_message_types()
    return next(
        (msg for msg in created_msgs if type(msg) is bt2._StreamEndMessage),
        None,
    )
210
211
# Return the first non-const message whose type is exactly
# bt2._PacketBeginningMessage, or None if there is no such message.
def get_packet_beginning_message():
    created_msgs, _ = _get_all_message_types(with_packet=True)
    return next(
        (msg for msg in created_msgs if type(msg) is bt2._PacketBeginningMessage),
        None,
    )
217
218
# Return the first const message whose type is exactly
# bt2._PacketBeginningMessageConst, or None if there is no such message.
def get_const_packet_beginning_message():
    _, received_msgs = _get_all_message_types(with_packet=True)
    return next(
        (
            msg
            for msg in received_msgs
            if type(msg) is bt2._PacketBeginningMessageConst
        ),
        None,
    )
224
225
# Return the first non-const message whose type is exactly
# bt2._PacketEndMessage, or None if there is no such message.
def get_packet_end_message():
    created_msgs, _ = _get_all_message_types(with_packet=True)
    return next(
        (msg for msg in created_msgs if type(msg) is bt2._PacketEndMessage),
        None,
    )
231
232
# Return the first non-const message whose type is exactly
# bt2._EventMessage, or None if there is no such message.
def get_event_message():
    created_msgs, _ = _get_all_message_types()
    return next(
        (msg for msg in created_msgs if type(msg) is bt2._EventMessage),
        None,
    )
238
239
# Return the first const message whose type is exactly
# bt2._EventMessageConst, or None if there is no such message.
def get_const_event_message():
    _, received_msgs = _get_all_message_types()
    return next(
        (msg for msg in received_msgs if type(msg) is bt2._EventMessageConst),
        None,
    )
245
246
fac7b25a
PP
# Proxy sink component class.
#
# The initialization object of this sink is a list containing a single
# item. The sink creates one input port named `in`. Each time it
# consumes, it takes one message from that port and stores it as the
# first item of the initialization list; that item must be None when the
# sink consumes.
class TestProxySink(bt2._UserSinkComponent):
    def __init__(self, config, params, msg_list):
        assert msg_list is not None
        self._msg_list = msg_list
        self._add_input_port('in')

    def _user_graph_is_configured(self):
        in_port = self._input_ports['in']
        self._msg_iter = self._create_message_iterator(in_port)

    def _user_consume(self):
        slot = self._msg_list

        # The previous message must have been picked up already.
        assert slot[0] is None
        slot[0] = next(self._msg_iter)
265
266
# Helper message iterator for tests.
#
# It is built from a graph and one of that graph's output ports.
#
# On construction, a proxy sink (TestProxySink) is added to the graph and
# its input port is connected to the given output port. Each __next__()
# call runs the graph once, which makes the proxy sink drop the consumed
# message into a one-item list shared with this iterator; that message is
# then returned to the user.
#
# This message iterator cannot seek.
class TestOutputPortMessageIterator(collections.abc.Iterator):
    def __init__(self, graph, output_port):
        self._graph = graph

        # One-item mailbox shared with the proxy sink.
        self._msg_list = [None]
        proxy_sink = graph.add_component(
            TestProxySink, 'test-proxy-sink', obj=self._msg_list
        )
        graph.connect_ports(output_port, proxy_sink.input_ports['in'])

    def __next__(self):
        slot = self._msg_list
        assert slot[0] is None

        # Running the graph once is expected to fill the mailbox.
        self._graph.run_once()

        # Take the message and empty the mailbox for the next call.
        msg, slot[0] = slot[0], None
        assert msg is not None
        return msg
f7c7acd1
FD
291
292
# Create a const field of the given field class.
#
# The field is part of a dummy stream, itself part of a dummy trace created
# from trace class `tc`.
#
# `field_value_setter_fn` is called with the (non-const) field so that the
# caller can give it a value before it is read back as a const field.
#
# The returned value is the packet context member holding the field, as
# seen through the packet beginning message received from the graph.
def create_const_field(tc, field_class, field_value_setter_fn):
    field_name = 'const field'

    class MyIter(bt2._UserMessageIterator):
        def __init__(self, config, self_port_output):
            # `tc`, `field_class` and `field_value_setter_fn` are only read
            # here, so plain closure access is enough (no `nonlocal`).
            trace = tc()

            # The field under test is the sole member of the packet context.
            packet_context_fc = tc.create_structure_field_class()
            packet_context_fc.append_member(field_name, field_class)
            sc = tc.create_stream_class(
                packet_context_field_class=packet_context_fc, supports_packets=True
            )
            stream = trace.create_stream(sc)
            packet = stream.create_packet()

            field_value_setter_fn(packet.context_field[field_name])

            self._msgs = [
                self._create_stream_beginning_message(stream),
                self._create_packet_beginning_message(packet),
            ]

        def __next__(self):
            if not self._msgs:
                raise StopIteration

            return self._msgs.pop(0)

    class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
        def __init__(self, config, params, obj):
            self._add_output_port('out', params)

    graph = bt2.Graph()
    src_comp = graph.add_component(MySrc, 'my_source', None)
    msg_iter = TestOutputPortMessageIterator(graph, src_comp.output_ports['out'])

    # Skip the first message (stream beginning); the packet beginning
    # message that follows carries the packet context.
    next(msg_iter)
    packet_beg_msg = next(msg_iter)

    return packet_beg_msg.packet.context_field[field_name]
356ada2b
SM
339
340
# Call `msg_iter_next_func` from within the __next__ method of a
# bt2._UserMessageIterator.
#
# A trace and a stream are created for convenience. So that the caller can
# customize the stream class, `create_stream_class_func` is called while the
# source component is being initialized: it receives a trace class and a
# clock class, and must return a stream class.
#
# `msg_iter_next_func` receives two arguments: the message iterator and the
# created stream.
#
# Whatever `msg_iter_next_func` returns is returned by this function.
def run_in_message_iterator_next(create_stream_class_func, msg_iter_next_func):
    class MyIter(bt2._UserMessageIterator):
        def __init__(self, config, port):
            trace_class, stream_class = port.user_data
            trace = trace_class()
            self._stream = trace.create_stream(stream_class)

        def __next__(self):
            nonlocal bound_result
            bound_result = msg_iter_next_func(self, self._stream)

            # End the iteration right after the callback ran.
            raise bt2.Stop

    class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
        def __init__(self, config, params, obj):
            trace_class = self._create_trace_class()
            clock_class = self._create_clock_class()
            stream_class = create_stream_class_func(trace_class, clock_class)

            self._add_output_port('out', (trace_class, stream_class))

    class MySink(bt2._UserSinkComponent):
        def __init__(self, config, params, obj):
            self._input_port = self._add_input_port('in')

        def _user_graph_is_configured(self):
            self._input_iter = self._create_message_iterator(self._input_port)

        def _user_consume(self):
            next(self._input_iter)

    bound_result = None
    graph = bt2.Graph()
    source = graph.add_component(MySrc, 'ze source')
    sink = graph.add_component(MySink, 'ze sink')
    graph.connect_ports(source.output_ports['out'], sink.input_ports['in'])
    graph.run()

    # The result is handed back through a second name and `bound_result`,
    # the variable bound to the MyIter.__next__ context, is deleted on
    # purpose: keeping it would leave an extra, unexpected reference to the
    # result from the caller's point of view (this matters to tests which
    # check what happens when an object's refcount reaches 0).
    result = bound_result
    del bound_result
    return result
This page took 0.058137 seconds and 4 git commands to generate.