Remove `skip-string-normalization` in Python formatter config
[babeltrace.git] / tests / bindings / python / bt2 / utils.py
CommitLineData
0235b0db 1# SPDX-License-Identifier: GPL-2.0-only
d2d857a8
MJ
2#
3# Copyright (C) 2019 EfficiOS Inc.
4#
d2d857a8 5
fbbe9302 6import bt2
6c373cc9 7import collections.abc
fbbe9302 8
2287be69 9
fbbe9302
SM
# Execute the callable `func` from within a sink component's __init__
# method, passing it the component instance being initialized.
#
# Whatever `func` returns becomes the return value of
# run_in_component_init.
def run_in_component_init(func):
    # Filled in by MySink.__init__ while the component is being added to
    # the graph below.
    captured = None

    class MySink(bt2._UserSinkComponent):
        def __init__(self, config, params, obj):
            nonlocal captured
            captured = func(self)

        def _user_consume(self):
            pass

    graph = bt2.Graph()
    graph.add_component(MySink, "comp")

    # Hand the result over through a distinct local name and delete the
    # name shared with MySink.__init__. That __init__ context stays alive
    # until the program ends: if the shared name kept pointing to the
    # result, it would hold one more reference to it than the caller
    # expects. This would disturb tests which check what happens when the
    # reference count of a Python object drops to 0 (destruction tests, for
    # example).
    result = captured
    del captured
    return result
39
cfbd7cf3 40
# Return a new, empty trace class created with default values, using a
# component's initialization context to create it.
def get_default_trace_class():
    return run_in_component_init(lambda comp_self: comp_self._create_trace_class())
6c373cc9
PP
47
48
f0a42b33
FD
# Create a pair of lists: the first one contains non-const messages (as
# created by the message iterator) and the second one contains const
# messages (as received by consuming from the source through a graph).
#
# The created messages are, in order: stream beginning, packet beginning
# (only if `with_packet` is true), event, packet end (only if `with_packet`
# is true), and stream end.
def _get_all_message_types(with_packet=True):
    # Set by MyIter.__init__ to the list of messages it creates.
    created_msgs = None

    class MyIter(bt2._UserMessageIterator):
        def __init__(self, config, self_output_port):
            nonlocal created_msgs

            port_data = self_output_port.user_data
            stream = port_data["stream"]

            self._at = 0
            self._msgs = [self._create_stream_beginning_message(stream)]

            # The parent of the event is the packet when the stream class
            # supports packets, and the stream itself otherwise.
            if with_packet:
                packet = port_data["packet"]
                assert packet
                self._msgs.append(self._create_packet_beginning_message(packet))
                event_parent = packet
            else:
                assert stream
                event_parent = stream

            clock_snapshot_value = 789
            event_msg = self._create_event_message(
                port_data["event_class"],
                event_parent,
                clock_snapshot_value,
            )

            event = event_msg.event
            event.payload_field["giraffe"] = 1
            event.specific_context_field["ant"] = -1
            event.common_context_field["cpu_id"] = 1
            self._msgs.append(event_msg)

            if with_packet:
                self._msgs.append(self._create_packet_end_message(packet))

            self._msgs.append(self._create_stream_end_message(stream))

            # Expose the non-const messages to the enclosing function.
            created_msgs = self._msgs

        def __next__(self):
            index = self._at

            if index == len(self._msgs):
                raise bt2.Stop

            self._at = index + 1
            return self._msgs[index]

    class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
        def __init__(self, config, params, obj):
            tc = self._create_trace_class()
            clock_class = self._create_clock_class(frequency=1000)

            # Event common context (defined by the stream class).
            common_context_fc = tc.create_structure_field_class()
            common_context_fc += [("cpu_id", tc.create_signed_integer_field_class(8))]

            # Packet context (defined by the stream class), only when
            # packets are requested.
            if with_packet:
                packet_context_fc = tc.create_structure_field_class()
                packet_context_fc += [
                    ("something", tc.create_unsigned_integer_field_class(8))
                ]
            else:
                packet_context_fc = None

            stream_class = tc.create_stream_class(
                default_clock_class=clock_class,
                event_common_context_field_class=common_context_fc,
                packet_context_field_class=packet_context_fc,
                supports_packets=with_packet,
            )

            # Specific context (defined by the event class).
            specific_context_fc = tc.create_structure_field_class()
            specific_context_fc += [("ant", tc.create_signed_integer_field_class(16))]

            # Event payload.
            payload_fc = tc.create_structure_field_class()
            payload_fc += [("giraffe", tc.create_signed_integer_field_class(32))]

            event_class = stream_class.create_event_class(
                name="garou",
                specific_context_field_class=specific_context_fc,
                payload_field_class=payload_fc,
            )

            trace = tc(environment={"patate": 12})
            stream = trace.create_stream(stream_class, user_attributes={"salut": 23})

            packet = None

            if with_packet:
                packet = stream.create_packet()
                packet.context_field["something"] = 154

            # Everything the message iterator needs travels as the user
            # data of the output port.
            port_user_data = {
                "tc": tc,
                "stream": stream,
                "event_class": event_class,
                "trace": trace,
                "packet": packet,
            }
            self._add_output_port("out", port_user_data)

    graph = bt2.Graph()
    source = graph.add_component(MySrc, "my_source")
    const_msg_iter = TestOutputPortMessageIterator(graph, source.output_ports["out"])
    const_msgs = list(const_msg_iter)

    return created_msgs, const_msgs
176
177
# Return the first non-const message whose exact type is
# `bt2._StreamBeginningMessage`, or `None` if there's no such message.
def get_stream_beginning_message():
    msgs = _get_all_message_types()[0]
    return next((m for m in msgs if type(m) is bt2._StreamBeginningMessage), None)
183
184
# Return the first const message whose exact type is
# `bt2._StreamBeginningMessageConst`, or `None` if there's no such message.
def get_const_stream_beginning_message():
    const_msgs = _get_all_message_types()[1]
    return next(
        (m for m in const_msgs if type(m) is bt2._StreamBeginningMessageConst),
        None,
    )
190
191
# Return the first non-const message whose exact type is
# `bt2._StreamEndMessage`, or `None` if there's no such message.
def get_stream_end_message():
    msgs = _get_all_message_types()[0]
    return next((m for m in msgs if type(m) is bt2._StreamEndMessage), None)
197
198
# Return the first non-const message whose exact type is
# `bt2._PacketBeginningMessage`, or `None` if there's no such message.
#
# The messages are explicitly requested with packet support.
def get_packet_beginning_message():
    msgs = _get_all_message_types(with_packet=True)[0]
    return next((m for m in msgs if type(m) is bt2._PacketBeginningMessage), None)
204
205
# Return the first const message whose exact type is
# `bt2._PacketBeginningMessageConst`, or `None` if there's no such message.
#
# The messages are explicitly requested with packet support.
def get_const_packet_beginning_message():
    const_msgs = _get_all_message_types(with_packet=True)[1]
    return next(
        (m for m in const_msgs if type(m) is bt2._PacketBeginningMessageConst),
        None,
    )
211
212
# Return the first non-const message whose exact type is
# `bt2._PacketEndMessage`, or `None` if there's no such message.
#
# The messages are explicitly requested with packet support.
def get_packet_end_message():
    msgs = _get_all_message_types(with_packet=True)[0]
    return next((m for m in msgs if type(m) is bt2._PacketEndMessage), None)
218
219
# Return the first non-const message whose exact type is
# `bt2._EventMessage`, or `None` if there's no such message.
def get_event_message():
    msgs = _get_all_message_types()[0]
    return next((m for m in msgs if type(m) is bt2._EventMessage), None)
225
226
# Return the first const message whose exact type is
# `bt2._EventMessageConst`, or `None` if there's no such message.
def get_const_event_message():
    const_msgs = _get_all_message_types()[1]
    return next((m for m in const_msgs if type(m) is bt2._EventMessageConst), None)
232
233
6c373cc9
PP
# Proxy sink component class.
#
# The initialization object of this sink must be a list holding a single
# item. The sink adds one input port named `in`. Each time it consumes,
# it takes the next message from that port and stores it as the first
# item of the initialization list, which must hold `None` at that point.
class TestProxySink(bt2._UserSinkComponent):
    def __init__(self, config, params, msg_list):
        assert msg_list is not None
        self._msg_list = msg_list
        self._add_input_port("in")

    def _user_graph_is_configured(self):
        in_port = self._input_ports["in"]
        self._msg_iter = self._create_message_iterator(in_port)

    def _user_consume(self):
        slot = self._msg_list

        # The previous message must have been taken out of the list
        # before consuming again.
        assert slot[0] is None
        slot[0] = next(self._msg_iter)
252
253
# Helper message iterator for tests.
#
# Build it with a graph and an output port.
#
# On construction, it adds a `TestProxySink` component to the graph and
# connects the given output port to the `in` input port of this sink.
# Each call to __next__() runs the graph once and returns the message
# which the proxy sink left in the shared single-item list.
#
# This message iterator cannot seek.
class TestOutputPortMessageIterator(collections.abc.Iterator):
    def __init__(self, graph, output_port):
        self._graph = graph

        # Single-item list shared with the proxy sink.
        self._msg_list = [None]
        proxy_sink = graph.add_component(
            TestProxySink, "test-proxy-sink", obj=self._msg_list
        )
        graph.connect_ports(output_port, proxy_sink.input_ports["in"])

    def __next__(self):
        slot = self._msg_list
        assert slot[0] is None
        self._graph.run_once()

        # Take the message out of the shared list so that the list is
        # empty again for the next call.
        msg, slot[0] = slot[0], None
        assert msg is not None
        return msg
6d9a6c9e
FD
278
279
# Create a const field of the given field class.
#
# The field is the single member of the context of a dummy packet, which
# is part of a dummy stream, itself part of a dummy trace created from
# trace class `tc`.
#
# `field_value_setter_fn` is called once with the non-const field so that
# it can set its value. The returned field is the one found in the packet
# of the packet beginning message as received through a graph.
def create_const_field(tc, field_class, field_value_setter_fn):
    field_name = "const field"

    class MyIter(bt2._UserMessageIterator):
        def __init__(self, config, self_port_output):
            # `tc`, `field_class`, and `field_value_setter_fn` are only
            # read here: plain closure access is enough, no `nonlocal`
            # declaration is needed.
            trace = tc()
            packet_context_fc = tc.create_structure_field_class()
            packet_context_fc.append_member(field_name, field_class)
            sc = tc.create_stream_class(
                packet_context_field_class=packet_context_fc, supports_packets=True
            )
            stream = trace.create_stream(sc)
            packet = stream.create_packet()

            # Let the caller set the value while the field is not const.
            field_value_setter_fn(packet.context_field[field_name])

            self._msgs = [
                self._create_stream_beginning_message(stream),
                self._create_packet_beginning_message(packet),
            ]

        def __next__(self):
            # Signal the end of iteration the same way as the other user
            # message iterators of this file.
            if not self._msgs:
                raise bt2.Stop

            return self._msgs.pop(0)

    class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
        def __init__(self, config, params, obj):
            self._add_output_port("out", params)

    graph = bt2.Graph()
    src_comp = graph.add_component(MySrc, "my_source", None)
    msg_iter = TestOutputPortMessageIterator(graph, src_comp.output_ports["out"])

    # Skip the first message (stream beginning): only the packet beginning
    # message gives access to the packet context field.
    next(msg_iter)
    packet_beg_msg = next(msg_iter)

    return packet_beg_msg.packet.context_field[field_name]
f18c6482
SM
326
327
# Execute `msg_iter_next_func` from within the __next__ method of a
# bt2._UserMessageIterator.
#
# A trace and a stream are created for convenience. So that the caller
# can customize the stream class of this stream, the
# `create_stream_class_func` callback is called during the initialization
# of the source component with a trace class and a clock class: it must
# return a stream class.
#
# `msg_iter_next_func` is called with two arguments: the message iterator
# and the created stream.
#
# This function returns what `msg_iter_next_func` returns.
def run_in_message_iterator_next(create_stream_class_func, msg_iter_next_func):
    # Filled in by MyIter.__next__ while the graph runs below.
    captured = None

    class MyIter(bt2._UserMessageIterator):
        def __init__(self, config, port):
            trace_class, stream_class = port.user_data
            self._stream = trace_class().create_stream(stream_class)

        def __next__(self):
            nonlocal captured
            captured = msg_iter_next_func(self, self._stream)

            # A single call is all that's needed: end the iteration.
            raise bt2.Stop

    class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
        def __init__(self, config, params, obj):
            trace_class = self._create_trace_class()
            clock_class = self._create_clock_class()
            stream_class = create_stream_class_func(trace_class, clock_class)

            # The message iterator finds both classes as the user data of
            # its port.
            self._add_output_port("out", (trace_class, stream_class))

    class MySink(bt2._UserSinkComponent):
        def __init__(self, config, params, obj):
            self._input_port = self._add_input_port("in")

        def _user_graph_is_configured(self):
            self._input_iter = self._create_message_iterator(self._input_port)

        def _user_consume(self):
            next(self._input_iter)

    graph = bt2.Graph()
    source = graph.add_component(MySrc, "ze source")
    sink = graph.add_component(MySink, "ze sink")
    graph.connect_ports(source.output_ports["out"], sink.input_ports["in"])
    graph.run()

    # Hand the result over through a distinct local name and delete the
    # name shared with MyIter.__next__, so that this shared name doesn't
    # keep an extra reference to the result which the caller wouldn't
    # expect (this matters for tests which rely on the reference count of
    # the result).
    result = captured
    del captured
    return result
This page took 0.06605 seconds and 4 git commands to generate.