1 # SPDX-License-Identifier: MIT
3 # Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com>
4 # Copyright (c) 2018 Francis Deslauriers <francis.deslauriers@efficios.com>
5 # Copyright (c) 2019 Simon Marchi <simon.marchi@efficios.com>
10 from bt2
import error
as bt2_error
11 from bt2
import trace
as bt2_trace
12 from bt2
import utils
as bt2_utils
13 from bt2
import value
as bt2_value
14 from bt2
import object as bt2_object
15 from bt2
import native_bt
16 from bt2
import field_class
as bt2_field_class
17 from bt2
import stream_class
as bt2_stream_class
18 from bt2
import integer_range_set
as bt2_integer_range_set
21 def _trace_class_destruction_listener_from_native(
22 user_listener
, handle
, trace_class_ptr
24 trace_class
= _TraceClassConst
._create
_from
_ptr
_and
_get
_ref
(trace_class_ptr
)
25 user_listener(trace_class
)
29 class _TraceClassConst(bt2_object
._SharedObject
, collections
.abc
.Mapping
):
32 native_bt
.trace_class_get_ref(ptr
)
36 native_bt
.trace_class_put_ref(ptr
)
38 _borrow_stream_class_ptr_by_index
= staticmethod(
39 native_bt
.trace_class_borrow_stream_class_by_index_const
41 _borrow_stream_class_ptr_by_id
= staticmethod(
42 native_bt
.trace_class_borrow_stream_class_by_id_const
44 _borrow_user_attributes_ptr
= staticmethod(
45 native_bt
.trace_class_borrow_user_attributes_const
47 _stream_class_pycls
= bt2_stream_class
._StreamClassConst
48 _create_value_from_ptr_and_get_ref
= staticmethod(
49 bt2_value
._create
_from
_const
_ptr
_and
_get
_ref
53 def user_attributes(self
):
54 ptr
= self
._borrow
_user
_attributes
_ptr
(self
._ptr
)
55 assert ptr
is not None
56 return self
._create
_value
_from
_ptr
_and
_get
_ref
(ptr
)
58 # Number of stream classes in this trace class.
61 count
= native_bt
.trace_class_get_stream_class_count(self
._ptr
)
65 # Get a stream class by stream id.
67 def __getitem__(self
, key
):
68 bt2_utils
._check
_uint
64(key
)
70 sc_ptr
= self
._borrow
_stream
_class
_ptr
_by
_id
(self
._ptr
, key
)
74 return self
._stream
_class
_pycls
._create
_from
_ptr
_and
_get
_ref
(sc_ptr
)
77 for idx
in range(len(self
)):
78 sc_ptr
= self
._borrow
_stream
_class
_ptr
_by
_index
(self
._ptr
, idx
)
79 assert sc_ptr
is not None
81 id = native_bt
.stream_class_get_id(sc_ptr
)
87 def assigns_automatic_stream_class_id(self
):
88 return native_bt
.trace_class_assigns_automatic_stream_class_id(self
._ptr
)
90 # Add a listener to be called when the trace class is destroyed.
92 def add_destruction_listener(self
, listener
):
93 if not callable(listener
):
94 raise TypeError("'listener' parameter is not callable")
96 handle
= bt2_utils
._ListenerHandle
(self
.addr
)
98 listener_from_native
= functools
.partial(
99 _trace_class_destruction_listener_from_native
, listener
, handle
102 fn
= native_bt
.bt2_trace_class_add_destruction_listener
103 status
, listener_id
= fn(self
._ptr
, listener_from_native
)
104 bt2_utils
._handle
_func
_status
(
105 status
, "cannot add destruction listener to trace class object"
108 handle
._set
_listener
_id
(listener_id
)
112 def remove_destruction_listener(self
, listener_handle
):
113 bt2_utils
._check
_type
(listener_handle
, bt2_utils
._ListenerHandle
)
115 if listener_handle
._addr
!= self
.addr
:
117 "This trace class destruction listener does not match the trace class object."
120 if listener_handle
._listener
_id
is None:
122 "This trace class destruction listener was already removed."
125 status
= native_bt
.trace_class_remove_destruction_listener(
126 self
._ptr
, listener_handle
._listener
_id
128 bt2_utils
._handle
_func
_status
(status
)
129 listener_handle
._invalidate
()
132 class _TraceClass(_TraceClassConst
):
133 _borrow_stream_class_ptr_by_index
= staticmethod(
134 native_bt
.trace_class_borrow_stream_class_by_index
136 _borrow_stream_class_ptr_by_id
= staticmethod(
137 native_bt
.trace_class_borrow_stream_class_by_id
139 _borrow_user_attributes_ptr
= staticmethod(
140 native_bt
.trace_class_borrow_user_attributes
142 _stream_class_pycls
= bt2_stream_class
._StreamClass
143 _create_value_from_ptr_and_get_ref
= staticmethod(
144 bt2_value
._create
_from
_ptr
_and
_get
_ref
147 # Instantiate a trace of this class.
149 def __call__(self
, name
=None, user_attributes
=None, uuid
=None, environment
=None):
150 trace_ptr
= native_bt
.trace_create(self
._ptr
)
152 if trace_ptr
is None:
153 raise bt2_error
._MemoryError
("cannot create trace class object")
155 trace
= bt2_trace
._Trace
._create
_from
_ptr
(trace_ptr
)
160 if user_attributes
is not None:
161 trace
._user
_attributes
= user_attributes
166 if environment
is not None:
167 for key
, value
in environment
.items():
168 trace
.environment
[key
] = value
172 def create_stream_class(
176 user_attributes
=None,
177 packet_context_field_class
=None,
178 event_common_context_field_class
=None,
179 default_clock_class
=None,
180 assigns_automatic_event_class_id
=True,
181 assigns_automatic_stream_id
=True,
182 supports_packets
=False,
183 packets_have_beginning_default_clock_snapshot
=False,
184 packets_have_end_default_clock_snapshot
=False,
185 supports_discarded_events
=False,
186 discarded_events_have_default_clock_snapshots
=False,
187 supports_discarded_packets
=False,
188 discarded_packets_have_default_clock_snapshots
=False,
190 # Validate parameters before we create the object.
191 bt2_stream_class
._StreamClass
._validate
_create
_params
(
194 packet_context_field_class
,
195 event_common_context_field_class
,
197 assigns_automatic_event_class_id
,
198 assigns_automatic_stream_id
,
200 packets_have_beginning_default_clock_snapshot
,
201 packets_have_end_default_clock_snapshot
,
202 supports_discarded_events
,
203 discarded_events_have_default_clock_snapshots
,
204 supports_discarded_packets
,
205 discarded_packets_have_default_clock_snapshots
,
208 if self
.assigns_automatic_stream_class_id
:
211 "id provided, but trace class assigns automatic stream class ids"
214 sc_ptr
= native_bt
.stream_class_create(self
._ptr
)
218 "id not provided, but trace class does not assign automatic stream class ids"
221 bt2_utils
._check
_uint
64(id)
222 sc_ptr
= native_bt
.stream_class_create_with_id(self
._ptr
, id)
224 sc
= bt2_stream_class
._StreamClass
._create
_from
_ptr
(sc_ptr
)
229 if user_attributes
is not None:
230 sc
._user
_attributes
= user_attributes
232 if event_common_context_field_class
is not None:
233 sc
._event
_common
_context
_field
_class
= event_common_context_field_class
235 if default_clock_class
is not None:
236 sc
._default
_clock
_class
= default_clock_class
238 # call after `sc._default_clock_class` because, if
239 # `packets_have_beginning_default_clock_snapshot` or
240 # `packets_have_end_default_clock_snapshot` is true, then this
241 # stream class needs a default clock class already.
242 sc
._set
_supports
_packets
(
244 packets_have_beginning_default_clock_snapshot
,
245 packets_have_end_default_clock_snapshot
,
248 # call after sc._set_supports_packets() because, if
249 # `packet_context_field_class` is not `None`, then this stream
250 # class needs to support packets already.
251 if packet_context_field_class
is not None:
252 sc
._packet
_context
_field
_class
= packet_context_field_class
254 sc
._assigns
_automatic
_event
_class
_id
= assigns_automatic_event_class_id
255 sc
._assigns
_automatic
_stream
_id
= assigns_automatic_stream_id
256 sc
._set
_supports
_discarded
_events
(
257 supports_discarded_events
, discarded_events_have_default_clock_snapshots
259 sc
._set
_supports
_discarded
_packets
(
260 supports_discarded_packets
, discarded_packets_have_default_clock_snapshots
264 def _user_attributes(self
, user_attributes
):
265 value
= bt2_value
.create_value(user_attributes
)
266 bt2_utils
._check
_type
(value
, bt2_value
.MapValue
)
267 native_bt
.trace_class_set_user_attributes(self
._ptr
, value
._ptr
)
269 _user_attributes
= property(fset
=_user_attributes
)
271 def _assigns_automatic_stream_class_id(self
, auto_id
):
272 bt2_utils
._check
_bool
(auto_id
)
273 return native_bt
.trace_class_set_assigns_automatic_stream_class_id(
277 _assigns_automatic_stream_class_id
= property(
278 fset
=_assigns_automatic_stream_class_id
281 # Field class creation methods.
283 def _check_field_class_create_status(self
, ptr
, type_name
):
285 raise bt2_error
._MemoryError
(
286 "cannot create {} field class".format(type_name
)
290 def _set_field_class_user_attrs(fc
, user_attributes
):
291 if user_attributes
is not None:
292 fc
._user
_attributes
= user_attributes
294 def create_bool_field_class(self
, user_attributes
=None):
295 field_class_ptr
= native_bt
.field_class_bool_create(self
._ptr
)
296 self
._check
_field
_class
_create
_status
(field_class_ptr
, "boolean")
297 fc
= bt2_field_class
._BoolFieldClass
._create
_from
_ptr
(field_class_ptr
)
298 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
301 def create_bit_array_field_class(self
, length
, user_attributes
=None):
302 bt2_utils
._check
_uint
64(length
)
304 if length
< 1 or length
> 64:
306 "invalid length {}: expecting a value in the [1, 64] range".format(
311 field_class_ptr
= native_bt
.field_class_bit_array_create(self
._ptr
, length
)
312 self
._check
_field
_class
_create
_status
(field_class_ptr
, "bit array")
313 fc
= bt2_field_class
._BitArrayFieldClass
._create
_from
_ptr
(field_class_ptr
)
314 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
317 def _create_integer_field_class(
323 preferred_display_base
,
326 field_class_ptr
= create_func(self
._ptr
)
327 self
._check
_field
_class
_create
_status
(field_class_ptr
, type_name
)
329 field_class
= py_cls
._create
_from
_ptr
(field_class_ptr
)
331 if field_value_range
is not None:
332 field_class
._field
_value
_range
= field_value_range
334 if preferred_display_base
is not None:
335 field_class
._preferred
_display
_base
= preferred_display_base
337 self
._set
_field
_class
_user
_attrs
(field_class
, user_attributes
)
340 def create_signed_integer_field_class(
341 self
, field_value_range
=None, preferred_display_base
=None, user_attributes
=None
343 return self
._create
_integer
_field
_class
(
344 native_bt
.field_class_integer_signed_create
,
345 bt2_field_class
._SignedIntegerFieldClass
,
348 preferred_display_base
,
352 def create_unsigned_integer_field_class(
353 self
, field_value_range
=None, preferred_display_base
=None, user_attributes
=None
355 return self
._create
_integer
_field
_class
(
356 native_bt
.field_class_integer_unsigned_create
,
357 bt2_field_class
._UnsignedIntegerFieldClass
,
360 preferred_display_base
,
364 def create_signed_enumeration_field_class(
365 self
, field_value_range
=None, preferred_display_base
=None, user_attributes
=None
367 return self
._create
_integer
_field
_class
(
368 native_bt
.field_class_enumeration_signed_create
,
369 bt2_field_class
._SignedEnumerationFieldClass
,
370 "signed enumeration",
372 preferred_display_base
,
376 def create_unsigned_enumeration_field_class(
377 self
, field_value_range
=None, preferred_display_base
=None, user_attributes
=None
379 return self
._create
_integer
_field
_class
(
380 native_bt
.field_class_enumeration_unsigned_create
,
381 bt2_field_class
._UnsignedEnumerationFieldClass
,
382 "unsigned enumeration",
384 preferred_display_base
,
388 def create_single_precision_real_field_class(self
, user_attributes
=None):
389 field_class_ptr
= native_bt
.field_class_real_single_precision_create(self
._ptr
)
390 self
._check
_field
_class
_create
_status
(field_class_ptr
, "single-precision real")
392 field_class
= bt2_field_class
._SinglePrecisionRealFieldClass
._create
_from
_ptr
(
396 self
._set
_field
_class
_user
_attrs
(field_class
, user_attributes
)
400 def create_double_precision_real_field_class(self
, user_attributes
=None):
401 field_class_ptr
= native_bt
.field_class_real_double_precision_create(self
._ptr
)
402 self
._check
_field
_class
_create
_status
(field_class_ptr
, "double-precision real")
404 field_class
= bt2_field_class
._DoublePrecisionRealFieldClass
._create
_from
_ptr
(
408 self
._set
_field
_class
_user
_attrs
(field_class
, user_attributes
)
412 def create_structure_field_class(self
, user_attributes
=None):
413 field_class_ptr
= native_bt
.field_class_structure_create(self
._ptr
)
414 self
._check
_field
_class
_create
_status
(field_class_ptr
, "structure")
415 fc
= bt2_field_class
._StructureFieldClass
._create
_from
_ptr
(field_class_ptr
)
416 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
419 def create_string_field_class(self
, user_attributes
=None):
420 field_class_ptr
= native_bt
.field_class_string_create(self
._ptr
)
421 self
._check
_field
_class
_create
_status
(field_class_ptr
, "string")
422 fc
= bt2_field_class
._StringFieldClass
._create
_from
_ptr
(field_class_ptr
)
423 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
426 def create_static_array_field_class(self
, elem_fc
, length
, user_attributes
=None):
427 bt2_utils
._check
_type
(elem_fc
, bt2_field_class
._FieldClass
)
428 bt2_utils
._check
_uint
64(length
)
429 ptr
= native_bt
.field_class_array_static_create(self
._ptr
, elem_fc
._ptr
, length
)
430 self
._check
_field
_class
_create
_status
(ptr
, "static array")
431 fc
= bt2_field_class
._StaticArrayFieldClass
._create
_from
_ptr
(ptr
)
432 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
435 def create_dynamic_array_field_class(
436 self
, elem_fc
, length_fc
=None, user_attributes
=None
438 bt2_utils
._check
_type
(elem_fc
, bt2_field_class
._FieldClass
)
441 if length_fc
is not None:
442 bt2_utils
._check
_type
(length_fc
, bt2_field_class
._UnsignedIntegerFieldClass
)
443 length_fc_ptr
= length_fc
._ptr
445 ptr
= native_bt
.field_class_array_dynamic_create(
446 self
._ptr
, elem_fc
._ptr
, length_fc_ptr
448 self
._check
_field
_class
_create
_status
(ptr
, "dynamic array")
449 fc
= bt2_field_class
._obj
_type
_from
_field
_class
_ptr
(ptr
)._create
_from
_ptr
(ptr
)
450 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
453 def create_option_without_selector_field_class(
454 self
, content_fc
, user_attributes
=None
456 bt2_utils
._check
_type
(content_fc
, bt2_field_class
._FieldClass
)
457 ptr
= native_bt
.field_class_option_without_selector_create(
458 self
._ptr
, content_fc
._ptr
460 self
._check
_field
_class
_create
_status
(ptr
, "option")
461 fc
= bt2_field_class
._obj
_type
_from
_field
_class
_ptr
(ptr
)._create
_from
_ptr
(ptr
)
462 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
465 def create_option_with_bool_selector_field_class(
466 self
, content_fc
, selector_fc
, selector_is_reversed
=False, user_attributes
=None
468 bt2_utils
._check
_type
(content_fc
, bt2_field_class
._FieldClass
)
469 bt2_utils
._check
_bool
(selector_is_reversed
)
470 bt2_utils
._check
_type
(selector_fc
, bt2_field_class
._BoolFieldClass
)
471 ptr
= native_bt
.field_class_option_with_selector_field_bool_create(
472 self
._ptr
, content_fc
._ptr
, selector_fc
._ptr
474 self
._check
_field
_class
_create
_status
(ptr
, "option")
475 fc
= bt2_field_class
._obj
_type
_from
_field
_class
_ptr
(ptr
)._create
_from
_ptr
(ptr
)
476 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
477 fc
._selector
_is
_reversed
= selector_is_reversed
480 def create_option_with_integer_selector_field_class(
481 self
, content_fc
, selector_fc
, ranges
, user_attributes
=None
483 bt2_utils
._check
_type
(content_fc
, bt2_field_class
._FieldClass
)
484 bt2_utils
._check
_type
(selector_fc
, bt2_field_class
._IntegerFieldClass
)
487 raise ValueError("integer range set is empty")
489 if isinstance(selector_fc
, bt2_field_class
._UnsignedIntegerFieldClass
):
490 bt2_utils
._check
_type
(ranges
, bt2_integer_range_set
.UnsignedIntegerRangeSet
)
491 ptr
= native_bt
.field_class_option_with_selector_field_integer_unsigned_create(
492 self
._ptr
, content_fc
._ptr
, selector_fc
._ptr
, ranges
._ptr
495 bt2_utils
._check
_type
(ranges
, bt2_integer_range_set
.SignedIntegerRangeSet
)
497 native_bt
.field_class_option_with_selector_field_integer_signed_create(
498 self
._ptr
, content_fc
._ptr
, selector_fc
._ptr
, ranges
._ptr
502 self
._check
_field
_class
_create
_status
(ptr
, "option")
503 fc
= bt2_field_class
._obj
_type
_from
_field
_class
_ptr
(ptr
)._create
_from
_ptr
(ptr
)
504 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
507 def create_variant_field_class(self
, selector_fc
=None, user_attributes
=None):
508 selector_fc_ptr
= None
510 if selector_fc
is not None:
511 bt2_utils
._check
_type
(selector_fc
, bt2_field_class
._IntegerFieldClass
)
512 selector_fc_ptr
= selector_fc
._ptr
514 ptr
= native_bt
.field_class_variant_create(self
._ptr
, selector_fc_ptr
)
515 self
._check
_field
_class
_create
_status
(ptr
, "variant")
516 fc
= bt2_field_class
._obj
_type
_from
_field
_class
_ptr
(ptr
)._create
_from
_ptr
(ptr
)
517 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)