from bt2 import clock_class as bt2_clock_class
from bt2 import query_executor as bt2_query_executor
from bt2 import port as bt2_port
+from bt2 import error as bt2_error
+from bt2 import integer_range_set as bt2_integer_range_set
import sys
-import bt2
class _IncompleteUserClass(Exception):
)
elif _UserSinkComponent in bases:
if not hasattr(cls, "_user_consume"):
- raise bt2._IncompleteUserClass(
+ raise _IncompleteUserClass(
"cannot create component class '{}': missing a _user_consume() method".format(
class_name
)
cls, comp_cls_name, comp_cls_descr, comp_cls_help
)
else:
- raise bt2._IncompleteUserClass(
+ raise _IncompleteUserClass(
"cannot find a known component class base in the bases of '{}'".format(
class_name
)
)
if cc_ptr is None:
- raise bt2._MemoryError(
+ raise bt2_error._MemoryError(
"cannot create component class '{}'".format(class_name)
)
@staticmethod
def _bt_set_iterator_class(cls, iter_cls):
if iter_cls is None:
- raise bt2._IncompleteUserClass(
+ raise _IncompleteUserClass(
"cannot create component class '{}': missing message iterator class".format(
cls.__name__
)
)
if not issubclass(iter_cls, bt2_message_iterator._UserMessageIterator):
- raise bt2._IncompleteUserClass(
+ raise _IncompleteUserClass(
"cannot create component class '{}': message iterator class does not inherit bt2._UserMessageIterator".format(
cls.__name__
)
)
if not hasattr(iter_cls, "__next__"):
- raise bt2._IncompleteUserClass(
+ raise _IncompleteUserClass(
"cannot create component class '{}': message iterator class is missing a __next__() method".format(
cls.__name__
)
if hasattr(iter_cls, "_user_can_seek_ns_from_origin") and not hasattr(
iter_cls, "_user_seek_ns_from_origin"
):
- raise bt2._IncompleteUserClass(
+ raise _IncompleteUserClass(
"cannot create component class '{}': message iterator class implements _user_can_seek_ns_from_origin but not _user_seek_ns_from_origin".format(
cls.__name__
)
if hasattr(iter_cls, "_user_can_seek_beginning") and not hasattr(
iter_cls, "_user_seek_beginning"
):
- raise bt2._IncompleteUserClass(
+ raise _IncompleteUserClass(
"cannot create component class '{}': message iterator class implements _user_can_seek_beginning but not _user_seek_beginning".format(
cls.__name__
)
# this can raise, but the native side checks the exception
range_set = cls._user_get_supported_mip_versions(params, obj, log_level)
- if type(range_set) is not bt2.UnsignedIntegerRangeSet:
+ if type(range_set) is not bt2_integer_range_set.UnsignedIntegerRangeSet:
# this can raise, but the native side checks the exception
- range_set = bt2.UnsignedIntegerRangeSet(range_set)
+ range_set = bt2_integer_range_set.UnsignedIntegerRangeSet(range_set)
# return new reference
range_set._get_ref(range_set._ptr)
priv_query_exec._invalidate()
# this can raise, but the native side checks the exception
- results = bt2.create_value(results)
+ results = bt2_value.create_value(results)
if results is None:
results_ptr = native_bt.value_null
return int(results_ptr)
def _user_query(cls, priv_query_executor, object_name, params, method_obj):
- raise bt2.UnknownObject
+ raise bt2_utils.UnknownObject
def _bt_component_class_ptr(self):
return self._bt_as_component_class_ptr(self._bt_cc_ptr)
tc_ptr = native_bt.trace_class_create(ptr)
if tc_ptr is None:
- raise bt2._MemoryError("could not create trace class")
+ raise bt2_error._MemoryError("could not create trace class")
tc = bt2_trace_class._TraceClass._create_from_ptr(tc_ptr)
tc._assigns_automatic_stream_class_id = assigns_automatic_stream_class_id
cc_ptr = native_bt.clock_class_create(ptr)
if cc_ptr is None:
- raise bt2._MemoryError("could not create clock class")
+ raise bt2_error._MemoryError("could not create clock class")
cc = bt2_clock_class._ClockClass._create_from_ptr(cc_ptr)