1 # SPDX-License-Identifier: MIT
3 # Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com>
4 # Copyright (c) 2018 Francis Deslauriers <francis.deslauriers@efficios.com>
5 # Copyright (c) 2019 Simon Marchi <simon.marchi@efficios.com>
import collections.abc
import functools

import bt2
from bt2 import native_bt
from bt2 import utils as bt2_utils
from bt2 import object as bt2_object
from bt2 import stream_class as bt2_stream_class
from bt2 import field_class as bt2_field_class
from bt2 import integer_range_set as bt2_integer_range_set
from bt2 import trace as bt2_trace
from bt2 import value as bt2_value
def _trace_class_destruction_listener_from_native(
    user_listener, handle, trace_class_ptr
):
    """Adapt a native destruction callback to a Python user listener.

    Wraps ``trace_class_ptr`` in a ``_TraceClass`` object (acquiring a
    reference), calls ``user_listener`` with it, then invalidates
    ``handle``.

    ``user_listener`` and ``handle`` are bound ahead of time with
    ``functools.partial()`` by ``add_destruction_listener()``; the native
    side only supplies ``trace_class_ptr``.
    """
    trace_class = _TraceClass._create_from_ptr_and_get_ref(trace_class_ptr)

    try:
        user_listener(trace_class)
    finally:
        # The listener fires only once, when the trace class is being
        # destroyed: whatever the user listener did, the handle must not
        # remain usable with remove_destruction_listener() afterwards.
        handle._invalidate()
class _TraceClassConst(bt2_object._SharedObject, collections.abc.Mapping):
    """Read-only view of a trace class.

    Behaves as a mapping of stream class IDs (unsigned integers) to
    stream class objects.
    """

    # Reference counting hooks (presumably what the shared-object base
    # class calls to acquire/release the native object; confirm against
    # `bt2_object._SharedObject`).
    @staticmethod
    def _get_ref(ptr):
        native_bt.trace_class_get_ref(ptr)

    @staticmethod
    def _put_ref(ptr):
        native_bt.trace_class_put_ref(ptr)

    # Native accessors and wrapper types of the const flavour; the
    # mutable `_TraceClass` subclass overrides each of them.
    _borrow_stream_class_ptr_by_index = staticmethod(
        native_bt.trace_class_borrow_stream_class_by_index_const
    )
    _borrow_stream_class_ptr_by_id = staticmethod(
        native_bt.trace_class_borrow_stream_class_by_id_const
    )
    _borrow_user_attributes_ptr = staticmethod(
        native_bt.trace_class_borrow_user_attributes_const
    )
    _stream_class_pycls = bt2_stream_class._StreamClassConst
    _create_value_from_ptr_and_get_ref = staticmethod(
        bt2_value._create_from_const_ptr_and_get_ref
    )

    @property
    def user_attributes(self):
        """User attributes of this trace class, as a value object."""
        ptr = self._borrow_user_attributes_ptr(self._ptr)
        assert ptr is not None
        return self._create_value_from_ptr_and_get_ref(ptr)

    def __len__(self):
        """Return the number of stream classes in this trace class."""
        count = native_bt.trace_class_get_stream_class_count(self._ptr)
        assert count >= 0
        return count

    def __getitem__(self, key):
        """Return the stream class having the stream class ID ``key``.

        Raises ``KeyError`` if this trace class has no such stream class.
        """
        bt2_utils._check_uint64(key)

        sc_ptr = self._borrow_stream_class_ptr_by_id(self._ptr, key)

        # No stream class with this ID: honour the mapping protocol so
        # that `in` and `get()` keep working.
        if sc_ptr is None:
            raise KeyError(key)

        return self._stream_class_pycls._create_from_ptr_and_get_ref(sc_ptr)

    def __iter__(self):
        """Yield the ID of each stream class, in index order."""
        for idx in range(len(self)):
            sc_ptr = self._borrow_stream_class_ptr_by_index(self._ptr, idx)
            assert sc_ptr is not None

            sc_id = native_bt.stream_class_get_id(sc_ptr)
            assert sc_id >= 0

            yield sc_id

    @property
    def assigns_automatic_stream_class_id(self):
        """Whether this trace class assigns stream class IDs itself."""
        return native_bt.trace_class_assigns_automatic_stream_class_id(self._ptr)

    def add_destruction_listener(self, listener):
        """Register ``listener`` to be called when this trace class is
        destroyed.

        ``listener`` is called with the trace class object as its only
        argument.

        Returns a listener handle which can be passed to
        ``remove_destruction_listener()``.

        Raises ``TypeError`` if ``listener`` is not callable.
        """
        if not callable(listener):
            raise TypeError("'listener' parameter is not callable")

        handle = bt2_utils._ListenerHandle(self.addr)

        # The native side only passes the trace class pointer: bind the
        # user listener and the handle now.
        listener_from_native = functools.partial(
            _trace_class_destruction_listener_from_native, listener, handle
        )

        fn = native_bt.bt2_trace_class_add_destruction_listener
        status, listener_id = fn(self._ptr, listener_from_native)
        bt2_utils._handle_func_status(
            status, "cannot add destruction listener to trace class object"
        )

        handle._set_listener_id(listener_id)

        return handle

    def remove_destruction_listener(self, listener_handle):
        """Remove a destruction listener previously added to this trace
        class.

        Raises ``ValueError`` if ``listener_handle`` belongs to another
        trace class object or was already removed.
        """
        bt2_utils._check_type(listener_handle, bt2_utils._ListenerHandle)

        if listener_handle._addr != self.addr:
            raise ValueError(
                "This trace class destruction listener does not match the trace class object."
            )

        if listener_handle._listener_id is None:
            raise ValueError(
                "This trace class destruction listener was already removed."
            )

        status = native_bt.trace_class_remove_destruction_listener(
            self._ptr, listener_handle._listener_id
        )
        bt2_utils._handle_func_status(status)

        # The listener ID is no longer meaningful once removed.
        listener_handle._invalidate()
class _TraceClass(_TraceClassConst):
    """Mutable trace class.

    Besides the read-only mapping interface of ``_TraceClassConst``, this
    class instantiates traces (by calling the object) and creates the
    stream classes and field classes which belong to it.
    """

    # Non-const flavours of the native accessors and wrapper types.
    _borrow_stream_class_ptr_by_index = staticmethod(
        native_bt.trace_class_borrow_stream_class_by_index
    )
    _borrow_stream_class_ptr_by_id = staticmethod(
        native_bt.trace_class_borrow_stream_class_by_id
    )
    _borrow_user_attributes_ptr = staticmethod(
        native_bt.trace_class_borrow_user_attributes
    )
    _stream_class_pycls = bt2_stream_class._StreamClass
    _create_value_from_ptr_and_get_ref = staticmethod(
        bt2_value._create_from_ptr_and_get_ref
    )

    def __call__(self, name=None, user_attributes=None, uuid=None, environment=None):
        """Instantiate a trace of this class.

        Each optional parameter which is not ``None`` is applied to the
        new trace; ``environment`` must offer ``items()`` yielding
        key/value pairs.

        Raises ``bt2._MemoryError`` if the native trace cannot be created.
        """
        trace_ptr = native_bt.trace_create(self._ptr)

        if trace_ptr is None:
            raise bt2._MemoryError("cannot create trace object")

        trace = bt2_trace._Trace._create_from_ptr(trace_ptr)

        if name is not None:
            trace._name = name

        if user_attributes is not None:
            trace._user_attributes = user_attributes

        if uuid is not None:
            trace._uuid = uuid

        if environment is not None:
            for key, value in environment.items():
                trace.environment[key] = value

        return trace

    def create_stream_class(
        self,
        id=None,
        name=None,
        user_attributes=None,
        packet_context_field_class=None,
        event_common_context_field_class=None,
        default_clock_class=None,
        assigns_automatic_event_class_id=True,
        assigns_automatic_stream_id=True,
        supports_packets=False,
        packets_have_beginning_default_clock_snapshot=False,
        packets_have_end_default_clock_snapshot=False,
        supports_discarded_events=False,
        discarded_events_have_default_clock_snapshots=False,
        supports_discarded_packets=False,
        discarded_packets_have_default_clock_snapshots=False,
    ):
        """Create a stream class within this trace class and return it.

        ``id`` must be ``None`` when this trace class assigns automatic
        stream class IDs, and must be an unsigned integer otherwise;
        ``ValueError`` is raised when this is not respected.

        Raises ``bt2._MemoryError`` if the native stream class cannot be
        created.
        """
        # Validate parameters before we create the object.
        bt2_stream_class._StreamClass._validate_create_params(
            name,
            user_attributes,
            packet_context_field_class,
            event_common_context_field_class,
            default_clock_class,
            assigns_automatic_event_class_id,
            assigns_automatic_stream_id,
            supports_packets,
            packets_have_beginning_default_clock_snapshot,
            packets_have_end_default_clock_snapshot,
            supports_discarded_events,
            discarded_events_have_default_clock_snapshots,
            supports_discarded_packets,
            discarded_packets_have_default_clock_snapshots,
        )

        if self.assigns_automatic_stream_class_id:
            if id is not None:
                raise ValueError(
                    "id provided, but trace class assigns automatic stream class ids"
                )

            sc_ptr = native_bt.stream_class_create(self._ptr)
        else:
            if id is None:
                raise ValueError(
                    "id not provided, but trace class does not assign automatic stream class ids"
                )

            bt2_utils._check_uint64(id)
            sc_ptr = native_bt.stream_class_create_with_id(self._ptr, id)

        # Same guard as for trace creation: never wrap a null pointer.
        if sc_ptr is None:
            raise bt2._MemoryError("cannot create stream class object")

        sc = bt2_stream_class._StreamClass._create_from_ptr(sc_ptr)

        if name is not None:
            sc._name = name

        if user_attributes is not None:
            sc._user_attributes = user_attributes

        if event_common_context_field_class is not None:
            sc._event_common_context_field_class = event_common_context_field_class

        if default_clock_class is not None:
            sc._default_clock_class = default_clock_class

        # Call after setting `sc._default_clock_class` because, if
        # `packets_have_beginning_default_clock_snapshot` or
        # `packets_have_end_default_clock_snapshot` is true, then this
        # stream class needs a default clock class already.
        sc._set_supports_packets(
            supports_packets,
            packets_have_beginning_default_clock_snapshot,
            packets_have_end_default_clock_snapshot,
        )

        # Call after sc._set_supports_packets() because, if
        # `packet_context_field_class` is not `None`, then this stream
        # class needs to support packets already.
        if packet_context_field_class is not None:
            sc._packet_context_field_class = packet_context_field_class

        sc._assigns_automatic_event_class_id = assigns_automatic_event_class_id
        sc._assigns_automatic_stream_id = assigns_automatic_stream_id
        sc._set_supports_discarded_events(
            supports_discarded_events, discarded_events_have_default_clock_snapshots
        )
        sc._set_supports_discarded_packets(
            supports_discarded_packets, discarded_packets_have_default_clock_snapshots
        )
        return sc

    # Write-only property: converts `user_attributes` to a value object,
    # which must be a map value, and sets it on the native trace class.
    def _user_attributes(self, user_attributes):
        value = bt2_value.create_value(user_attributes)
        bt2_utils._check_type(value, bt2_value.MapValue)
        native_bt.trace_class_set_user_attributes(self._ptr, value._ptr)

    _user_attributes = property(fset=_user_attributes)

    # Write-only property: `auto_id` must be a boolean.
    def _assigns_automatic_stream_class_id(self, auto_id):
        bt2_utils._check_bool(auto_id)
        return native_bt.trace_class_set_assigns_automatic_stream_class_id(
            self._ptr, auto_id
        )

    _assigns_automatic_stream_class_id = property(
        fset=_assigns_automatic_stream_class_id
    )

    # Field class creation methods.

    def _check_field_class_create_status(self, ptr, type_name):
        """Raise ``bt2._MemoryError`` if ``ptr`` is ``None``.

        ``type_name`` is the human-readable field class type used in the
        error message.
        """
        if ptr is None:
            raise bt2._MemoryError("cannot create {} field class".format(type_name))

    @staticmethod
    def _set_field_class_user_attrs(fc, user_attributes):
        """Set the user attributes of ``fc`` unless they are ``None``."""
        if user_attributes is not None:
            fc._user_attributes = user_attributes

    def create_bool_field_class(self, user_attributes=None):
        """Create and return a boolean field class."""
        field_class_ptr = native_bt.field_class_bool_create(self._ptr)
        self._check_field_class_create_status(field_class_ptr, "boolean")
        fc = bt2_field_class._BoolFieldClass._create_from_ptr(field_class_ptr)
        self._set_field_class_user_attrs(fc, user_attributes)
        return fc

    def create_bit_array_field_class(self, length, user_attributes=None):
        """Create and return a bit array field class of ``length`` bits.

        Raises ``ValueError`` if ``length`` is outside the [1, 64] range.
        """
        bt2_utils._check_uint64(length)

        if length < 1 or length > 64:
            raise ValueError(
                "invalid length {}: expecting a value in the [1, 64] range".format(
                    length
                )
            )

        field_class_ptr = native_bt.field_class_bit_array_create(self._ptr, length)
        self._check_field_class_create_status(field_class_ptr, "bit array")
        fc = bt2_field_class._BitArrayFieldClass._create_from_ptr(field_class_ptr)
        self._set_field_class_user_attrs(fc, user_attributes)
        return fc

    def _create_integer_field_class(
        self,
        create_func,
        py_cls,
        type_name,
        field_value_range,
        preferred_display_base,
        user_attributes,
    ):
        """Common implementation of the integer/enumeration creators.

        ``create_func`` is the native creation function, ``py_cls`` the
        Python class wrapping the result, and ``type_name`` the type name
        used in the error message. Optional properties are only set when
        they are not ``None``.
        """
        field_class_ptr = create_func(self._ptr)
        self._check_field_class_create_status(field_class_ptr, type_name)

        field_class = py_cls._create_from_ptr(field_class_ptr)

        if field_value_range is not None:
            field_class._field_value_range = field_value_range

        if preferred_display_base is not None:
            field_class._preferred_display_base = preferred_display_base

        self._set_field_class_user_attrs(field_class, user_attributes)
        return field_class

    def create_signed_integer_field_class(
        self, field_value_range=None, preferred_display_base=None, user_attributes=None
    ):
        """Create and return a signed integer field class."""
        return self._create_integer_field_class(
            native_bt.field_class_integer_signed_create,
            bt2_field_class._SignedIntegerFieldClass,
            "signed integer",
            field_value_range,
            preferred_display_base,
            user_attributes,
        )

    def create_unsigned_integer_field_class(
        self, field_value_range=None, preferred_display_base=None, user_attributes=None
    ):
        """Create and return an unsigned integer field class."""
        return self._create_integer_field_class(
            native_bt.field_class_integer_unsigned_create,
            bt2_field_class._UnsignedIntegerFieldClass,
            "unsigned integer",
            field_value_range,
            preferred_display_base,
            user_attributes,
        )

    def create_signed_enumeration_field_class(
        self, field_value_range=None, preferred_display_base=None, user_attributes=None
    ):
        """Create and return a signed enumeration field class."""
        return self._create_integer_field_class(
            native_bt.field_class_enumeration_signed_create,
            bt2_field_class._SignedEnumerationFieldClass,
            "signed enumeration",
            field_value_range,
            preferred_display_base,
            user_attributes,
        )

    def create_unsigned_enumeration_field_class(
        self, field_value_range=None, preferred_display_base=None, user_attributes=None
    ):
        """Create and return an unsigned enumeration field class."""
        return self._create_integer_field_class(
            native_bt.field_class_enumeration_unsigned_create,
            bt2_field_class._UnsignedEnumerationFieldClass,
            "unsigned enumeration",
            field_value_range,
            preferred_display_base,
            user_attributes,
        )

    def create_single_precision_real_field_class(self, user_attributes=None):
        """Create and return a single-precision real field class."""
        field_class_ptr = native_bt.field_class_real_single_precision_create(self._ptr)
        self._check_field_class_create_status(field_class_ptr, "single-precision real")

        field_class = bt2_field_class._SinglePrecisionRealFieldClass._create_from_ptr(
            field_class_ptr
        )

        self._set_field_class_user_attrs(field_class, user_attributes)

        return field_class

    def create_double_precision_real_field_class(self, user_attributes=None):
        """Create and return a double-precision real field class."""
        field_class_ptr = native_bt.field_class_real_double_precision_create(self._ptr)
        self._check_field_class_create_status(field_class_ptr, "double-precision real")

        field_class = bt2_field_class._DoublePrecisionRealFieldClass._create_from_ptr(
            field_class_ptr
        )

        self._set_field_class_user_attrs(field_class, user_attributes)

        return field_class

    def create_structure_field_class(self, user_attributes=None):
        """Create and return a structure field class."""
        field_class_ptr = native_bt.field_class_structure_create(self._ptr)
        self._check_field_class_create_status(field_class_ptr, "structure")
        fc = bt2_field_class._StructureFieldClass._create_from_ptr(field_class_ptr)
        self._set_field_class_user_attrs(fc, user_attributes)
        return fc

    def create_string_field_class(self, user_attributes=None):
        """Create and return a string field class."""
        field_class_ptr = native_bt.field_class_string_create(self._ptr)
        self._check_field_class_create_status(field_class_ptr, "string")
        fc = bt2_field_class._StringFieldClass._create_from_ptr(field_class_ptr)
        self._set_field_class_user_attrs(fc, user_attributes)
        return fc

    def create_static_array_field_class(self, elem_fc, length, user_attributes=None):
        """Create and return a static array field class of ``length``
        elements of class ``elem_fc``.
        """
        bt2_utils._check_type(elem_fc, bt2_field_class._FieldClass)
        bt2_utils._check_uint64(length)
        ptr = native_bt.field_class_array_static_create(self._ptr, elem_fc._ptr, length)
        self._check_field_class_create_status(ptr, "static array")
        fc = bt2_field_class._StaticArrayFieldClass._create_from_ptr(ptr)
        self._set_field_class_user_attrs(fc, user_attributes)
        return fc

    def create_dynamic_array_field_class(
        self, elem_fc, length_fc=None, user_attributes=None
    ):
        """Create and return a dynamic array field class.

        ``length_fc``, when given, must be an unsigned integer field
        class; when omitted, the array has no length field.
        """
        bt2_utils._check_type(elem_fc, bt2_field_class._FieldClass)
        length_fc_ptr = None

        if length_fc is not None:
            bt2_utils._check_type(length_fc, bt2_field_class._UnsignedIntegerFieldClass)
            length_fc_ptr = length_fc._ptr

        ptr = native_bt.field_class_array_dynamic_create(
            self._ptr, elem_fc._ptr, length_fc_ptr
        )
        self._check_field_class_create_status(ptr, "dynamic array")

        # The concrete Python class depends on the native field class type.
        fc = bt2_field_class._obj_type_from_field_class_ptr(ptr)._create_from_ptr(ptr)
        self._set_field_class_user_attrs(fc, user_attributes)
        return fc

    def create_option_without_selector_field_class(
        self, content_fc, user_attributes=None
    ):
        """Create and return an option field class without a selector."""
        bt2_utils._check_type(content_fc, bt2_field_class._FieldClass)
        ptr = native_bt.field_class_option_without_selector_create(
            self._ptr, content_fc._ptr
        )
        self._check_field_class_create_status(ptr, "option")
        fc = bt2_field_class._obj_type_from_field_class_ptr(ptr)._create_from_ptr(ptr)
        self._set_field_class_user_attrs(fc, user_attributes)
        return fc

    def create_option_with_bool_selector_field_class(
        self, content_fc, selector_fc, selector_is_reversed=False, user_attributes=None
    ):
        """Create and return an option field class with a boolean
        selector.

        ``selector_fc`` must be a boolean field class and
        ``selector_is_reversed`` a boolean.
        """
        bt2_utils._check_type(content_fc, bt2_field_class._FieldClass)
        bt2_utils._check_bool(selector_is_reversed)
        bt2_utils._check_type(selector_fc, bt2_field_class._BoolFieldClass)
        ptr = native_bt.field_class_option_with_selector_field_bool_create(
            self._ptr, content_fc._ptr, selector_fc._ptr
        )
        self._check_field_class_create_status(ptr, "option")
        fc = bt2_field_class._obj_type_from_field_class_ptr(ptr)._create_from_ptr(ptr)
        self._set_field_class_user_attrs(fc, user_attributes)
        fc._selector_is_reversed = selector_is_reversed
        return fc

    def create_option_with_integer_selector_field_class(
        self, content_fc, selector_fc, ranges, user_attributes=None
    ):
        """Create and return an option field class with an integer
        selector.

        ``ranges`` must be a non-empty integer range set whose signedness
        matches the one of ``selector_fc``.

        Raises ``ValueError`` if ``ranges`` is empty.
        """
        bt2_utils._check_type(content_fc, bt2_field_class._FieldClass)
        bt2_utils._check_type(selector_fc, bt2_field_class._IntegerFieldClass)

        if len(ranges) == 0:
            raise ValueError("integer range set is empty")

        if isinstance(selector_fc, bt2_field_class._UnsignedIntegerFieldClass):
            bt2_utils._check_type(ranges, bt2_integer_range_set.UnsignedIntegerRangeSet)
            ptr = native_bt.field_class_option_with_selector_field_integer_unsigned_create(
                self._ptr, content_fc._ptr, selector_fc._ptr, ranges._ptr
            )
        else:
            bt2_utils._check_type(ranges, bt2_integer_range_set.SignedIntegerRangeSet)
            ptr = (
                native_bt.field_class_option_with_selector_field_integer_signed_create(
                    self._ptr, content_fc._ptr, selector_fc._ptr, ranges._ptr
                )
            )

        self._check_field_class_create_status(ptr, "option")
        fc = bt2_field_class._obj_type_from_field_class_ptr(ptr)._create_from_ptr(ptr)
        self._set_field_class_user_attrs(fc, user_attributes)
        return fc

    def create_variant_field_class(self, selector_fc=None, user_attributes=None):
        """Create and return a variant field class.

        ``selector_fc``, when given, must be an integer field class.
        """
        selector_fc_ptr = None

        if selector_fc is not None:
            bt2_utils._check_type(selector_fc, bt2_field_class._IntegerFieldClass)
            selector_fc_ptr = selector_fc._ptr

        ptr = native_bt.field_class_variant_create(self._ptr, selector_fc_ptr)
        self._check_field_class_create_status(ptr, "variant")
        fc = bt2_field_class._obj_type_from_field_class_ptr(ptr)._create_from_ptr(ptr)
        self._set_field_class_user_attrs(fc, user_attributes)
        return fc