1 # SPDX-License-Identifier: MIT
3 # Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com>
4 # Copyright (c) 2018 Francis Deslauriers <francis.deslauriers@efficios.com>
5 # Copyright (c) 2019 Simon Marchi <simon.marchi@efficios.com>
7 from bt2
import native_bt
, utils
, object
8 from bt2
import stream_class
as bt2_stream_class
9 from bt2
import field_class
as bt2_field_class
10 from bt2
import integer_range_set
as bt2_integer_range_set
11 from bt2
import trace
as bt2_trace
12 from bt2
import value
as bt2_value
13 import collections
.abc
18 def _trace_class_destruction_listener_from_native(
19 user_listener
, handle
, trace_class_ptr
21 trace_class
= _TraceClass
._create
_from
_ptr
_and
_get
_ref
(trace_class_ptr
)
22 user_listener(trace_class
)
26 class _TraceClassConst(object._SharedObject
, collections
.abc
.Mapping
):
29 native_bt
.trace_class_get_ref(ptr
)
33 native_bt
.trace_class_put_ref(ptr
)
35 _borrow_stream_class_ptr_by_index
= staticmethod(
36 native_bt
.trace_class_borrow_stream_class_by_index_const
38 _borrow_stream_class_ptr_by_id
= staticmethod(
39 native_bt
.trace_class_borrow_stream_class_by_id_const
41 _borrow_user_attributes_ptr
= staticmethod(
42 native_bt
.trace_class_borrow_user_attributes_const
44 _stream_class_pycls
= bt2_stream_class
._StreamClassConst
45 _create_value_from_ptr_and_get_ref
= staticmethod(
46 bt2_value
._create
_from
_const
_ptr
_and
_get
_ref
50 def user_attributes(self
):
51 ptr
= self
._borrow
_user
_attributes
_ptr
(self
._ptr
)
52 assert ptr
is not None
53 return self
._create
_value
_from
_ptr
_and
_get
_ref
(ptr
)
55 # Number of stream classes in this trace class.
58 count
= native_bt
.trace_class_get_stream_class_count(self
._ptr
)
62 # Get a stream class by stream id.
64 def __getitem__(self
, key
):
65 utils
._check
_uint
64(key
)
67 sc_ptr
= self
._borrow
_stream
_class
_ptr
_by
_id
(self
._ptr
, key
)
71 return self
._stream
_class
_pycls
._create
_from
_ptr
_and
_get
_ref
(sc_ptr
)
74 for idx
in range(len(self
)):
75 sc_ptr
= self
._borrow
_stream
_class
_ptr
_by
_index
(self
._ptr
, idx
)
76 assert sc_ptr
is not None
78 id = native_bt
.stream_class_get_id(sc_ptr
)
84 def assigns_automatic_stream_class_id(self
):
85 return native_bt
.trace_class_assigns_automatic_stream_class_id(self
._ptr
)
87 # Add a listener to be called when the trace class is destroyed.
89 def add_destruction_listener(self
, listener
):
91 if not callable(listener
):
92 raise TypeError("'listener' parameter is not callable")
94 handle
= utils
._ListenerHandle
(self
.addr
)
96 listener_from_native
= functools
.partial(
97 _trace_class_destruction_listener_from_native
, listener
, handle
100 fn
= native_bt
.bt2_trace_class_add_destruction_listener
101 status
, listener_id
= fn(self
._ptr
, listener_from_native
)
102 utils
._handle
_func
_status
(
103 status
, "cannot add destruction listener to trace class object"
106 handle
._set
_listener
_id
(listener_id
)
110 def remove_destruction_listener(self
, listener_handle
):
111 utils
._check
_type
(listener_handle
, utils
._ListenerHandle
)
113 if listener_handle
._addr
!= self
.addr
:
115 "This trace class destruction listener does not match the trace class object."
118 if listener_handle
._listener
_id
is None:
120 "This trace class destruction listener was already removed."
123 status
= native_bt
.trace_class_remove_destruction_listener(
124 self
._ptr
, listener_handle
._listener
_id
126 utils
._handle
_func
_status
(status
)
127 listener_handle
._invalidate
()
130 class _TraceClass(_TraceClassConst
):
131 _borrow_stream_class_ptr_by_index
= staticmethod(
132 native_bt
.trace_class_borrow_stream_class_by_index
134 _borrow_stream_class_ptr_by_id
= staticmethod(
135 native_bt
.trace_class_borrow_stream_class_by_id
137 _borrow_user_attributes_ptr
= staticmethod(
138 native_bt
.trace_class_borrow_user_attributes
140 _stream_class_pycls
= bt2_stream_class
._StreamClass
141 _create_value_from_ptr_and_get_ref
= staticmethod(
142 bt2_value
._create
_from
_ptr
_and
_get
_ref
145 # Instantiate a trace of this class.
147 def __call__(self
, name
=None, user_attributes
=None, uuid
=None, environment
=None):
148 trace_ptr
= native_bt
.trace_create(self
._ptr
)
150 if trace_ptr
is None:
151 raise bt2
._MemoryError("cannot create trace class object")
153 trace
= bt2_trace
._Trace
._create
_from
_ptr
(trace_ptr
)
158 if user_attributes
is not None:
159 trace
._user
_attributes
= user_attributes
164 if environment
is not None:
165 for key
, value
in environment
.items():
166 trace
.environment
[key
] = value
170 def create_stream_class(
174 user_attributes
=None,
175 packet_context_field_class
=None,
176 event_common_context_field_class
=None,
177 default_clock_class
=None,
178 assigns_automatic_event_class_id
=True,
179 assigns_automatic_stream_id
=True,
180 supports_packets
=False,
181 packets_have_beginning_default_clock_snapshot
=False,
182 packets_have_end_default_clock_snapshot
=False,
183 supports_discarded_events
=False,
184 discarded_events_have_default_clock_snapshots
=False,
185 supports_discarded_packets
=False,
186 discarded_packets_have_default_clock_snapshots
=False,
188 # Validate parameters before we create the object.
189 bt2_stream_class
._StreamClass
._validate
_create
_params
(
192 packet_context_field_class
,
193 event_common_context_field_class
,
195 assigns_automatic_event_class_id
,
196 assigns_automatic_stream_id
,
198 packets_have_beginning_default_clock_snapshot
,
199 packets_have_end_default_clock_snapshot
,
200 supports_discarded_events
,
201 discarded_events_have_default_clock_snapshots
,
202 supports_discarded_packets
,
203 discarded_packets_have_default_clock_snapshots
,
206 if self
.assigns_automatic_stream_class_id
:
209 "id provided, but trace class assigns automatic stream class ids"
212 sc_ptr
= native_bt
.stream_class_create(self
._ptr
)
216 "id not provided, but trace class does not assign automatic stream class ids"
219 utils
._check
_uint
64(id)
220 sc_ptr
= native_bt
.stream_class_create_with_id(self
._ptr
, id)
222 sc
= bt2_stream_class
._StreamClass
._create
_from
_ptr
(sc_ptr
)
227 if user_attributes
is not None:
228 sc
._user
_attributes
= user_attributes
230 if event_common_context_field_class
is not None:
231 sc
._event
_common
_context
_field
_class
= event_common_context_field_class
233 if default_clock_class
is not None:
234 sc
._default
_clock
_class
= default_clock_class
236 # call after `sc._default_clock_class` because, if
237 # `packets_have_beginning_default_clock_snapshot` or
238 # `packets_have_end_default_clock_snapshot` is true, then this
239 # stream class needs a default clock class already.
240 sc
._set
_supports
_packets
(
242 packets_have_beginning_default_clock_snapshot
,
243 packets_have_end_default_clock_snapshot
,
246 # call after sc._set_supports_packets() because, if
247 # `packet_context_field_class` is not `None`, then this stream
248 # class needs to support packets already.
249 if packet_context_field_class
is not None:
250 sc
._packet
_context
_field
_class
= packet_context_field_class
252 sc
._assigns
_automatic
_event
_class
_id
= assigns_automatic_event_class_id
253 sc
._assigns
_automatic
_stream
_id
= assigns_automatic_stream_id
254 sc
._set
_supports
_discarded
_events
(
255 supports_discarded_events
, discarded_events_have_default_clock_snapshots
257 sc
._set
_supports
_discarded
_packets
(
258 supports_discarded_packets
, discarded_packets_have_default_clock_snapshots
262 def _user_attributes(self
, user_attributes
):
263 value
= bt2_value
.create_value(user_attributes
)
264 utils
._check
_type
(value
, bt2_value
.MapValue
)
265 native_bt
.trace_class_set_user_attributes(self
._ptr
, value
._ptr
)
267 _user_attributes
= property(fset
=_user_attributes
)
269 def _assigns_automatic_stream_class_id(self
, auto_id
):
270 utils
._check
_bool
(auto_id
)
271 return native_bt
.trace_class_set_assigns_automatic_stream_class_id(
275 _assigns_automatic_stream_class_id
= property(
276 fset
=_assigns_automatic_stream_class_id
279 # Field class creation methods.
281 def _check_field_class_create_status(self
, ptr
, type_name
):
283 raise bt2
._MemoryError("cannot create {} field class".format(type_name
))
286 def _set_field_class_user_attrs(fc
, user_attributes
):
287 if user_attributes
is not None:
288 fc
._user
_attributes
= user_attributes
290 def create_bool_field_class(self
, user_attributes
=None):
291 field_class_ptr
= native_bt
.field_class_bool_create(self
._ptr
)
292 self
._check
_field
_class
_create
_status
(field_class_ptr
, "boolean")
293 fc
= bt2_field_class
._BoolFieldClass
._create
_from
_ptr
(field_class_ptr
)
294 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
297 def create_bit_array_field_class(self
, length
, user_attributes
=None):
298 utils
._check
_uint
64(length
)
300 if length
< 1 or length
> 64:
302 "invalid length {}: expecting a value in the [1, 64] range".format(
307 field_class_ptr
= native_bt
.field_class_bit_array_create(self
._ptr
, length
)
308 self
._check
_field
_class
_create
_status
(field_class_ptr
, "bit array")
309 fc
= bt2_field_class
._BitArrayFieldClass
._create
_from
_ptr
(field_class_ptr
)
310 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
313 def _create_integer_field_class(
319 preferred_display_base
,
322 field_class_ptr
= create_func(self
._ptr
)
323 self
._check
_field
_class
_create
_status
(field_class_ptr
, type_name
)
325 field_class
= py_cls
._create
_from
_ptr
(field_class_ptr
)
327 if field_value_range
is not None:
328 field_class
._field
_value
_range
= field_value_range
330 if preferred_display_base
is not None:
331 field_class
._preferred
_display
_base
= preferred_display_base
333 self
._set
_field
_class
_user
_attrs
(field_class
, user_attributes
)
336 def create_signed_integer_field_class(
337 self
, field_value_range
=None, preferred_display_base
=None, user_attributes
=None
339 return self
._create
_integer
_field
_class
(
340 native_bt
.field_class_integer_signed_create
,
341 bt2_field_class
._SignedIntegerFieldClass
,
344 preferred_display_base
,
348 def create_unsigned_integer_field_class(
349 self
, field_value_range
=None, preferred_display_base
=None, user_attributes
=None
351 return self
._create
_integer
_field
_class
(
352 native_bt
.field_class_integer_unsigned_create
,
353 bt2_field_class
._UnsignedIntegerFieldClass
,
356 preferred_display_base
,
360 def create_signed_enumeration_field_class(
361 self
, field_value_range
=None, preferred_display_base
=None, user_attributes
=None
363 return self
._create
_integer
_field
_class
(
364 native_bt
.field_class_enumeration_signed_create
,
365 bt2_field_class
._SignedEnumerationFieldClass
,
366 "signed enumeration",
368 preferred_display_base
,
372 def create_unsigned_enumeration_field_class(
373 self
, field_value_range
=None, preferred_display_base
=None, user_attributes
=None
375 return self
._create
_integer
_field
_class
(
376 native_bt
.field_class_enumeration_unsigned_create
,
377 bt2_field_class
._UnsignedEnumerationFieldClass
,
378 "unsigned enumeration",
380 preferred_display_base
,
384 def create_single_precision_real_field_class(self
, user_attributes
=None):
385 field_class_ptr
= native_bt
.field_class_real_single_precision_create(self
._ptr
)
386 self
._check
_field
_class
_create
_status
(field_class_ptr
, "single-precision real")
388 field_class
= bt2_field_class
._SinglePrecisionRealFieldClass
._create
_from
_ptr
(
392 self
._set
_field
_class
_user
_attrs
(field_class
, user_attributes
)
396 def create_double_precision_real_field_class(self
, user_attributes
=None):
397 field_class_ptr
= native_bt
.field_class_real_double_precision_create(self
._ptr
)
398 self
._check
_field
_class
_create
_status
(field_class_ptr
, "double-precision real")
400 field_class
= bt2_field_class
._DoublePrecisionRealFieldClass
._create
_from
_ptr
(
404 self
._set
_field
_class
_user
_attrs
(field_class
, user_attributes
)
408 def create_structure_field_class(self
, user_attributes
=None):
409 field_class_ptr
= native_bt
.field_class_structure_create(self
._ptr
)
410 self
._check
_field
_class
_create
_status
(field_class_ptr
, "structure")
411 fc
= bt2_field_class
._StructureFieldClass
._create
_from
_ptr
(field_class_ptr
)
412 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
415 def create_string_field_class(self
, user_attributes
=None):
416 field_class_ptr
= native_bt
.field_class_string_create(self
._ptr
)
417 self
._check
_field
_class
_create
_status
(field_class_ptr
, "string")
418 fc
= bt2_field_class
._StringFieldClass
._create
_from
_ptr
(field_class_ptr
)
419 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
422 def create_static_array_field_class(self
, elem_fc
, length
, user_attributes
=None):
423 utils
._check
_type
(elem_fc
, bt2_field_class
._FieldClass
)
424 utils
._check
_uint
64(length
)
425 ptr
= native_bt
.field_class_array_static_create(self
._ptr
, elem_fc
._ptr
, length
)
426 self
._check
_field
_class
_create
_status
(ptr
, "static array")
427 fc
= bt2_field_class
._StaticArrayFieldClass
._create
_from
_ptr
(ptr
)
428 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
431 def create_dynamic_array_field_class(
432 self
, elem_fc
, length_fc
=None, user_attributes
=None
434 utils
._check
_type
(elem_fc
, bt2_field_class
._FieldClass
)
437 if length_fc
is not None:
438 utils
._check
_type
(length_fc
, bt2_field_class
._UnsignedIntegerFieldClass
)
439 length_fc_ptr
= length_fc
._ptr
441 ptr
= native_bt
.field_class_array_dynamic_create(
442 self
._ptr
, elem_fc
._ptr
, length_fc_ptr
444 self
._check
_field
_class
_create
_status
(ptr
, "dynamic array")
445 fc
= bt2_field_class
._obj
_type
_from
_field
_class
_ptr
(ptr
)._create
_from
_ptr
(ptr
)
446 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
449 def create_option_without_selector_field_class(
450 self
, content_fc
, user_attributes
=None
452 utils
._check
_type
(content_fc
, bt2_field_class
._FieldClass
)
453 ptr
= native_bt
.field_class_option_without_selector_create(
454 self
._ptr
, content_fc
._ptr
456 self
._check
_field
_class
_create
_status
(ptr
, "option")
457 fc
= bt2_field_class
._obj
_type
_from
_field
_class
_ptr
(ptr
)._create
_from
_ptr
(ptr
)
458 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
461 def create_option_with_bool_selector_field_class(
462 self
, content_fc
, selector_fc
, selector_is_reversed
=False, user_attributes
=None
464 utils
._check
_type
(content_fc
, bt2_field_class
._FieldClass
)
465 utils
._check
_bool
(selector_is_reversed
)
466 utils
._check
_type
(selector_fc
, bt2_field_class
._BoolFieldClass
)
467 ptr
= native_bt
.field_class_option_with_selector_field_bool_create(
468 self
._ptr
, content_fc
._ptr
, selector_fc
._ptr
470 self
._check
_field
_class
_create
_status
(ptr
, "option")
471 fc
= bt2_field_class
._obj
_type
_from
_field
_class
_ptr
(ptr
)._create
_from
_ptr
(ptr
)
472 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
473 fc
._selector
_is
_reversed
= selector_is_reversed
476 def create_option_with_integer_selector_field_class(
477 self
, content_fc
, selector_fc
, ranges
, user_attributes
=None
479 utils
._check
_type
(content_fc
, bt2_field_class
._FieldClass
)
480 utils
._check
_type
(selector_fc
, bt2_field_class
._IntegerFieldClass
)
483 raise ValueError("integer range set is empty")
485 if isinstance(selector_fc
, bt2_field_class
._UnsignedIntegerFieldClass
):
486 utils
._check
_type
(ranges
, bt2_integer_range_set
.UnsignedIntegerRangeSet
)
487 ptr
= native_bt
.field_class_option_with_selector_field_integer_unsigned_create(
488 self
._ptr
, content_fc
._ptr
, selector_fc
._ptr
, ranges
._ptr
491 utils
._check
_type
(ranges
, bt2_integer_range_set
.SignedIntegerRangeSet
)
493 native_bt
.field_class_option_with_selector_field_integer_signed_create(
494 self
._ptr
, content_fc
._ptr
, selector_fc
._ptr
, ranges
._ptr
498 self
._check
_field
_class
_create
_status
(ptr
, "option")
499 fc
= bt2_field_class
._obj
_type
_from
_field
_class
_ptr
(ptr
)._create
_from
_ptr
(ptr
)
500 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
503 def create_variant_field_class(self
, selector_fc
=None, user_attributes
=None):
504 selector_fc_ptr
= None
506 if selector_fc
is not None:
507 utils
._check
_type
(selector_fc
, bt2_field_class
._IntegerFieldClass
)
508 selector_fc_ptr
= selector_fc
._ptr
510 ptr
= native_bt
.field_class_variant_create(self
._ptr
, selector_fc_ptr
)
511 self
._check
_field
_class
_create
_status
(ptr
, "variant")
512 fc
= bt2_field_class
._obj
_type
_from
_field
_class
_ptr
(ptr
)._create
_from
_ptr
(ptr
)
513 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)