#
# Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com>
-from bt2 import native_bt, object, utils
+from bt2 import native_bt
+from bt2 import object as bt2_object
+from bt2 import utils as bt2_utils
from bt2 import message_iterator as bt2_message_iterator
import collections.abc
from bt2 import value as bt2_value
# pointer to a 'bt_component_class *'.
-class _ComponentClassConst(object._SharedObject):
+class _ComponentClassConst(bt2_object._SharedObject):
@property
def name(self):
ptr = self._bt_as_component_class_ptr(self._ptr)
self._port_pycls = port_pycls
def __getitem__(self, key):
- utils._check_str(key)
+ bt2_utils._check_str(key)
port_ptr = self._borrow_port_ptr_by_name(self._component_ptr, key)
if port_ptr is None:
# This is analogous to _SourceComponentClassConst, but for source
# component objects.
-class _GenericSourceComponentConst(object._SharedObject, _SourceComponentConst):
+class _GenericSourceComponentConst(bt2_object._SharedObject, _SourceComponentConst):
@staticmethod
def _get_ref(ptr):
native_bt.component_source_get_ref(ptr)
# This is analogous to _FilterComponentClassConst, but for filter
# component objects.
-class _GenericFilterComponentConst(object._SharedObject, _FilterComponentConst):
+class _GenericFilterComponentConst(bt2_object._SharedObject, _FilterComponentConst):
@staticmethod
def _get_ref(ptr):
native_bt.component_filter_get_ref(ptr)
# This is analogous to _SinkComponentClassConst, but for sink
# component objects.
-class _GenericSinkComponentConst(object._SharedObject, _SinkComponentConst):
+class _GenericSinkComponentConst(bt2_object._SharedObject, _SinkComponentConst):
@staticmethod
def _get_ref(ptr):
native_bt.component_sink_get_ref(ptr)
return
comp_cls_name = kwargs.get("name", class_name)
- utils._check_str(comp_cls_name)
+ bt2_utils._check_str(comp_cls_name)
comp_cls_descr = None
comp_cls_help = None
if hasattr(cls, "__doc__") and cls.__doc__ is not None:
- utils._check_str(cls.__doc__)
+ bt2_utils._check_str(cls.__doc__)
docstring = _trim_docstring(cls.__doc__)
lines = docstring.splitlines()
)
def _add_output_port(self, name, user_data=None):
- utils._check_str(name)
+ bt2_utils._check_str(name)
if name in self._output_ports:
raise ValueError(
fn = native_bt.self_component_source_add_output_port
comp_status, self_port_ptr = fn(self._bt_ptr, name, user_data)
- utils._handle_func_status(
+ bt2_utils._handle_func_status(
comp_status, "cannot add output port to source component object"
)
assert self_port_ptr is not None
)
def _add_output_port(self, name, user_data=None):
- utils._check_str(name)
+ bt2_utils._check_str(name)
if name in self._output_ports:
raise ValueError(
fn = native_bt.self_component_filter_add_output_port
comp_status, self_port_ptr = fn(self._bt_ptr, name, user_data)
- utils._handle_func_status(
+ bt2_utils._handle_func_status(
comp_status, "cannot add output port to filter component object"
)
assert self_port_ptr
)
def _add_input_port(self, name, user_data=None):
- utils._check_str(name)
+ bt2_utils._check_str(name)
if name in self._input_ports:
raise ValueError(
fn = native_bt.self_component_filter_add_input_port
comp_status, self_port_ptr = fn(self._bt_ptr, name, user_data)
- utils._handle_func_status(
+ bt2_utils._handle_func_status(
comp_status, "cannot add input port to filter component object"
)
assert self_port_ptr
)
def _add_input_port(self, name, user_data=None):
- utils._check_str(name)
+ bt2_utils._check_str(name)
if name in self._input_ports:
raise ValueError(
fn = native_bt.self_component_sink_add_input_port
comp_status, self_port_ptr = fn(self._bt_ptr, name, user_data)
- utils._handle_func_status(
+ bt2_utils._handle_func_status(
comp_status, "cannot add input port to sink component object"
)
assert self_port_ptr
)
def _create_message_iterator(self, input_port):
- utils._check_type(input_port, bt2_port._UserComponentInputPort)
+ bt2_utils._check_type(input_port, bt2_port._UserComponentInputPort)
if not input_port.is_connected:
raise ValueError("input port is not connected")
) = native_bt.bt2_message_iterator_create_from_sink_component(
self._bt_ptr, input_port._ptr
)
- utils._handle_func_status(status, "cannot create message iterator object")
+ bt2_utils._handle_func_status(status, "cannot create message iterator object")
assert msg_iter_ptr is not None
return bt2_message_iterator._UserComponentInputPortMessageIterator(msg_iter_ptr)