1 # SPDX-License-Identifier: MIT
3 # Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com>
4 # Copyright (c) 2018 Francis Deslauriers <francis.deslauriers@efficios.com>
5 # Copyright (c) 2019 Simon Marchi <simon.marchi@efficios.com>
7 from bt2
import native_bt
, utils
, object
8 from bt2
import stream_class
as bt2_stream_class
9 from bt2
import field_class
as bt2_field_class
10 from bt2
import integer_range_set
as bt2_integer_range_set
11 from bt2
import trace
as bt2_trace
12 from bt2
import value
as bt2_value
13 import collections
.abc
18 def _trace_class_destruction_listener_from_native(
19 user_listener
, handle
, trace_class_ptr
21 trace_class
= _TraceClass
._create
_from
_ptr
_and
_get
_ref
(trace_class_ptr
)
22 user_listener(trace_class
)
26 class _TraceClassConst(object._SharedObject
, collections
.abc
.Mapping
):
27 _get_ref
= staticmethod(native_bt
.trace_class_get_ref
)
28 _put_ref
= staticmethod(native_bt
.trace_class_put_ref
)
29 _borrow_stream_class_ptr_by_index
= staticmethod(
30 native_bt
.trace_class_borrow_stream_class_by_index_const
32 _borrow_stream_class_ptr_by_id
= staticmethod(
33 native_bt
.trace_class_borrow_stream_class_by_id_const
35 _borrow_user_attributes_ptr
= staticmethod(
36 native_bt
.trace_class_borrow_user_attributes_const
38 _stream_class_pycls
= bt2_stream_class
._StreamClassConst
39 _create_value_from_ptr_and_get_ref
= staticmethod(
40 bt2_value
._create
_from
_const
_ptr
_and
_get
_ref
44 def user_attributes(self
):
45 ptr
= self
._borrow
_user
_attributes
_ptr
(self
._ptr
)
46 assert ptr
is not None
47 return self
._create
_value
_from
_ptr
_and
_get
_ref
(ptr
)
49 # Number of stream classes in this trace class.
52 count
= native_bt
.trace_class_get_stream_class_count(self
._ptr
)
56 # Get a stream class by stream id.
58 def __getitem__(self
, key
):
59 utils
._check
_uint
64(key
)
61 sc_ptr
= self
._borrow
_stream
_class
_ptr
_by
_id
(self
._ptr
, key
)
65 return self
._stream
_class
_pycls
._create
_from
_ptr
_and
_get
_ref
(sc_ptr
)
68 for idx
in range(len(self
)):
69 sc_ptr
= self
._borrow
_stream
_class
_ptr
_by
_index
(self
._ptr
, idx
)
70 assert sc_ptr
is not None
72 id = native_bt
.stream_class_get_id(sc_ptr
)
78 def assigns_automatic_stream_class_id(self
):
79 return native_bt
.trace_class_assigns_automatic_stream_class_id(self
._ptr
)
81 # Add a listener to be called when the trace class is destroyed.
83 def add_destruction_listener(self
, listener
):
85 if not callable(listener
):
86 raise TypeError("'listener' parameter is not callable")
88 handle
= utils
._ListenerHandle
(self
.addr
)
90 listener_from_native
= functools
.partial(
91 _trace_class_destruction_listener_from_native
, listener
, handle
94 fn
= native_bt
.bt2_trace_class_add_destruction_listener
95 status
, listener_id
= fn(self
._ptr
, listener_from_native
)
96 utils
._handle
_func
_status
(
97 status
, 'cannot add destruction listener to trace class object'
100 handle
._set
_listener
_id
(listener_id
)
104 def remove_destruction_listener(self
, listener_handle
):
105 utils
._check
_type
(listener_handle
, utils
._ListenerHandle
)
107 if listener_handle
._addr
!= self
.addr
:
109 'This trace class destruction listener does not match the trace class object.'
112 if listener_handle
._listener
_id
is None:
114 'This trace class destruction listener was already removed.'
117 status
= native_bt
.trace_class_remove_destruction_listener(
118 self
._ptr
, listener_handle
._listener
_id
120 utils
._handle
_func
_status
(status
)
121 listener_handle
._invalidate
()
124 class _TraceClass(_TraceClassConst
):
125 _borrow_stream_class_ptr_by_index
= staticmethod(
126 native_bt
.trace_class_borrow_stream_class_by_index
128 _borrow_stream_class_ptr_by_id
= staticmethod(
129 native_bt
.trace_class_borrow_stream_class_by_id
131 _borrow_user_attributes_ptr
= staticmethod(
132 native_bt
.trace_class_borrow_user_attributes
134 _stream_class_pycls
= bt2_stream_class
._StreamClass
135 _create_value_from_ptr_and_get_ref
= staticmethod(
136 bt2_value
._create
_from
_ptr
_and
_get
_ref
139 # Instantiate a trace of this class.
141 def __call__(self
, name
=None, user_attributes
=None, uuid
=None, environment
=None):
142 trace_ptr
= native_bt
.trace_create(self
._ptr
)
144 if trace_ptr
is None:
145 raise bt2
._MemoryError('cannot create trace class object')
147 trace
= bt2_trace
._Trace
._create
_from
_ptr
(trace_ptr
)
152 if user_attributes
is not None:
153 trace
._user
_attributes
= user_attributes
158 if environment
is not None:
159 for key
, value
in environment
.items():
160 trace
.environment
[key
] = value
164 def create_stream_class(
168 user_attributes
=None,
169 packet_context_field_class
=None,
170 event_common_context_field_class
=None,
171 default_clock_class
=None,
172 assigns_automatic_event_class_id
=True,
173 assigns_automatic_stream_id
=True,
174 supports_packets
=False,
175 packets_have_beginning_default_clock_snapshot
=False,
176 packets_have_end_default_clock_snapshot
=False,
177 supports_discarded_events
=False,
178 discarded_events_have_default_clock_snapshots
=False,
179 supports_discarded_packets
=False,
180 discarded_packets_have_default_clock_snapshots
=False,
182 # Validate parameters before we create the object.
183 bt2_stream_class
._StreamClass
._validate
_create
_params
(
186 packet_context_field_class
,
187 event_common_context_field_class
,
189 assigns_automatic_event_class_id
,
190 assigns_automatic_stream_id
,
192 packets_have_beginning_default_clock_snapshot
,
193 packets_have_end_default_clock_snapshot
,
194 supports_discarded_events
,
195 discarded_events_have_default_clock_snapshots
,
196 supports_discarded_packets
,
197 discarded_packets_have_default_clock_snapshots
,
200 if self
.assigns_automatic_stream_class_id
:
203 'id provided, but trace class assigns automatic stream class ids'
206 sc_ptr
= native_bt
.stream_class_create(self
._ptr
)
210 'id not provided, but trace class does not assign automatic stream class ids'
213 utils
._check
_uint
64(id)
214 sc_ptr
= native_bt
.stream_class_create_with_id(self
._ptr
, id)
216 sc
= bt2_stream_class
._StreamClass
._create
_from
_ptr
(sc_ptr
)
221 if user_attributes
is not None:
222 sc
._user
_attributes
= user_attributes
224 if event_common_context_field_class
is not None:
225 sc
._event
_common
_context
_field
_class
= event_common_context_field_class
227 if default_clock_class
is not None:
228 sc
._default
_clock
_class
= default_clock_class
230 # call after `sc._default_clock_class` because, if
231 # `packets_have_beginning_default_clock_snapshot` or
232 # `packets_have_end_default_clock_snapshot` is true, then this
233 # stream class needs a default clock class already.
234 sc
._set
_supports
_packets
(
236 packets_have_beginning_default_clock_snapshot
,
237 packets_have_end_default_clock_snapshot
,
240 # call after sc._set_supports_packets() because, if
241 # `packet_context_field_class` is not `None`, then this stream
242 # class needs to support packets already.
243 if packet_context_field_class
is not None:
244 sc
._packet
_context
_field
_class
= packet_context_field_class
246 sc
._assigns
_automatic
_event
_class
_id
= assigns_automatic_event_class_id
247 sc
._assigns
_automatic
_stream
_id
= assigns_automatic_stream_id
248 sc
._set
_supports
_discarded
_events
(
249 supports_discarded_events
, discarded_events_have_default_clock_snapshots
251 sc
._set
_supports
_discarded
_packets
(
252 supports_discarded_packets
, discarded_packets_have_default_clock_snapshots
256 def _user_attributes(self
, user_attributes
):
257 value
= bt2_value
.create_value(user_attributes
)
258 utils
._check
_type
(value
, bt2_value
.MapValue
)
259 native_bt
.trace_class_set_user_attributes(self
._ptr
, value
._ptr
)
261 _user_attributes
= property(fset
=_user_attributes
)
263 def _assigns_automatic_stream_class_id(self
, auto_id
):
264 utils
._check
_bool
(auto_id
)
265 return native_bt
.trace_class_set_assigns_automatic_stream_class_id(
269 _assigns_automatic_stream_class_id
= property(
270 fset
=_assigns_automatic_stream_class_id
273 # Field class creation methods.
275 def _check_field_class_create_status(self
, ptr
, type_name
):
277 raise bt2
._MemoryError('cannot create {} field class'.format(type_name
))
280 def _set_field_class_user_attrs(fc
, user_attributes
):
281 if user_attributes
is not None:
282 fc
._user
_attributes
= user_attributes
284 def create_bool_field_class(self
, user_attributes
=None):
285 field_class_ptr
= native_bt
.field_class_bool_create(self
._ptr
)
286 self
._check
_field
_class
_create
_status
(field_class_ptr
, 'boolean')
287 fc
= bt2_field_class
._BoolFieldClass
._create
_from
_ptr
(field_class_ptr
)
288 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
291 def create_bit_array_field_class(self
, length
, user_attributes
=None):
292 utils
._check
_uint
64(length
)
294 if length
< 1 or length
> 64:
296 'invalid length {}: expecting a value in the [1, 64] range'.format(
301 field_class_ptr
= native_bt
.field_class_bit_array_create(self
._ptr
, length
)
302 self
._check
_field
_class
_create
_status
(field_class_ptr
, 'bit array')
303 fc
= bt2_field_class
._BitArrayFieldClass
._create
_from
_ptr
(field_class_ptr
)
304 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
307 def _create_integer_field_class(
313 preferred_display_base
,
316 field_class_ptr
= create_func(self
._ptr
)
317 self
._check
_field
_class
_create
_status
(field_class_ptr
, type_name
)
319 field_class
= py_cls
._create
_from
_ptr
(field_class_ptr
)
321 if field_value_range
is not None:
322 field_class
._field
_value
_range
= field_value_range
324 if preferred_display_base
is not None:
325 field_class
._preferred
_display
_base
= preferred_display_base
327 self
._set
_field
_class
_user
_attrs
(field_class
, user_attributes
)
330 def create_signed_integer_field_class(
331 self
, field_value_range
=None, preferred_display_base
=None, user_attributes
=None
333 return self
._create
_integer
_field
_class
(
334 native_bt
.field_class_integer_signed_create
,
335 bt2_field_class
._SignedIntegerFieldClass
,
338 preferred_display_base
,
342 def create_unsigned_integer_field_class(
343 self
, field_value_range
=None, preferred_display_base
=None, user_attributes
=None
345 return self
._create
_integer
_field
_class
(
346 native_bt
.field_class_integer_unsigned_create
,
347 bt2_field_class
._UnsignedIntegerFieldClass
,
350 preferred_display_base
,
354 def create_signed_enumeration_field_class(
355 self
, field_value_range
=None, preferred_display_base
=None, user_attributes
=None
357 return self
._create
_integer
_field
_class
(
358 native_bt
.field_class_enumeration_signed_create
,
359 bt2_field_class
._SignedEnumerationFieldClass
,
360 'signed enumeration',
362 preferred_display_base
,
366 def create_unsigned_enumeration_field_class(
367 self
, field_value_range
=None, preferred_display_base
=None, user_attributes
=None
369 return self
._create
_integer
_field
_class
(
370 native_bt
.field_class_enumeration_unsigned_create
,
371 bt2_field_class
._UnsignedEnumerationFieldClass
,
372 'unsigned enumeration',
374 preferred_display_base
,
378 def create_single_precision_real_field_class(self
, user_attributes
=None):
379 field_class_ptr
= native_bt
.field_class_real_single_precision_create(self
._ptr
)
380 self
._check
_field
_class
_create
_status
(field_class_ptr
, 'single-precision real')
382 field_class
= bt2_field_class
._SinglePrecisionRealFieldClass
._create
_from
_ptr
(
386 self
._set
_field
_class
_user
_attrs
(field_class
, user_attributes
)
390 def create_double_precision_real_field_class(self
, user_attributes
=None):
391 field_class_ptr
= native_bt
.field_class_real_double_precision_create(self
._ptr
)
392 self
._check
_field
_class
_create
_status
(field_class_ptr
, 'double-precision real')
394 field_class
= bt2_field_class
._DoublePrecisionRealFieldClass
._create
_from
_ptr
(
398 self
._set
_field
_class
_user
_attrs
(field_class
, user_attributes
)
402 def create_structure_field_class(self
, user_attributes
=None):
403 field_class_ptr
= native_bt
.field_class_structure_create(self
._ptr
)
404 self
._check
_field
_class
_create
_status
(field_class_ptr
, 'structure')
405 fc
= bt2_field_class
._StructureFieldClass
._create
_from
_ptr
(field_class_ptr
)
406 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
409 def create_string_field_class(self
, user_attributes
=None):
410 field_class_ptr
= native_bt
.field_class_string_create(self
._ptr
)
411 self
._check
_field
_class
_create
_status
(field_class_ptr
, 'string')
412 fc
= bt2_field_class
._StringFieldClass
._create
_from
_ptr
(field_class_ptr
)
413 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
416 def create_static_array_field_class(self
, elem_fc
, length
, user_attributes
=None):
417 utils
._check
_type
(elem_fc
, bt2_field_class
._FieldClass
)
418 utils
._check
_uint
64(length
)
419 ptr
= native_bt
.field_class_array_static_create(self
._ptr
, elem_fc
._ptr
, length
)
420 self
._check
_field
_class
_create
_status
(ptr
, 'static array')
421 fc
= bt2_field_class
._StaticArrayFieldClass
._create
_from
_ptr
_and
_get
_ref
(ptr
)
422 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
425 def create_dynamic_array_field_class(
426 self
, elem_fc
, length_fc
=None, user_attributes
=None
428 utils
._check
_type
(elem_fc
, bt2_field_class
._FieldClass
)
431 if length_fc
is not None:
432 utils
._check
_type
(length_fc
, bt2_field_class
._UnsignedIntegerFieldClass
)
433 length_fc_ptr
= length_fc
._ptr
435 ptr
= native_bt
.field_class_array_dynamic_create(
436 self
._ptr
, elem_fc
._ptr
, length_fc_ptr
438 self
._check
_field
_class
_create
_status
(ptr
, 'dynamic array')
439 fc
= bt2_field_class
._create
_field
_class
_from
_ptr
_and
_get
_ref
(ptr
)
440 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
443 def create_option_without_selector_field_class(
444 self
, content_fc
, user_attributes
=None
446 utils
._check
_type
(content_fc
, bt2_field_class
._FieldClass
)
447 ptr
= native_bt
.field_class_option_without_selector_create(
448 self
._ptr
, content_fc
._ptr
450 self
._check
_field
_class
_create
_status
(ptr
, 'option')
451 fc
= bt2_field_class
._create
_field
_class
_from
_ptr
_and
_get
_ref
(ptr
)
452 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
455 def create_option_with_bool_selector_field_class(
456 self
, content_fc
, selector_fc
, selector_is_reversed
=False, user_attributes
=None
458 utils
._check
_type
(content_fc
, bt2_field_class
._FieldClass
)
459 utils
._check
_bool
(selector_is_reversed
)
460 utils
._check
_type
(selector_fc
, bt2_field_class
._BoolFieldClass
)
461 ptr
= native_bt
.field_class_option_with_selector_field_bool_create(
462 self
._ptr
, content_fc
._ptr
, selector_fc
._ptr
464 self
._check
_field
_class
_create
_status
(ptr
, 'option')
465 fc
= bt2_field_class
._create
_field
_class
_from
_ptr
_and
_get
_ref
(ptr
)
466 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
467 fc
._selector
_is
_reversed
= selector_is_reversed
470 def create_option_with_integer_selector_field_class(
471 self
, content_fc
, selector_fc
, ranges
, user_attributes
=None
473 utils
._check
_type
(content_fc
, bt2_field_class
._FieldClass
)
474 utils
._check
_type
(selector_fc
, bt2_field_class
._IntegerFieldClass
)
477 raise ValueError('integer range set is empty')
479 if isinstance(selector_fc
, bt2_field_class
._UnsignedIntegerFieldClass
):
480 utils
._check
_type
(ranges
, bt2_integer_range_set
.UnsignedIntegerRangeSet
)
481 ptr
= native_bt
.field_class_option_with_selector_field_integer_unsigned_create(
482 self
._ptr
, content_fc
._ptr
, selector_fc
._ptr
, ranges
._ptr
485 utils
._check
_type
(ranges
, bt2_integer_range_set
.SignedIntegerRangeSet
)
486 ptr
= native_bt
.field_class_option_with_selector_field_integer_signed_create(
487 self
._ptr
, content_fc
._ptr
, selector_fc
._ptr
, ranges
._ptr
490 self
._check
_field
_class
_create
_status
(ptr
, 'option')
491 fc
= bt2_field_class
._create
_field
_class
_from
_ptr
_and
_get
_ref
(ptr
)
492 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
495 def create_variant_field_class(self
, selector_fc
=None, user_attributes
=None):
496 selector_fc_ptr
= None
498 if selector_fc
is not None:
499 utils
._check
_type
(selector_fc
, bt2_field_class
._IntegerFieldClass
)
500 selector_fc_ptr
= selector_fc
._ptr
502 ptr
= native_bt
.field_class_variant_create(self
._ptr
, selector_fc_ptr
)
503 self
._check
_field
_class
_create
_status
(ptr
, 'variant')
504 fc
= bt2_field_class
._create
_field
_class
_from
_ptr
_and
_get
_ref
(ptr
)
505 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)