# SPDX-License-Identifier: MIT
# Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com>
# Copyright (c) 2018 Francis Deslauriers <francis.deslauriers@efficios.com>
# Copyright (c) 2019 Simon Marchi <simon.marchi@efficios.com>
import collections.abc
import functools

import bt2
from bt2 import native_bt, utils, object
from bt2 import stream_class as bt2_stream_class
from bt2 import field_class as bt2_field_class
from bt2 import integer_range_set as bt2_integer_range_set
from bt2 import trace as bt2_trace
from bt2 import value as bt2_value
def _trace_class_destruction_listener_from_native(
    user_listener, handle, trace_class_ptr
):
    """Bridge a native trace class destruction notification to Python.

    This function is bound with ``functools.partial()`` (``user_listener``
    and ``handle`` pre-filled) and registered with the native library, which
    later calls it with the pointer of the trace class being destroyed.

    Parameters:
        user_listener: Callable supplied by the user; called with one
            argument, the wrapped trace class.
        handle: Listener handle which was created when the listener was
            registered.
        trace_class_ptr: Native pointer of the trace class being destroyed.
    """
    trace_class = _TraceClass._create_from_ptr_and_get_ref(trace_class_ptr)
    user_listener(trace_class)

    # The listener cannot be removed anymore once the trace class is
    # destroyed: invalidate the handle so that a later removal attempt is
    # rejected in Python instead of reaching the native library with a
    # stale listener ID.
    handle._invalidate()
class _TraceClassConst(object._SharedObject, collections.abc.Mapping):
    """Constant (read-only) trace class.

    Behaves as a mapping of stream class IDs (unsigned integers) to stream
    class objects: iterating yields the IDs, and indexing with an ID
    returns the corresponding stream class.
    """

    @staticmethod
    def _get_ref(ptr):
        # Acquires one reference on the native trace class `ptr`.
        native_bt.trace_class_get_ref(ptr)

    @staticmethod
    def _put_ref(ptr):
        # Releases one reference on the native trace class `ptr`.
        native_bt.trace_class_put_ref(ptr)

    # Native accessors and Python wrapper types used by the methods
    # below. `_TraceClass` overrides them with their non-const
    # equivalents.
    _borrow_stream_class_ptr_by_index = staticmethod(
        native_bt.trace_class_borrow_stream_class_by_index_const
    )
    _borrow_stream_class_ptr_by_id = staticmethod(
        native_bt.trace_class_borrow_stream_class_by_id_const
    )
    _borrow_user_attributes_ptr = staticmethod(
        native_bt.trace_class_borrow_user_attributes_const
    )
    _stream_class_pycls = bt2_stream_class._StreamClassConst
    _create_value_from_ptr_and_get_ref = staticmethod(
        bt2_value._create_from_const_ptr_and_get_ref
    )

    @property
    def user_attributes(self):
        """User attributes of this trace class, as a wrapped value."""
        ptr = self._borrow_user_attributes_ptr(self._ptr)
        assert ptr is not None
        return self._create_value_from_ptr_and_get_ref(ptr)

    # Number of stream classes in this trace class.

    def __len__(self):
        count = native_bt.trace_class_get_stream_class_count(self._ptr)
        assert count >= 0
        return count

    # Get a stream class by stream id.

    def __getitem__(self, key):
        """Return the stream class having the ID `key`.

        Raises:
            KeyError: No stream class has the ID `key`.
        """
        utils._check_uint64(key)

        sc_ptr = self._borrow_stream_class_ptr_by_id(self._ptr, key)

        if sc_ptr is None:
            raise KeyError(key)

        return self._stream_class_pycls._create_from_ptr_and_get_ref(sc_ptr)

    def __iter__(self):
        """Yield the ID of each stream class, in index order."""
        for idx in range(len(self)):
            sc_ptr = self._borrow_stream_class_ptr_by_index(self._ptr, idx)
            assert sc_ptr is not None

            # Named `sc_id` rather than `id` so that the builtin is not
            # shadowed.
            sc_id = native_bt.stream_class_get_id(sc_ptr)
            assert sc_id >= 0

            yield sc_id

    @property
    def assigns_automatic_stream_class_id(self):
        """Whether this trace class assigns stream class IDs itself."""
        return native_bt.trace_class_assigns_automatic_stream_class_id(self._ptr)

    # Add a listener to be called when the trace class is destroyed.

    def add_destruction_listener(self, listener):
        """Register `listener` to be called when this trace class is destroyed.

        Parameters:
            listener: Callable which receives the trace class as its only
                argument.

        Returns:
            A `utils._ListenerHandle` to pass to
            `remove_destruction_listener()`.

        Raises:
            TypeError: `listener` is not callable.
        """
        if not callable(listener):
            raise TypeError("'listener' parameter is not callable")

        handle = utils._ListenerHandle(self.addr)

        # Bind the user listener and its handle so that the native side
        # only needs to provide the trace class pointer.
        listener_from_native = functools.partial(
            _trace_class_destruction_listener_from_native, listener, handle
        )

        fn = native_bt.bt2_trace_class_add_destruction_listener
        status, listener_id = fn(self._ptr, listener_from_native)
        utils._handle_func_status(
            status, "cannot add destruction listener to trace class object"
        )

        handle._set_listener_id(listener_id)

        return handle

    def remove_destruction_listener(self, listener_handle):
        """Unregister the destruction listener identified by `listener_handle`.

        Parameters:
            listener_handle: Handle returned by `add_destruction_listener()`
                called on this same trace class.

        Raises:
            ValueError: The handle belongs to another trace class, or its
                listener was already removed.
        """
        utils._check_type(listener_handle, utils._ListenerHandle)

        if listener_handle._addr != self.addr:
            raise ValueError(
                "This trace class destruction listener does not match the trace class object."
            )

        if listener_handle._listener_id is None:
            raise ValueError(
                "This trace class destruction listener was already removed."
            )

        status = native_bt.trace_class_remove_destruction_listener(
            self._ptr, listener_handle._listener_id
        )
        utils._handle_func_status(status)
        listener_handle._invalidate()
class _TraceClass(_TraceClassConst):
    """Mutable trace class.

    On top of the read-only mapping interface of `_TraceClassConst`, this
    class can instantiate traces (by calling the object), create stream
    classes, and create field classes of every supported type.
    """

    # Non-const native accessors and wrapper types (override the const
    # versions of the parent class).
    _borrow_stream_class_ptr_by_index = staticmethod(
        native_bt.trace_class_borrow_stream_class_by_index
    )
    _borrow_stream_class_ptr_by_id = staticmethod(
        native_bt.trace_class_borrow_stream_class_by_id
    )
    _borrow_user_attributes_ptr = staticmethod(
        native_bt.trace_class_borrow_user_attributes
    )
    _stream_class_pycls = bt2_stream_class._StreamClass
    _create_value_from_ptr_and_get_ref = staticmethod(
        bt2_value._create_from_ptr_and_get_ref
    )

    # Instantiate a trace of this class.

    def __call__(self, name=None, user_attributes=None, uuid=None, environment=None):
        """Create and return a trace which is an instance of this trace class.

        Each optional parameter which is not `None` is applied to the new
        trace; `environment` must provide `items()` yielding key/value
        pairs to add to the environment of the trace.

        Raises:
            bt2._MemoryError: The native trace object cannot be created.
        """
        trace_ptr = native_bt.trace_create(self._ptr)

        if trace_ptr is None:
            # It is a trace, not a trace class, which failed to be created.
            raise bt2._MemoryError("cannot create trace object")

        trace = bt2_trace._Trace._create_from_ptr(trace_ptr)

        if name is not None:
            trace._name = name

        if user_attributes is not None:
            trace._user_attributes = user_attributes

        if uuid is not None:
            trace._uuid = uuid

        if environment is not None:
            for key, value in environment.items():
                trace.environment[key] = value

        return trace

    def create_stream_class(
        self,
        id=None,
        name=None,
        user_attributes=None,
        packet_context_field_class=None,
        event_common_context_field_class=None,
        default_clock_class=None,
        assigns_automatic_event_class_id=True,
        assigns_automatic_stream_id=True,
        supports_packets=False,
        packets_have_beginning_default_clock_snapshot=False,
        packets_have_end_default_clock_snapshot=False,
        supports_discarded_events=False,
        discarded_events_have_default_clock_snapshots=False,
        supports_discarded_packets=False,
        discarded_packets_have_default_clock_snapshots=False,
    ):
        """Create a stream class within this trace class and return it.

        `id` must be `None` when this trace class assigns stream class IDs
        automatically, and must be an unsigned integer otherwise. The
        remaining parameters configure the new stream class.

        Raises:
            ValueError: `id` is provided while IDs are automatic, or is
                missing while they are not.
            bt2._MemoryError: The native stream class cannot be created.
        """
        # Validate parameters before we create the object.
        bt2_stream_class._StreamClass._validate_create_params(
            name,
            user_attributes,
            packet_context_field_class,
            event_common_context_field_class,
            default_clock_class,
            assigns_automatic_event_class_id,
            assigns_automatic_stream_id,
            supports_packets,
            packets_have_beginning_default_clock_snapshot,
            packets_have_end_default_clock_snapshot,
            supports_discarded_events,
            discarded_events_have_default_clock_snapshots,
            supports_discarded_packets,
            discarded_packets_have_default_clock_snapshots,
        )

        if self.assigns_automatic_stream_class_id:
            if id is not None:
                raise ValueError(
                    "id provided, but trace class assigns automatic stream class ids"
                )

            sc_ptr = native_bt.stream_class_create(self._ptr)
        else:
            if id is None:
                raise ValueError(
                    "id not provided, but trace class does not assign automatic stream class ids"
                )

            utils._check_uint64(id)
            sc_ptr = native_bt.stream_class_create_with_id(self._ptr, id)

        # Fail early, like the other creation methods of this class,
        # instead of wrapping a null pointer.
        if sc_ptr is None:
            raise bt2._MemoryError("cannot create stream class object")

        sc = bt2_stream_class._StreamClass._create_from_ptr(sc_ptr)

        if name is not None:
            sc._name = name

        if user_attributes is not None:
            sc._user_attributes = user_attributes

        if event_common_context_field_class is not None:
            sc._event_common_context_field_class = event_common_context_field_class

        if default_clock_class is not None:
            sc._default_clock_class = default_clock_class

        # call after `sc._default_clock_class` because, if
        # `packets_have_beginning_default_clock_snapshot` or
        # `packets_have_end_default_clock_snapshot` is true, then this
        # stream class needs a default clock class already.
        sc._set_supports_packets(
            supports_packets,
            packets_have_beginning_default_clock_snapshot,
            packets_have_end_default_clock_snapshot,
        )

        # call after sc._set_supports_packets() because, if
        # `packet_context_field_class` is not `None`, then this stream
        # class needs to support packets already.
        if packet_context_field_class is not None:
            sc._packet_context_field_class = packet_context_field_class

        sc._assigns_automatic_event_class_id = assigns_automatic_event_class_id
        sc._assigns_automatic_stream_id = assigns_automatic_stream_id
        sc._set_supports_discarded_events(
            supports_discarded_events, discarded_events_have_default_clock_snapshots
        )
        sc._set_supports_discarded_packets(
            supports_discarded_packets, discarded_packets_have_default_clock_snapshots
        )
        return sc

    def _user_attributes(self, user_attributes):
        # Write-only property setter: converts `user_attributes` to a
        # value object, requires it to be a map value, and sets it on
        # the native trace class.
        value = bt2_value.create_value(user_attributes)
        utils._check_type(value, bt2_value.MapValue)
        native_bt.trace_class_set_user_attributes(self._ptr, value._ptr)

    _user_attributes = property(fset=_user_attributes)

    def _assigns_automatic_stream_class_id(self, auto_id):
        # Write-only property setter: `auto_id` must be a `bool`.
        utils._check_bool(auto_id)
        return native_bt.trace_class_set_assigns_automatic_stream_class_id(
            self._ptr, auto_id
        )

    _assigns_automatic_stream_class_id = property(
        fset=_assigns_automatic_stream_class_id
    )

    # Field class creation methods.

    def _check_field_class_create_status(self, ptr, type_name):
        # Raises `bt2._MemoryError` when the native creation function
        # returned no field class (`ptr` is `None`); `type_name` only
        # serves the error message.
        if ptr is None:
            raise bt2._MemoryError("cannot create {} field class".format(type_name))

    @staticmethod
    def _set_field_class_user_attrs(fc, user_attributes):
        # Sets the user attributes of the field class `fc`, unless
        # `user_attributes` is `None`.
        if user_attributes is not None:
            fc._user_attributes = user_attributes

    def create_bool_field_class(self, user_attributes=None):
        """Create and return a boolean field class."""
        field_class_ptr = native_bt.field_class_bool_create(self._ptr)
        self._check_field_class_create_status(field_class_ptr, "boolean")
        fc = bt2_field_class._BoolFieldClass._create_from_ptr(field_class_ptr)
        self._set_field_class_user_attrs(fc, user_attributes)
        return fc

    def create_bit_array_field_class(self, length, user_attributes=None):
        """Create and return a bit array field class of `length` bits.

        Raises:
            ValueError: `length` is outside the [1, 64] range.
        """
        utils._check_uint64(length)

        if not 1 <= length <= 64:
            raise ValueError(
                "invalid length {}: expecting a value in the [1, 64] range".format(
                    length
                )
            )

        field_class_ptr = native_bt.field_class_bit_array_create(self._ptr, length)
        self._check_field_class_create_status(field_class_ptr, "bit array")
        fc = bt2_field_class._BitArrayFieldClass._create_from_ptr(field_class_ptr)
        self._set_field_class_user_attrs(fc, user_attributes)
        return fc

    def _create_integer_field_class(
        self,
        create_func,
        py_cls,
        type_name,
        field_value_range,
        preferred_display_base,
        user_attributes,
    ):
        # Common implementation of the integer and enumeration creation
        # methods: `create_func` is the native creation function,
        # `py_cls` the Python class wrapping its result, and `type_name`
        # the name to use in an error message.
        field_class_ptr = create_func(self._ptr)
        self._check_field_class_create_status(field_class_ptr, type_name)

        field_class = py_cls._create_from_ptr(field_class_ptr)

        if field_value_range is not None:
            field_class._field_value_range = field_value_range

        if preferred_display_base is not None:
            field_class._preferred_display_base = preferred_display_base

        self._set_field_class_user_attrs(field_class, user_attributes)
        return field_class

    def create_signed_integer_field_class(
        self, field_value_range=None, preferred_display_base=None, user_attributes=None
    ):
        """Create and return a signed integer field class."""
        return self._create_integer_field_class(
            native_bt.field_class_integer_signed_create,
            bt2_field_class._SignedIntegerFieldClass,
            "signed integer",
            field_value_range,
            preferred_display_base,
            user_attributes,
        )

    def create_unsigned_integer_field_class(
        self, field_value_range=None, preferred_display_base=None, user_attributes=None
    ):
        """Create and return an unsigned integer field class."""
        return self._create_integer_field_class(
            native_bt.field_class_integer_unsigned_create,
            bt2_field_class._UnsignedIntegerFieldClass,
            "unsigned integer",
            field_value_range,
            preferred_display_base,
            user_attributes,
        )

    def create_signed_enumeration_field_class(
        self, field_value_range=None, preferred_display_base=None, user_attributes=None
    ):
        """Create and return a signed enumeration field class."""
        return self._create_integer_field_class(
            native_bt.field_class_enumeration_signed_create,
            bt2_field_class._SignedEnumerationFieldClass,
            "signed enumeration",
            field_value_range,
            preferred_display_base,
            user_attributes,
        )

    def create_unsigned_enumeration_field_class(
        self, field_value_range=None, preferred_display_base=None, user_attributes=None
    ):
        """Create and return an unsigned enumeration field class."""
        return self._create_integer_field_class(
            native_bt.field_class_enumeration_unsigned_create,
            bt2_field_class._UnsignedEnumerationFieldClass,
            "unsigned enumeration",
            field_value_range,
            preferred_display_base,
            user_attributes,
        )

    def create_single_precision_real_field_class(self, user_attributes=None):
        """Create and return a single-precision real field class."""
        field_class_ptr = native_bt.field_class_real_single_precision_create(self._ptr)
        self._check_field_class_create_status(field_class_ptr, "single-precision real")

        field_class = bt2_field_class._SinglePrecisionRealFieldClass._create_from_ptr(
            field_class_ptr
        )

        self._set_field_class_user_attrs(field_class, user_attributes)

        return field_class

    def create_double_precision_real_field_class(self, user_attributes=None):
        """Create and return a double-precision real field class."""
        field_class_ptr = native_bt.field_class_real_double_precision_create(self._ptr)
        self._check_field_class_create_status(field_class_ptr, "double-precision real")

        field_class = bt2_field_class._DoublePrecisionRealFieldClass._create_from_ptr(
            field_class_ptr
        )

        self._set_field_class_user_attrs(field_class, user_attributes)

        return field_class

    def create_structure_field_class(self, user_attributes=None):
        """Create and return a structure field class."""
        field_class_ptr = native_bt.field_class_structure_create(self._ptr)
        self._check_field_class_create_status(field_class_ptr, "structure")
        fc = bt2_field_class._StructureFieldClass._create_from_ptr(field_class_ptr)
        self._set_field_class_user_attrs(fc, user_attributes)
        return fc

    def create_string_field_class(self, user_attributes=None):
        """Create and return a string field class."""
        field_class_ptr = native_bt.field_class_string_create(self._ptr)
        self._check_field_class_create_status(field_class_ptr, "string")
        fc = bt2_field_class._StringFieldClass._create_from_ptr(field_class_ptr)
        self._set_field_class_user_attrs(fc, user_attributes)
        return fc

    def create_static_array_field_class(self, elem_fc, length, user_attributes=None):
        """Create and return a static array field class.

        `elem_fc` is the field class of the elements and `length` the
        (unsigned integer) number of elements.
        """
        utils._check_type(elem_fc, bt2_field_class._FieldClass)
        utils._check_uint64(length)
        ptr = native_bt.field_class_array_static_create(self._ptr, elem_fc._ptr, length)
        self._check_field_class_create_status(ptr, "static array")
        fc = bt2_field_class._StaticArrayFieldClass._create_from_ptr(ptr)
        self._set_field_class_user_attrs(fc, user_attributes)
        return fc

    def create_dynamic_array_field_class(
        self, elem_fc, length_fc=None, user_attributes=None
    ):
        """Create and return a dynamic array field class.

        `elem_fc` is the field class of the elements; `length_fc`, when
        not `None`, must be an unsigned integer field class.
        """
        utils._check_type(elem_fc, bt2_field_class._FieldClass)

        # Stays `None` when there is no length field class.
        length_fc_ptr = None

        if length_fc is not None:
            utils._check_type(length_fc, bt2_field_class._UnsignedIntegerFieldClass)
            length_fc_ptr = length_fc._ptr

        ptr = native_bt.field_class_array_dynamic_create(
            self._ptr, elem_fc._ptr, length_fc_ptr
        )
        self._check_field_class_create_status(ptr, "dynamic array")

        # The concrete Python class depends on the native field class.
        fc = bt2_field_class._obj_type_from_field_class_ptr(ptr)._create_from_ptr(ptr)
        self._set_field_class_user_attrs(fc, user_attributes)
        return fc

    def create_option_without_selector_field_class(
        self, content_fc, user_attributes=None
    ):
        """Create and return an option field class without a selector."""
        utils._check_type(content_fc, bt2_field_class._FieldClass)
        ptr = native_bt.field_class_option_without_selector_create(
            self._ptr, content_fc._ptr
        )
        self._check_field_class_create_status(ptr, "option")
        fc = bt2_field_class._obj_type_from_field_class_ptr(ptr)._create_from_ptr(ptr)
        self._set_field_class_user_attrs(fc, user_attributes)
        return fc

    def create_option_with_bool_selector_field_class(
        self, content_fc, selector_fc, selector_is_reversed=False, user_attributes=None
    ):
        """Create and return an option field class with a boolean selector.

        `selector_fc` must be a boolean field class and
        `selector_is_reversed` a `bool`.
        """
        utils._check_type(content_fc, bt2_field_class._FieldClass)
        utils._check_bool(selector_is_reversed)
        utils._check_type(selector_fc, bt2_field_class._BoolFieldClass)
        ptr = native_bt.field_class_option_with_selector_field_bool_create(
            self._ptr, content_fc._ptr, selector_fc._ptr
        )
        self._check_field_class_create_status(ptr, "option")
        fc = bt2_field_class._obj_type_from_field_class_ptr(ptr)._create_from_ptr(ptr)
        self._set_field_class_user_attrs(fc, user_attributes)
        fc._selector_is_reversed = selector_is_reversed
        return fc

    def create_option_with_integer_selector_field_class(
        self, content_fc, selector_fc, ranges, user_attributes=None
    ):
        """Create and return an option field class with an integer selector.

        `ranges` must be an unsigned integer range set when `selector_fc`
        is an unsigned integer field class, and a signed one otherwise.

        Raises:
            ValueError: `ranges` is empty.
        """
        utils._check_type(content_fc, bt2_field_class._FieldClass)
        utils._check_type(selector_fc, bt2_field_class._IntegerFieldClass)

        if len(ranges) == 0:
            raise ValueError("integer range set is empty")

        if isinstance(selector_fc, bt2_field_class._UnsignedIntegerFieldClass):
            utils._check_type(ranges, bt2_integer_range_set.UnsignedIntegerRangeSet)
            ptr = native_bt.field_class_option_with_selector_field_integer_unsigned_create(
                self._ptr, content_fc._ptr, selector_fc._ptr, ranges._ptr
            )
        else:
            utils._check_type(ranges, bt2_integer_range_set.SignedIntegerRangeSet)
            ptr = (
                native_bt.field_class_option_with_selector_field_integer_signed_create(
                    self._ptr, content_fc._ptr, selector_fc._ptr, ranges._ptr
                )
            )

        self._check_field_class_create_status(ptr, "option")
        fc = bt2_field_class._obj_type_from_field_class_ptr(ptr)._create_from_ptr(ptr)
        self._set_field_class_user_attrs(fc, user_attributes)
        return fc

    def create_variant_field_class(self, selector_fc=None, user_attributes=None):
        """Create and return a variant field class.

        `selector_fc`, when not `None`, must be an integer field class.
        """
        selector_fc_ptr = None

        if selector_fc is not None:
            utils._check_type(selector_fc, bt2_field_class._IntegerFieldClass)
            selector_fc_ptr = selector_fc._ptr

        ptr = native_bt.field_class_variant_create(self._ptr, selector_fc_ptr)
        self._check_field_class_create_status(ptr, "variant")
        fc = bt2_field_class._obj_type_from_field_class_ptr(ptr)._create_from_ptr(ptr)
        self._set_field_class_user_attrs(fc, user_attributes)
        return fc