#
# Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com>
-from bt2 import native_bt, object, utils
+from bt2 import native_bt
+from bt2 import object as bt2_object
+from bt2 import utils as bt2_utils
import collections.abc
from bt2 import value as bt2_value
from bt2 import stream as bt2_stream
self._trace = trace
def __getitem__(self, key):
- utils._check_str(key)
+ bt2_utils._check_str(key)
borrow_entry_fn = native_bt.trace_borrow_environment_entry_value_by_name_const
value_ptr = borrow_entry_fn(self._trace._ptr, key)
raise TypeError("expected str or int, got {}".format(type(value)))
status = set_env_entry_fn(self._trace._ptr, key, value)
- utils._handle_func_status(status, "cannot set trace object's environment entry")
+ bt2_utils._handle_func_status(
+ status, "cannot set trace object's environment entry"
+ )
def __delitem__(self, key):
raise NotImplementedError
-class _TraceConst(object._SharedObject, collections.abc.Mapping):
+class _TraceConst(bt2_object._SharedObject, collections.abc.Mapping):
@staticmethod
def _get_ref(ptr):
native_bt.trace_get_ref(ptr)
return count
def __getitem__(self, id):
- utils._check_uint64(id)
+ bt2_utils._check_uint64(id)
stream_ptr = self._borrow_stream_ptr_by_id(self._ptr, id)
if not callable(listener):
raise TypeError("'listener' parameter is not callable")
- handle = utils._ListenerHandle(self.addr)
+ handle = bt2_utils._ListenerHandle(self.addr)
fn = native_bt.bt2_trace_add_destruction_listener
listener_from_native = functools.partial(
)
status, listener_id = fn(self._ptr, listener_from_native)
- utils._handle_func_status(
+ bt2_utils._handle_func_status(
status, "cannot add destruction listener to trace object"
)
return handle
def remove_destruction_listener(self, listener_handle):
- utils._check_type(listener_handle, utils._ListenerHandle)
+ bt2_utils._check_type(listener_handle, bt2_utils._ListenerHandle)
if listener_handle._addr != self.addr:
raise ValueError(
status = native_bt.trace_remove_destruction_listener(
self._ptr, listener_handle._listener_id
)
- utils._handle_func_status(status)
+ bt2_utils._handle_func_status(status)
listener_handle._invalidate()
_trace_env_pycls = property(lambda _: _TraceEnvironment)
def _name(self, name):
- utils._check_str(name)
+ bt2_utils._check_str(name)
status = native_bt.trace_set_name(self._ptr, name)
- utils._handle_func_status(status, "cannot set trace class object's name")
+ bt2_utils._handle_func_status(status, "cannot set trace class object's name")
_name = property(fset=_name)
def _user_attributes(self, user_attributes):
value = bt2_value.create_value(user_attributes)
- utils._check_type(value, bt2_value.MapValue)
+ bt2_utils._check_type(value, bt2_value.MapValue)
native_bt.trace_set_user_attributes(self._ptr, value._ptr)
_user_attributes = property(fset=_user_attributes)
def _uuid(self, uuid):
- utils._check_type(uuid, uuidp.UUID)
+ bt2_utils._check_type(uuid, uuidp.UUID)
native_bt.trace_set_uuid(self._ptr, uuid.bytes)
_uuid = property(fset=_uuid)
def create_stream(self, stream_class, id=None, name=None, user_attributes=None):
- utils._check_type(stream_class, bt2_stream_class._StreamClass)
+ bt2_utils._check_type(stream_class, bt2_stream_class._StreamClass)
if stream_class.assigns_automatic_stream_id:
if id is not None:
"id not provided, but stream class does not assign automatic stream ids"
)
- utils._check_uint64(id)
+ bt2_utils._check_uint64(id)
stream_ptr = native_bt.stream_create_with_id(
stream_class._ptr, self._ptr, id
)