import os
-_env_var = os.environ.get('BABELTRACE_PYTHON_BT2_NO_TRACEBACK')
-_NO_PRINT_TRACEBACK = _env_var == '1'
-
-
# This class wraps a component class pointer. This component class could
# have been created by Python code, but since we only have the pointer,
# we can only wrap it in a generic way and lose the original Python
#
# Subclasses must implement some methods that this base class uses:
#
-# - _as_component_class_ptr: static method, convert the passed component class
+# - _bt_as_component_class_ptr: static method, convert the passed component class
# pointer to a 'bt_component_class *'.
class _GenericComponentClass(object._SharedObject):
@property
def name(self):
- ptr = self._as_component_class_ptr(self._ptr)
+ ptr = self._bt_as_component_class_ptr(self._ptr)
name = native_bt.component_class_get_name(ptr)
assert name is not None
return name
@property
def description(self):
- ptr = self._as_component_class_ptr(self._ptr)
+ ptr = self._bt_as_component_class_ptr(self._ptr)
return native_bt.component_class_get_description(ptr)
@property
def help(self):
- ptr = self._as_component_class_ptr(self._ptr)
+ ptr = self._bt_as_component_class_ptr(self._ptr)
return native_bt.component_class_get_help(ptr)
- def _component_class_ptr(self):
- return self._as_component_class_ptr(self._ptr)
+ def _bt_component_class_ptr(self):
+ return self._bt_as_component_class_ptr(self._ptr)
def __eq__(self, other):
if not isinstance(other, _GenericComponentClass):
class _GenericSourceComponentClass(_GenericComponentClass):
_get_ref = staticmethod(native_bt.component_class_source_get_ref)
_put_ref = staticmethod(native_bt.component_class_source_put_ref)
- _as_component_class_ptr = staticmethod(native_bt.component_class_source_as_component_class)
+ _bt_as_component_class_ptr = staticmethod(native_bt.component_class_source_as_component_class)
class _GenericFilterComponentClass(_GenericComponentClass):
_get_ref = staticmethod(native_bt.component_class_filter_get_ref)
_put_ref = staticmethod(native_bt.component_class_filter_put_ref)
- _as_component_class_ptr = staticmethod(native_bt.component_class_filter_as_component_class)
+ _bt_as_component_class_ptr = staticmethod(native_bt.component_class_filter_as_component_class)
class _GenericSinkComponentClass(_GenericComponentClass):
_get_ref = staticmethod(native_bt.component_class_sink_get_ref)
_put_ref = staticmethod(native_bt.component_class_sink_put_ref)
- _as_component_class_ptr = staticmethod(native_bt.component_class_sink_as_component_class)
+ _bt_as_component_class_ptr = staticmethod(native_bt.component_class_sink_as_component_class)
class _PortIterator(collections.abc.Iterator):
#
# Subclasses must provide these methods or property:
#
-# - _borrow_component_class_ptr: static method, must return a pointer to the
+# - _bt_borrow_component_class_ptr: static method, must return a pointer to the
# specialized component class (e.g. 'bt_component_class_sink *') of the
# passed specialized component pointer (e.g. 'bt_component_sink *').
-# - _comp_cls_type: property, one of the native_bt.COMPONENT_CLASS_TYPE_*
+# - _bt_comp_cls_type: property, one of the native_bt.COMPONENT_CLASS_TYPE_*
# constants.
-# - _as_component_ptr: static method, must return the passed specialized
+# - _bt_as_component_ptr: static method, must return the passed specialized
# component pointer (e.g. 'bt_component_sink *') as a 'bt_component *'.
class _Component:
@property
def name(self):
- ptr = self._as_component_ptr(self._ptr)
+ ptr = self._bt_as_component_ptr(self._ptr)
name = native_bt.component_get_name(ptr)
assert name is not None
return name
@property
def logging_level(self):
- ptr = self._as_component_ptr(self._ptr)
+ ptr = self._bt_as_component_ptr(self._ptr)
return native_bt.component_get_logging_level(ptr)
@property
def cls(self):
- cc_ptr = self._borrow_component_class_ptr(self._ptr)
+ cc_ptr = self._bt_borrow_component_class_ptr(self._ptr)
assert cc_ptr is not None
- return _create_component_class_from_ptr_and_get_ref(cc_ptr, self._comp_cls_type)
+ return _create_component_class_from_ptr_and_get_ref(cc_ptr, self._bt_comp_cls_type)
def __eq__(self, other):
if not hasattr(other, 'addr'):
class _SourceComponent(_Component):
- _borrow_component_class_ptr = staticmethod(native_bt.component_source_borrow_class_const)
- _comp_cls_type = native_bt.COMPONENT_CLASS_TYPE_SOURCE
- _as_component_class_ptr = staticmethod(native_bt.component_class_source_as_component_class)
- _as_component_ptr = staticmethod(native_bt.component_source_as_component_const)
+ _bt_borrow_component_class_ptr = staticmethod(native_bt.component_source_borrow_class_const)
+ _bt_comp_cls_type = native_bt.COMPONENT_CLASS_TYPE_SOURCE
+ _bt_as_component_class_ptr = staticmethod(native_bt.component_class_source_as_component_class)
+ _bt_as_component_ptr = staticmethod(native_bt.component_source_as_component_const)
class _FilterComponent(_Component):
- _borrow_component_class_ptr = staticmethod(native_bt.component_filter_borrow_class_const)
- _comp_cls_type = native_bt.COMPONENT_CLASS_TYPE_FILTER
- _as_component_class_ptr = staticmethod(native_bt.component_class_filter_as_component_class)
- _as_component_ptr = staticmethod(native_bt.component_filter_as_component_const)
+ _bt_borrow_component_class_ptr = staticmethod(native_bt.component_filter_borrow_class_const)
+ _bt_comp_cls_type = native_bt.COMPONENT_CLASS_TYPE_FILTER
+ _bt_as_component_class_ptr = staticmethod(native_bt.component_class_filter_as_component_class)
+ _bt_as_component_ptr = staticmethod(native_bt.component_filter_as_component_const)
class _SinkComponent(_Component):
- _borrow_component_class_ptr = staticmethod(native_bt.component_sink_borrow_class_const)
- _comp_cls_type = native_bt.COMPONENT_CLASS_TYPE_SINK
- _as_component_class_ptr = staticmethod(native_bt.component_class_sink_as_component_class)
- _as_component_ptr = staticmethod(native_bt.component_sink_as_component_const)
+ _bt_borrow_component_class_ptr = staticmethod(native_bt.component_sink_borrow_class_const)
+ _bt_comp_cls_type = native_bt.COMPONENT_CLASS_TYPE_SINK
+ _bt_as_component_class_ptr = staticmethod(native_bt.component_class_sink_as_component_class)
+ _bt_as_component_ptr = staticmethod(native_bt.component_sink_as_component_const)
# This is analogous to _GenericSourceComponentClass, but for source
iter_cls = kwargs.get('message_iterator_class')
if _UserSourceComponent in bases:
- _UserComponentType._set_iterator_class(cls, iter_cls)
+ _UserComponentType._bt_set_iterator_class(cls, iter_cls)
cc_ptr = native_bt.bt2_component_class_source_create(cls,
comp_cls_name,
comp_cls_descr,
comp_cls_help)
elif _UserFilterComponent in bases:
- _UserComponentType._set_iterator_class(cls, iter_cls)
+ _UserComponentType._bt_set_iterator_class(cls, iter_cls)
cc_ptr = native_bt.bt2_component_class_filter_create(cls,
comp_cls_name,
comp_cls_descr,
if cc_ptr is None:
raise bt2.CreationError("cannot create component class '{}'".format(class_name))
- cls._cc_ptr = cc_ptr
+ cls._bt_cc_ptr = cc_ptr
- def _init_from_native(cls, comp_ptr, params_ptr):
+ def _bt_init_from_native(cls, comp_ptr, params_ptr):
# create instance, not user-initialized yet
self = cls.__new__(cls)
# pointer to native self component object (weak/borrowed)
- self._ptr = comp_ptr
+ self._bt_ptr = comp_ptr
# call user's __init__() method
if params_ptr is not None:
raise bt2.Error('cannot directly instantiate a user component from a Python module')
@staticmethod
- def _set_iterator_class(cls, iter_cls):
+ def _bt_set_iterator_class(cls, iter_cls):
if iter_cls is None:
raise bt2.IncompleteUserClass("cannot create component class '{}': missing message iterator class".format(cls.__name__))
@property
def name(cls):
- ptr = cls._as_component_class_ptr(cls._cc_ptr)
+ ptr = cls._bt_as_component_class_ptr(cls._bt_cc_ptr)
return native_bt.component_class_get_name(ptr)
@property
def description(cls):
- ptr = cls._as_component_class_ptr(cls._cc_ptr)
+ ptr = cls._bt_as_component_class_ptr(cls._bt_cc_ptr)
return native_bt.component_class_get_description(ptr)
@property
def help(cls):
- ptr = cls._as_component_class_ptr(cls._cc_ptr)
+ ptr = cls._bt_as_component_class_ptr(cls._bt_cc_ptr)
return native_bt.component_class_get_help(ptr)
@property
def addr(cls):
- return int(cls._cc_ptr)
+ return int(cls._bt_cc_ptr)
- def _query_from_native(cls, query_exec_ptr, obj, params_ptr, log_level):
+ def _bt_query_from_native(cls, query_exec_ptr, obj, params_ptr, log_level):
# this can raise, in which case the native call to
# bt_component_class_query() returns NULL
if params_ptr is not None:
def _query(cls, query_executor, obj, params, log_level):
raise NotImplementedError
- def _component_class_ptr(self):
- return self._as_component_class_ptr(self._cc_ptr)
+ def _bt_component_class_ptr(self):
+ return self._bt_as_component_class_ptr(self._bt_cc_ptr)
def __del__(cls):
- if hasattr(cls, '_cc_ptr'):
- cc_ptr = cls._as_component_class_ptr(cls._cc_ptr)
+ if hasattr(cls, '_bt_cc_ptr'):
+ cc_ptr = cls._bt_as_component_class_ptr(cls._bt_cc_ptr)
native_bt.component_class_put_ref(cc_ptr)
# Subclasses must provide these methods or property:
#
-# - _as_not_self_specific_component_ptr: static method, must return the passed
+# - _bt_as_not_self_specific_component_ptr: static method, must return the passed
# specialized self component pointer (e.g. 'bt_self_component_sink *') as a
# specialized non-self pointer (e.g. 'bt_component_sink *').
-# - _borrow_component_class_ptr: static method, must return a pointer to the
+# - _bt_borrow_component_class_ptr: static method, must return a pointer to the
# specialized component class (e.g. 'bt_component_class_sink *') of the
# passed specialized component pointer (e.g. 'bt_component_sink *').
-# - _comp_cls_type: property, one of the native_bt.COMPONENT_CLASS_TYPE_*
+# - _bt_comp_cls_type: property, one of the native_bt.COMPONENT_CLASS_TYPE_*
# constants.
class _UserComponent(metaclass=_UserComponentType):
@property
def name(self):
- ptr = self._as_not_self_specific_component_ptr(self._ptr)
- ptr = self._as_component_ptr(ptr)
+ ptr = self._bt_as_not_self_specific_component_ptr(self._bt_ptr)
+ ptr = self._bt_as_component_ptr(ptr)
name = native_bt.component_get_name(ptr)
assert name is not None
return name
@property
def logging_level(self):
- ptr = self._as_not_self_specific_component_ptr(self._ptr)
- ptr = self._as_component_ptr(ptr)
+ ptr = self._bt_as_not_self_specific_component_ptr(self._bt_ptr)
+ ptr = self._bt_as_component_ptr(ptr)
return native_bt.component_get_logging_level(ptr)
@property
def cls(self):
- comp_ptr = self._as_not_self_specific_component_ptr(self._ptr)
- cc_ptr = self._borrow_component_class_ptr(comp_ptr)
- return _create_component_class_from_ptr_and_get_ref(cc_ptr, self._comp_cls_type)
+ comp_ptr = self._bt_as_not_self_specific_component_ptr(self._bt_ptr)
+ cc_ptr = self._bt_borrow_component_class_ptr(comp_ptr)
+ return _create_component_class_from_ptr_and_get_ref(cc_ptr, self._bt_comp_cls_type)
@property
def addr(self):
- return int(self._ptr)
+ return int(self._bt_ptr)
def __init__(self, params=None):
pass
def _port_connected(self, port, other_port):
pass
- def _port_connected_from_native(self, self_port_ptr, self_port_type, other_port_ptr):
+ def _bt_port_connected_from_native(self, self_port_ptr, self_port_type, other_port_ptr):
port = bt2.port._create_self_from_ptr_and_get_ref(
self_port_ptr, self_port_type)
other_port_ptr, other_port_type)
self._port_connected(port, other_port)
- def _graph_is_configured_from_native(self):
+ def _bt_graph_is_configured_from_native(self):
self._graph_is_configured()
- def _create_trace_class(self, env=None, uuid=None,
- assigns_automatic_stream_class_id=True):
- ptr = self._as_self_component_ptr(self._ptr)
+ def _create_trace_class(self, assigns_automatic_stream_class_id=True):
+ ptr = self._bt_as_self_component_ptr(self._bt_ptr)
tc_ptr = native_bt.trace_class_create(ptr)
if tc_ptr is None:
raise bt2.CreationError('could not create trace class')
tc = bt2._TraceClass._create_from_ptr(tc_ptr)
-
- if env is not None:
- for key, value in env.items():
- tc.env[key] = value
-
- if uuid is not None:
- tc._uuid = uuid
-
tc._assigns_automatic_stream_class_id = assigns_automatic_stream_class_id
return tc
def _create_clock_class(self, frequency=None, name=None, description=None,
precision=None, offset=None, origin_is_unix_epoch=True,
uuid=None):
- ptr = self._as_self_component_ptr(self._ptr)
+ ptr = self._bt_as_self_component_ptr(self._bt_ptr)
cc_ptr = native_bt.clock_class_create(ptr)
if cc_ptr is None:
class _UserSourceComponent(_UserComponent, _SourceComponent):
- _as_not_self_specific_component_ptr = staticmethod(native_bt.self_component_source_as_component_source)
- _as_self_component_ptr = staticmethod(native_bt.self_component_source_as_self_component)
+ _bt_as_not_self_specific_component_ptr = staticmethod(native_bt.self_component_source_as_component_source)
+ _bt_as_self_component_ptr = staticmethod(native_bt.self_component_source_as_self_component)
@property
def _output_ports(self):
def get_output_port_count(self_ptr):
- ptr = self._as_not_self_specific_component_ptr(self_ptr)
+ ptr = self._bt_as_not_self_specific_component_ptr(self_ptr)
return native_bt.component_source_get_output_port_count(ptr)
- return _ComponentPorts(self._ptr,
+ return _ComponentPorts(self._bt_ptr,
native_bt.self_component_source_borrow_output_port_by_name,
native_bt.self_component_source_borrow_output_port_by_index,
get_output_port_count,
def _add_output_port(self, name, user_data=None):
utils._check_str(name)
fn = native_bt.self_component_source_add_output_port
- comp_status, self_port_ptr = fn(self._ptr, name, user_data)
+ comp_status, self_port_ptr = fn(self._bt_ptr, name, user_data)
utils._handle_func_status(comp_status,
'cannot add output port to source component object')
assert self_port_ptr is not None
class _UserFilterComponent(_UserComponent, _FilterComponent):
- _as_not_self_specific_component_ptr = staticmethod(native_bt.self_component_filter_as_component_filter)
- _as_self_component_ptr = staticmethod(native_bt.self_component_filter_as_self_component)
+ _bt_as_not_self_specific_component_ptr = staticmethod(native_bt.self_component_filter_as_component_filter)
+ _bt_as_self_component_ptr = staticmethod(native_bt.self_component_filter_as_self_component)
@property
def _output_ports(self):
def get_output_port_count(self_ptr):
- ptr = self._as_not_self_specific_component_ptr(self_ptr)
+ ptr = self._bt_as_not_self_specific_component_ptr(self_ptr)
return native_bt.component_filter_get_output_port_count(ptr)
- return _ComponentPorts(self._ptr,
+ return _ComponentPorts(self._bt_ptr,
native_bt.self_component_filter_borrow_output_port_by_name,
native_bt.self_component_filter_borrow_output_port_by_index,
get_output_port_count,
@property
def _input_ports(self):
def get_input_port_count(self_ptr):
- ptr = self._as_not_self_specific_component_ptr(self_ptr)
+ ptr = self._bt_as_not_self_specific_component_ptr(self_ptr)
return native_bt.component_filter_get_input_port_count(ptr)
- return _ComponentPorts(self._ptr,
+ return _ComponentPorts(self._bt_ptr,
native_bt.self_component_filter_borrow_input_port_by_name,
native_bt.self_component_filter_borrow_input_port_by_index,
get_input_port_count,
def _add_output_port(self, name, user_data=None):
utils._check_str(name)
fn = native_bt.self_component_filter_add_output_port
- comp_status, self_port_ptr = fn(self._ptr, name, user_data)
+ comp_status, self_port_ptr = fn(self._bt_ptr, name, user_data)
utils._handle_func_status(comp_status,
'cannot add output port to filter component object')
assert self_port_ptr
def _add_input_port(self, name, user_data=None):
utils._check_str(name)
fn = native_bt.self_component_filter_add_input_port
- comp_status, self_port_ptr = fn(self._ptr, name, user_data)
+ comp_status, self_port_ptr = fn(self._bt_ptr, name, user_data)
utils._handle_func_status(comp_status,
'cannot add input port to filter component object')
assert self_port_ptr
class _UserSinkComponent(_UserComponent, _SinkComponent):
- _as_not_self_specific_component_ptr = staticmethod(native_bt.self_component_sink_as_component_sink)
- _as_self_component_ptr = staticmethod(native_bt.self_component_sink_as_self_component)
+ _bt_as_not_self_specific_component_ptr = staticmethod(native_bt.self_component_sink_as_component_sink)
+ _bt_as_self_component_ptr = staticmethod(native_bt.self_component_sink_as_self_component)
@property
def _input_ports(self):
def get_input_port_count(self_ptr):
- ptr = self._as_not_self_specific_component_ptr(self_ptr)
+ ptr = self._bt_as_not_self_specific_component_ptr(self_ptr)
return native_bt.component_sink_get_input_port_count(ptr)
- return _ComponentPorts(self._ptr,
+ return _ComponentPorts(self._bt_ptr,
native_bt.self_component_sink_borrow_input_port_by_name,
native_bt.self_component_sink_borrow_input_port_by_index,
get_input_port_count,
def _add_input_port(self, name, user_data=None):
utils._check_str(name)
fn = native_bt.self_component_sink_add_input_port
- comp_status, self_port_ptr = fn(self._ptr, name, user_data)
+ comp_status, self_port_ptr = fn(self._bt_ptr, name, user_data)
utils._handle_func_status(comp_status,
'cannot add input port to sink component object')
assert self_port_ptr