1 # SPDX-License-Identifier: MIT
3 # Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com>
4 # Copyright (c) 2018 Francis Deslauriers <francis.deslauriers@efficios.com>
5 # Copyright (c) 2019 Simon Marchi <simon.marchi@efficios.com>
7 from bt2
import native_bt
8 from bt2
import utils
as bt2_utils
9 from bt2
import object as bt2_object
10 from bt2
import stream_class
as bt2_stream_class
11 from bt2
import field_class
as bt2_field_class
12 from bt2
import integer_range_set
as bt2_integer_range_set
13 from bt2
import trace
as bt2_trace
14 from bt2
import error
as bt2_error
15 from bt2
import value
as bt2_value
16 import collections
.abc
20 def _trace_class_destruction_listener_from_native(
21 user_listener
, handle
, trace_class_ptr
23 trace_class
= _TraceClass
._create
_from
_ptr
_and
_get
_ref
(trace_class_ptr
)
24 user_listener(trace_class
)
28 class _TraceClassConst(bt2_object
._SharedObject
, collections
.abc
.Mapping
):
31 native_bt
.trace_class_get_ref(ptr
)
35 native_bt
.trace_class_put_ref(ptr
)
37 _borrow_stream_class_ptr_by_index
= staticmethod(
38 native_bt
.trace_class_borrow_stream_class_by_index_const
40 _borrow_stream_class_ptr_by_id
= staticmethod(
41 native_bt
.trace_class_borrow_stream_class_by_id_const
43 _borrow_user_attributes_ptr
= staticmethod(
44 native_bt
.trace_class_borrow_user_attributes_const
46 _stream_class_pycls
= bt2_stream_class
._StreamClassConst
47 _create_value_from_ptr_and_get_ref
= staticmethod(
48 bt2_value
._create
_from
_const
_ptr
_and
_get
_ref
def user_attributes(self):
    """Return the user attributes of this trace class.

    The attributes pointer is borrowed through the class-level
    ``_borrow_user_attributes_ptr`` hook and then wrapped by the
    class-level ``_create_value_from_ptr_and_get_ref`` hook, so the
    kind of value object returned depends on which class defines
    those hooks.
    """
    attrs_ptr = self._borrow_user_attributes_ptr(self._ptr)

    # The borrow hook is expected to always yield a pointer here.
    assert attrs_ptr is not None

    wrap = self._create_value_from_ptr_and_get_ref
    return wrap(attrs_ptr)
57 # Number of stream classes in this trace class.
60 count
= native_bt
.trace_class_get_stream_class_count(self
._ptr
)
# Get a stream class by its stream class ID.
66 def __getitem__(self
, key
):
67 bt2_utils
._check
_uint
64(key
)
69 sc_ptr
= self
._borrow
_stream
_class
_ptr
_by
_id
(self
._ptr
, key
)
73 return self
._stream
_class
_pycls
._create
_from
_ptr
_and
_get
_ref
(sc_ptr
)
76 for idx
in range(len(self
)):
77 sc_ptr
= self
._borrow
_stream
_class
_ptr
_by
_index
(self
._ptr
, idx
)
78 assert sc_ptr
is not None
80 id = native_bt
.stream_class_get_id(sc_ptr
)
def assigns_automatic_stream_class_id(self):
    """Return whether this trace class assigns automatic stream class IDs.

    The answer comes straight from
    ``native_bt.trace_class_assigns_automatic_stream_class_id()`` and
    is returned unmodified.
    """
    auto_id = native_bt.trace_class_assigns_automatic_stream_class_id(self._ptr)
    return auto_id
89 # Add a listener to be called when the trace class is destroyed.
91 def add_destruction_listener(self
, listener
):
92 if not callable(listener
):
93 raise TypeError("'listener' parameter is not callable")
95 handle
= bt2_utils
._ListenerHandle
(self
.addr
)
97 listener_from_native
= functools
.partial(
98 _trace_class_destruction_listener_from_native
, listener
, handle
101 fn
= native_bt
.bt2_trace_class_add_destruction_listener
102 status
, listener_id
= fn(self
._ptr
, listener_from_native
)
103 bt2_utils
._handle
_func
_status
(
104 status
, "cannot add destruction listener to trace class object"
107 handle
._set
_listener
_id
(listener_id
)
111 def remove_destruction_listener(self
, listener_handle
):
112 bt2_utils
._check
_type
(listener_handle
, bt2_utils
._ListenerHandle
)
114 if listener_handle
._addr
!= self
.addr
:
116 "This trace class destruction listener does not match the trace class object."
119 if listener_handle
._listener
_id
is None:
121 "This trace class destruction listener was already removed."
124 status
= native_bt
.trace_class_remove_destruction_listener(
125 self
._ptr
, listener_handle
._listener
_id
127 bt2_utils
._handle
_func
_status
(status
)
128 listener_handle
._invalidate
()
131 class _TraceClass(_TraceClassConst
):
132 _borrow_stream_class_ptr_by_index
= staticmethod(
133 native_bt
.trace_class_borrow_stream_class_by_index
135 _borrow_stream_class_ptr_by_id
= staticmethod(
136 native_bt
.trace_class_borrow_stream_class_by_id
138 _borrow_user_attributes_ptr
= staticmethod(
139 native_bt
.trace_class_borrow_user_attributes
141 _stream_class_pycls
= bt2_stream_class
._StreamClass
142 _create_value_from_ptr_and_get_ref
= staticmethod(
143 bt2_value
._create
_from
_ptr
_and
_get
_ref
146 # Instantiate a trace of this class.
148 def __call__(self
, name
=None, user_attributes
=None, uuid
=None, environment
=None):
149 trace_ptr
= native_bt
.trace_create(self
._ptr
)
151 if trace_ptr
is None:
152 raise bt2_error
._MemoryError
("cannot create trace class object")
154 trace
= bt2_trace
._Trace
._create
_from
_ptr
(trace_ptr
)
159 if user_attributes
is not None:
160 trace
._user
_attributes
= user_attributes
165 if environment
is not None:
166 for key
, value
in environment
.items():
167 trace
.environment
[key
] = value
171 def create_stream_class(
175 user_attributes
=None,
176 packet_context_field_class
=None,
177 event_common_context_field_class
=None,
178 default_clock_class
=None,
179 assigns_automatic_event_class_id
=True,
180 assigns_automatic_stream_id
=True,
181 supports_packets
=False,
182 packets_have_beginning_default_clock_snapshot
=False,
183 packets_have_end_default_clock_snapshot
=False,
184 supports_discarded_events
=False,
185 discarded_events_have_default_clock_snapshots
=False,
186 supports_discarded_packets
=False,
187 discarded_packets_have_default_clock_snapshots
=False,
189 # Validate parameters before we create the object.
190 bt2_stream_class
._StreamClass
._validate
_create
_params
(
193 packet_context_field_class
,
194 event_common_context_field_class
,
196 assigns_automatic_event_class_id
,
197 assigns_automatic_stream_id
,
199 packets_have_beginning_default_clock_snapshot
,
200 packets_have_end_default_clock_snapshot
,
201 supports_discarded_events
,
202 discarded_events_have_default_clock_snapshots
,
203 supports_discarded_packets
,
204 discarded_packets_have_default_clock_snapshots
,
207 if self
.assigns_automatic_stream_class_id
:
210 "id provided, but trace class assigns automatic stream class ids"
213 sc_ptr
= native_bt
.stream_class_create(self
._ptr
)
217 "id not provided, but trace class does not assign automatic stream class ids"
220 bt2_utils
._check
_uint
64(id)
221 sc_ptr
= native_bt
.stream_class_create_with_id(self
._ptr
, id)
223 sc
= bt2_stream_class
._StreamClass
._create
_from
_ptr
(sc_ptr
)
228 if user_attributes
is not None:
229 sc
._user
_attributes
= user_attributes
231 if event_common_context_field_class
is not None:
232 sc
._event
_common
_context
_field
_class
= event_common_context_field_class
234 if default_clock_class
is not None:
235 sc
._default
_clock
_class
= default_clock_class
237 # call after `sc._default_clock_class` because, if
238 # `packets_have_beginning_default_clock_snapshot` or
239 # `packets_have_end_default_clock_snapshot` is true, then this
240 # stream class needs a default clock class already.
241 sc
._set
_supports
_packets
(
243 packets_have_beginning_default_clock_snapshot
,
244 packets_have_end_default_clock_snapshot
,
247 # call after sc._set_supports_packets() because, if
248 # `packet_context_field_class` is not `None`, then this stream
249 # class needs to support packets already.
250 if packet_context_field_class
is not None:
251 sc
._packet
_context
_field
_class
= packet_context_field_class
253 sc
._assigns
_automatic
_event
_class
_id
= assigns_automatic_event_class_id
254 sc
._assigns
_automatic
_stream
_id
= assigns_automatic_stream_id
255 sc
._set
_supports
_discarded
_events
(
256 supports_discarded_events
, discarded_events_have_default_clock_snapshots
258 sc
._set
_supports
_discarded
_packets
(
259 supports_discarded_packets
, discarded_packets_have_default_clock_snapshots
def _user_attributes(self, user_attributes):
    """Set the user attributes of this trace class.

    ``user_attributes`` is first converted with
    ``bt2_value.create_value()``; the converted object must be a
    ``bt2_value.MapValue`` (checked with ``bt2_utils._check_type()``)
    before it is handed to the native setter.
    """
    attrs = bt2_value.create_value(user_attributes)
    bt2_utils._check_type(attrs, bt2_value.MapValue)
    native_bt.trace_class_set_user_attributes(self._ptr, attrs._ptr)
268 _user_attributes
= property(fset
=_user_attributes
)
270 def _assigns_automatic_stream_class_id(self
, auto_id
):
271 bt2_utils
._check
_bool
(auto_id
)
272 return native_bt
.trace_class_set_assigns_automatic_stream_class_id(
276 _assigns_automatic_stream_class_id
= property(
277 fset
=_assigns_automatic_stream_class_id
280 # Field class creation methods.
282 def _check_field_class_create_status(self
, ptr
, type_name
):
284 raise bt2_error
._MemoryError
(
285 "cannot create {} field class".format(type_name
)
def _set_field_class_user_attrs(fc, user_attributes):
    """Assign ``user_attributes`` to ``fc._user_attributes`` when given.

    When ``user_attributes`` is ``None`` the field class is left
    untouched.
    """
    # Nothing to apply: the caller did not provide user attributes.
    if user_attributes is None:
        return

    fc._user_attributes = user_attributes
293 def create_bool_field_class(self
, user_attributes
=None):
294 field_class_ptr
= native_bt
.field_class_bool_create(self
._ptr
)
295 self
._check
_field
_class
_create
_status
(field_class_ptr
, "boolean")
296 fc
= bt2_field_class
._BoolFieldClass
._create
_from
_ptr
(field_class_ptr
)
297 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
300 def create_bit_array_field_class(self
, length
, user_attributes
=None):
301 bt2_utils
._check
_uint
64(length
)
303 if length
< 1 or length
> 64:
305 "invalid length {}: expecting a value in the [1, 64] range".format(
310 field_class_ptr
= native_bt
.field_class_bit_array_create(self
._ptr
, length
)
311 self
._check
_field
_class
_create
_status
(field_class_ptr
, "bit array")
312 fc
= bt2_field_class
._BitArrayFieldClass
._create
_from
_ptr
(field_class_ptr
)
313 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
316 def _create_integer_field_class(
322 preferred_display_base
,
325 field_class_ptr
= create_func(self
._ptr
)
326 self
._check
_field
_class
_create
_status
(field_class_ptr
, type_name
)
328 field_class
= py_cls
._create
_from
_ptr
(field_class_ptr
)
330 if field_value_range
is not None:
331 field_class
._field
_value
_range
= field_value_range
333 if preferred_display_base
is not None:
334 field_class
._preferred
_display
_base
= preferred_display_base
336 self
._set
_field
_class
_user
_attrs
(field_class
, user_attributes
)
339 def create_signed_integer_field_class(
340 self
, field_value_range
=None, preferred_display_base
=None, user_attributes
=None
342 return self
._create
_integer
_field
_class
(
343 native_bt
.field_class_integer_signed_create
,
344 bt2_field_class
._SignedIntegerFieldClass
,
347 preferred_display_base
,
351 def create_unsigned_integer_field_class(
352 self
, field_value_range
=None, preferred_display_base
=None, user_attributes
=None
354 return self
._create
_integer
_field
_class
(
355 native_bt
.field_class_integer_unsigned_create
,
356 bt2_field_class
._UnsignedIntegerFieldClass
,
359 preferred_display_base
,
363 def create_signed_enumeration_field_class(
364 self
, field_value_range
=None, preferred_display_base
=None, user_attributes
=None
366 return self
._create
_integer
_field
_class
(
367 native_bt
.field_class_enumeration_signed_create
,
368 bt2_field_class
._SignedEnumerationFieldClass
,
369 "signed enumeration",
371 preferred_display_base
,
375 def create_unsigned_enumeration_field_class(
376 self
, field_value_range
=None, preferred_display_base
=None, user_attributes
=None
378 return self
._create
_integer
_field
_class
(
379 native_bt
.field_class_enumeration_unsigned_create
,
380 bt2_field_class
._UnsignedEnumerationFieldClass
,
381 "unsigned enumeration",
383 preferred_display_base
,
387 def create_single_precision_real_field_class(self
, user_attributes
=None):
388 field_class_ptr
= native_bt
.field_class_real_single_precision_create(self
._ptr
)
389 self
._check
_field
_class
_create
_status
(field_class_ptr
, "single-precision real")
391 field_class
= bt2_field_class
._SinglePrecisionRealFieldClass
._create
_from
_ptr
(
395 self
._set
_field
_class
_user
_attrs
(field_class
, user_attributes
)
399 def create_double_precision_real_field_class(self
, user_attributes
=None):
400 field_class_ptr
= native_bt
.field_class_real_double_precision_create(self
._ptr
)
401 self
._check
_field
_class
_create
_status
(field_class_ptr
, "double-precision real")
403 field_class
= bt2_field_class
._DoublePrecisionRealFieldClass
._create
_from
_ptr
(
407 self
._set
_field
_class
_user
_attrs
(field_class
, user_attributes
)
411 def create_structure_field_class(self
, user_attributes
=None):
412 field_class_ptr
= native_bt
.field_class_structure_create(self
._ptr
)
413 self
._check
_field
_class
_create
_status
(field_class_ptr
, "structure")
414 fc
= bt2_field_class
._StructureFieldClass
._create
_from
_ptr
(field_class_ptr
)
415 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
418 def create_string_field_class(self
, user_attributes
=None):
419 field_class_ptr
= native_bt
.field_class_string_create(self
._ptr
)
420 self
._check
_field
_class
_create
_status
(field_class_ptr
, "string")
421 fc
= bt2_field_class
._StringFieldClass
._create
_from
_ptr
(field_class_ptr
)
422 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
425 def create_static_array_field_class(self
, elem_fc
, length
, user_attributes
=None):
426 bt2_utils
._check
_type
(elem_fc
, bt2_field_class
._FieldClass
)
427 bt2_utils
._check
_uint
64(length
)
428 ptr
= native_bt
.field_class_array_static_create(self
._ptr
, elem_fc
._ptr
, length
)
429 self
._check
_field
_class
_create
_status
(ptr
, "static array")
430 fc
= bt2_field_class
._StaticArrayFieldClass
._create
_from
_ptr
(ptr
)
431 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
434 def create_dynamic_array_field_class(
435 self
, elem_fc
, length_fc
=None, user_attributes
=None
437 bt2_utils
._check
_type
(elem_fc
, bt2_field_class
._FieldClass
)
440 if length_fc
is not None:
441 bt2_utils
._check
_type
(length_fc
, bt2_field_class
._UnsignedIntegerFieldClass
)
442 length_fc_ptr
= length_fc
._ptr
444 ptr
= native_bt
.field_class_array_dynamic_create(
445 self
._ptr
, elem_fc
._ptr
, length_fc_ptr
447 self
._check
_field
_class
_create
_status
(ptr
, "dynamic array")
448 fc
= bt2_field_class
._obj
_type
_from
_field
_class
_ptr
(ptr
)._create
_from
_ptr
(ptr
)
449 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
452 def create_option_without_selector_field_class(
453 self
, content_fc
, user_attributes
=None
455 bt2_utils
._check
_type
(content_fc
, bt2_field_class
._FieldClass
)
456 ptr
= native_bt
.field_class_option_without_selector_create(
457 self
._ptr
, content_fc
._ptr
459 self
._check
_field
_class
_create
_status
(ptr
, "option")
460 fc
= bt2_field_class
._obj
_type
_from
_field
_class
_ptr
(ptr
)._create
_from
_ptr
(ptr
)
461 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
464 def create_option_with_bool_selector_field_class(
465 self
, content_fc
, selector_fc
, selector_is_reversed
=False, user_attributes
=None
467 bt2_utils
._check
_type
(content_fc
, bt2_field_class
._FieldClass
)
468 bt2_utils
._check
_bool
(selector_is_reversed
)
469 bt2_utils
._check
_type
(selector_fc
, bt2_field_class
._BoolFieldClass
)
470 ptr
= native_bt
.field_class_option_with_selector_field_bool_create(
471 self
._ptr
, content_fc
._ptr
, selector_fc
._ptr
473 self
._check
_field
_class
_create
_status
(ptr
, "option")
474 fc
= bt2_field_class
._obj
_type
_from
_field
_class
_ptr
(ptr
)._create
_from
_ptr
(ptr
)
475 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
476 fc
._selector
_is
_reversed
= selector_is_reversed
479 def create_option_with_integer_selector_field_class(
480 self
, content_fc
, selector_fc
, ranges
, user_attributes
=None
482 bt2_utils
._check
_type
(content_fc
, bt2_field_class
._FieldClass
)
483 bt2_utils
._check
_type
(selector_fc
, bt2_field_class
._IntegerFieldClass
)
486 raise ValueError("integer range set is empty")
488 if isinstance(selector_fc
, bt2_field_class
._UnsignedIntegerFieldClass
):
489 bt2_utils
._check
_type
(ranges
, bt2_integer_range_set
.UnsignedIntegerRangeSet
)
490 ptr
= native_bt
.field_class_option_with_selector_field_integer_unsigned_create(
491 self
._ptr
, content_fc
._ptr
, selector_fc
._ptr
, ranges
._ptr
494 bt2_utils
._check
_type
(ranges
, bt2_integer_range_set
.SignedIntegerRangeSet
)
496 native_bt
.field_class_option_with_selector_field_integer_signed_create(
497 self
._ptr
, content_fc
._ptr
, selector_fc
._ptr
, ranges
._ptr
501 self
._check
_field
_class
_create
_status
(ptr
, "option")
502 fc
= bt2_field_class
._obj
_type
_from
_field
_class
_ptr
(ptr
)._create
_from
_ptr
(ptr
)
503 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)
506 def create_variant_field_class(self
, selector_fc
=None, user_attributes
=None):
507 selector_fc_ptr
= None
509 if selector_fc
is not None:
510 bt2_utils
._check
_type
(selector_fc
, bt2_field_class
._IntegerFieldClass
)
511 selector_fc_ptr
= selector_fc
._ptr
513 ptr
= native_bt
.field_class_variant_create(self
._ptr
, selector_fc_ptr
)
514 self
._check
_field
_class
_create
_status
(ptr
, "variant")
515 fc
= bt2_field_class
._obj
_type
_from
_field
_class
_ptr
(ptr
)._create
_from
_ptr
(ptr
)
516 self
._set
_field
_class
_user
_attrs
(fc
, user_attributes
)