1 # SPDX-License-Identifier: GPL-2.0-only
3 # Copyright (C) 2019 EfficiOS Inc.
10 # Run callable `func` in the context of a component's __init__ method. The
11 # callable is passed the Component being instantiated.
13 # The value returned by the callable is returned by run_in_component_init.
14 def run_in_component_init(func
):
15 class MySink(bt2
._UserSinkComponent
):
16 def __init__(self
, config
, params
, obj
):
18 res_bound
= func(self
)
20 def _user_consume(self
):
25 g
.add_component(MySink
, "comp")
27 # We deliberately use a different variable for returning the result than
28 # the variable bound to the MySink.__init__ context and delete res_bound.
29 # The MySink.__init__ context stays alive until the end of the program, so
30 # if res_bound were to still point to our result, it would contribute an
31 # unexpected reference to the refcount of the result, from the point of view
32 # of the user of this function. It would then affect destruction tests,
33 # for example, which want to test what happens when the refcount of a Python object reaches 0.
41 # Create an empty trace class with default values.
42 def get_default_trace_class():
44 return comp_self
._create
_trace
_class
()
46 return run_in_component_init(f
)
49 # Create a pair of lists, one containing non-const messages and the other
50 # containing const messages.
51 def _get_all_message_types(with_packet
=True):
54 class MyIter(bt2
._UserMessageIterator
):
55 def __init__(self
, config
, self_output_port
):
59 self
._create
_stream
_beginning
_message
(
60 self_output_port
.user_data
["stream"]
65 assert self_output_port
.user_data
["packet"]
67 self
._create
_packet
_beginning
_message
(
68 self_output_port
.user_data
["packet"]
72 default_clock_snapshot
= 789
75 assert self_output_port
.user_data
["packet"]
76 ev_parent
= self_output_port
.user_data
["packet"]
78 assert self_output_port
.user_data
["stream"]
79 ev_parent
= self_output_port
.user_data
["stream"]
81 msg
= self
._create
_event
_message
(
82 self_output_port
.user_data
["event_class"],
84 default_clock_snapshot
,
87 msg
.event
.payload_field
["giraffe"] = 1
88 msg
.event
.specific_context_field
["ant"] = -1
89 msg
.event
.common_context_field
["cpu_id"] = 1
90 self
._msgs
.append(msg
)
94 self
._create
_packet
_end
_message
(
95 self_output_port
.user_data
["packet"]
100 self
._create
_stream
_end
_message
(self_output_port
.user_data
["stream"])
106 if self
._at
== len(self
._msgs
):
109 msg
= self
._msgs
[self
._at
]
113 class MySrc(bt2
._UserSourceComponent
, message_iterator_class
=MyIter
):
114 def __init__(self
, config
, params
, obj
):
115 tc
= self
._create
_trace
_class
()
116 clock_class
= self
._create
_clock
_class
(frequency
=1000)
118 # event common context (stream-class-defined)
119 cc
= tc
.create_structure_field_class()
120 cc
+= [("cpu_id", tc
.create_signed_integer_field_class(8))]
122 # packet context (stream-class-defined)
126 pc
= tc
.create_structure_field_class()
127 pc
+= [("something", tc
.create_unsigned_integer_field_class(8))]
129 stream_class
= tc
.create_stream_class(
130 default_clock_class
=clock_class
,
131 event_common_context_field_class
=cc
,
132 packet_context_field_class
=pc
,
133 supports_packets
=with_packet
,
136 # specific context (event-class-defined)
137 sc
= tc
.create_structure_field_class()
138 sc
+= [("ant", tc
.create_signed_integer_field_class(16))]
141 ep
= tc
.create_structure_field_class()
142 ep
+= [("giraffe", tc
.create_signed_integer_field_class(32))]
144 event_class
= stream_class
.create_event_class(
145 name
="garou", specific_context_field_class
=sc
, payload_field_class
=ep
148 trace
= tc(environment
={"patate": 12})
149 stream
= trace
.create_stream(stream_class
, user_attributes
={"salut": 23})
152 packet
= stream
.create_packet()
153 packet
.context_field
["something"] = 154
157 self
._add
_output
_port
(
162 "event_class": event_class
,
169 _src_comp
= _graph
.add_component(MySrc
, "my_source")
170 _msg_iter
= TestOutputPortMessageIterator(_graph
, _src_comp
.output_ports
["out"])
172 const_msgs
= list(_msg_iter
)
174 return _msgs
, const_msgs
177 def get_stream_beginning_message():
178 msgs
, _
= _get_all_message_types()
180 if type(m
) is bt2
._StreamBeginningMessage
:
184 def get_const_stream_beginning_message():
185 _
, const_msgs
= _get_all_message_types()
187 if type(m
) is bt2
._StreamBeginningMessageConst
:
191 def get_stream_end_message():
192 msgs
, _
= _get_all_message_types()
194 if type(m
) is bt2
._StreamEndMessage
:
198 def get_packet_beginning_message():
199 msgs
, _
= _get_all_message_types(with_packet
=True)
201 if type(m
) is bt2
._PacketBeginningMessage
:
205 def get_const_packet_beginning_message():
206 _
, const_msgs
= _get_all_message_types(with_packet
=True)
208 if type(m
) is bt2
._PacketBeginningMessageConst
:
212 def get_packet_end_message():
213 msgs
, _
= _get_all_message_types(with_packet
=True)
215 if type(m
) is bt2
._PacketEndMessage
:
219 def get_event_message():
220 msgs
, _
= _get_all_message_types()
222 if type(m
) is bt2
._EventMessage
:
226 def get_const_event_message():
227 _
, const_msgs
= _get_all_message_types()
229 if type(m
) is bt2
._EventMessageConst
:
233 # Proxy sink component class.
235 # This sink accepts a list of a single item as its initialization
236 # object. This sink creates a single input port `in`. When it consumes
237 # from this port, it puts the returned message in the initialization
238 # list as the first item.
class TestProxySink(bt2._UserSinkComponent):
    """Sink that hands each consumed message back through a shared list.

    The initialization object is a list whose first slot acts as a
    one-message mailbox: `_user_consume` stores the message it pulls
    there, and expects the slot to be `None` again before it runs a
    second time.
    """

    def __init__(self, config, params, msg_list):
        # Without the shared list there is nowhere to deliver messages.
        assert msg_list is not None

        self._msg_list = msg_list

        # The sink's only input port; the iterator is created on it later.
        self._add_input_port("in")

    def _user_graph_is_configured(self):
        """Create the message iterator on the `in` input port."""
        in_port = self._input_ports["in"]
        self._msg_iter = self._create_message_iterator(in_port)

    def _user_consume(self):
        """Pull one message from upstream and place it in slot 0 of the list."""
        mailbox = self._msg_list

        # The previous message must have been taken out (slot reset to
        # None) before another one is fetched.
        assert mailbox[0] is None

        mailbox[0] = next(self._msg_iter)
253 # This is a helper message iterator for tests.
255 # The constructor accepts a graph and an output port.
257 # Internally, it adds a proxy sink to the graph and connects the
258 # received output port to the proxy sink's input port. Its __next__()
259 # method then uses the proxy sink to transfer the consumed message to
260 # the output port message iterator's user.
262 # This message iterator cannot seek.
263 class TestOutputPortMessageIterator(collections
.abc
.Iterator
):
264 def __init__(self
, graph
, output_port
):
266 self
._msg
_list
= [None]
267 sink
= graph
.add_component(TestProxySink
, "test-proxy-sink", obj
=self
._msg
_list
)
268 graph
.connect_ports(output_port
, sink
.input_ports
["in"])
271 assert self
._msg
_list
[0] is None
272 self
._graph
.run_once()
273 msg
= self
._msg
_list
[0]
274 assert msg
is not None
275 self
._msg
_list
[0] = None
279 # Create a const field of the given field class.
281 # The field is part of a dummy stream, itself part of a dummy trace created
282 # from trace class `tc`.
283 def create_const_field(tc
, field_class
, field_value_setter_fn
):
284 field_name
= "const field"
286 class MyIter(bt2
._UserMessageIterator
):
287 def __init__(self
, config
, self_port_output
):
289 nonlocal field_value_setter_fn
291 packet_context_fc
= tc
.create_structure_field_class()
292 packet_context_fc
.append_member(field_name
, field_class
)
293 sc
= tc
.create_stream_class(
294 packet_context_field_class
=packet_context_fc
, supports_packets
=True
296 stream
= trace
.create_stream(sc
)
297 packet
= stream
.create_packet()
299 field_value_setter_fn(packet
.context_field
[field_name
])
302 self
._create
_stream
_beginning
_message
(stream
),
303 self
._create
_packet
_beginning
_message
(packet
),
307 if len(self
._msgs
) == 0:
310 return self
._msgs
.pop(0)
class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
    """Source component exposing a single output port named `out`."""

    def __init__(self, config, params, obj):
        # The component parameters are attached to the port as its user
        # data.
        port_user_data = params
        self._add_output_port("out", port_user_data)
317 src_comp
= graph
.add_component(MySrc
, "my_source", None)
318 msg_iter
= TestOutputPortMessageIterator(graph
, src_comp
.output_ports
["out"])
320 # Ignore first message, stream beginning
322 packet_beg_msg
= next(msg_iter
)
324 return packet_beg_msg
.packet
.context_field
[field_name
]
327 # Run `msg_iter_next_func` in a bt2._UserMessageIterator.__next__ context.
329 # For convenience, a trace and a stream are created. To allow the caller to
330 # customize the created stream class, the `create_stream_class_func` callback
331 # is invoked during the component initialization. It gets passed a trace class
332 # and a clock class, and must return a stream class.
334 # The `msg_iter_next_func` callback receives two arguments, the message iterator
335 # and the created stream.
337 # The value returned by `msg_iter_next_func` is returned by this function.
338 def run_in_message_iterator_next(create_stream_class_func
, msg_iter_next_func
):
339 class MyIter(bt2
._UserMessageIterator
):
340 def __init__(self
, config
, port
):
341 tc
, sc
= port
.user_data
343 self
._stream
= trace
.create_stream(sc
)
347 res_bound
= msg_iter_next_func(self
, self
._stream
)
class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
    """Source component that builds the trace and stream classes.

    The stream class comes from the caller-supplied
    `create_stream_class_func`, which receives the new trace class and
    clock class.
    """

    def __init__(self, config, params, obj):
        trace_class = self._create_trace_class()
        clock_class = self._create_clock_class()
        stream_class = create_stream_class_func(trace_class, clock_class)

        # Publish both classes as the user data of the `out` port so that
        # code holding the port can retrieve them.
        port_user_data = (trace_class, stream_class)
        self._add_output_port("out", port_user_data)
class MySink(bt2._UserSinkComponent):
    """Sink with one input port `in` that pulls messages and drops them."""

    def __init__(self, config, params, obj):
        # Keep the port object: the message iterator is created on it once
        # the graph is configured.
        in_port = self._add_input_port("in")
        self._input_port = in_port

    def _user_graph_is_configured(self):
        """Create the message iterator on the stored input port."""
        in_port = self._input_port
        self._input_iter = self._create_message_iterator(in_port)

    def _user_consume(self):
        """Advance the upstream iterator by one message and discard it."""
        upstream = self._input_iter
        next(upstream)
370 src
= graph
.add_component(MySrc
, "ze source")
371 snk
= graph
.add_component(MySink
, "ze sink")
372 graph
.connect_ports(src
.output_ports
["out"], snk
.input_ports
["in"])
375 # We deliberately use a different variable for returning the result than
376 # the variable bound to the MyIter.__next__ context. See the big comment
377 # about that in `run_in_component_init`.