import bt2.message_iterator
import collections.abc
import bt2.value
+import bt2.trace_class
import traceback
import bt2.port
import sys
# pointer to a 'bt_component_class *'.
-class _GenericComponentClass(object._SharedObject):
+class _ComponentClass(object._SharedObject):
@property
def name(self):
ptr = self._bt_as_component_class_ptr(self._ptr)
return self._bt_as_component_class_ptr(self._ptr)
def __eq__(self, other):
- if not isinstance(other, _GenericComponentClass):
+ if not isinstance(other, _ComponentClass):
try:
if not issubclass(other, _UserComponent):
return False
return self.addr == other.addr
-class _GenericSourceComponentClass(_GenericComponentClass):
+class _SourceComponentClass(_ComponentClass):
_get_ref = staticmethod(native_bt.component_class_source_get_ref)
_put_ref = staticmethod(native_bt.component_class_source_put_ref)
_bt_as_component_class_ptr = staticmethod(
)
-class _GenericFilterComponentClass(_GenericComponentClass):
+class _FilterComponentClass(_ComponentClass):
_get_ref = staticmethod(native_bt.component_class_filter_get_ref)
_put_ref = staticmethod(native_bt.component_class_filter_put_ref)
_bt_as_component_class_ptr = staticmethod(
)
-class _GenericSinkComponentClass(_GenericComponentClass):
+class _SinkComponentClass(_ComponentClass):
_get_ref = staticmethod(native_bt.component_class_sink_get_ref)
_put_ref = staticmethod(native_bt.component_class_sink_put_ref)
_bt_as_component_class_ptr = staticmethod(
_bt_as_component_ptr = staticmethod(native_bt.component_sink_as_component_const)
-# This is analogous to _GenericSourceComponentClass, but for source
+# This is analogous to _SourceComponentClass, but for source
# component objects.
class _GenericSourceComponent(object._SharedObject, _SourceComponent):
_get_ref = staticmethod(native_bt.component_source_get_ref)
)
-# This is analogous to _GenericFilterComponentClass, but for filter
+# This is analogous to _FilterComponentClass, but for filter
# component objects.
class _GenericFilterComponent(object._SharedObject, _FilterComponent):
_get_ref = staticmethod(native_bt.component_filter_get_ref)
)
-# This is analogous to _GenericSinkComponentClass, but for sink
+# This is analogous to _SinkComponentClass, but for sink
# component objects.
class _GenericSinkComponent(object._SharedObject, _SinkComponent):
_get_ref = staticmethod(native_bt.component_sink_get_ref)
_COMP_CLS_TYPE_TO_GENERIC_COMP_CLS_PYCLS = {
- native_bt.COMPONENT_CLASS_TYPE_SOURCE: _GenericSourceComponentClass,
- native_bt.COMPONENT_CLASS_TYPE_FILTER: _GenericFilterComponentClass,
- native_bt.COMPONENT_CLASS_TYPE_SINK: _GenericSinkComponentClass,
+ native_bt.COMPONENT_CLASS_TYPE_SOURCE: _SourceComponentClass,
+ native_bt.COMPONENT_CLASS_TYPE_FILTER: _FilterComponentClass,
+ native_bt.COMPONENT_CLASS_TYPE_SINK: _SinkComponentClass,
}
# Create a component class Python object of type
-# _GenericSourceComponentClass, _GenericFilterComponentClass or
-# _GenericSinkComponentClass, depending on comp_cls_type.
+# _SourceComponentClass, _FilterComponentClass or
+# _SinkComponentClass, depending on comp_cls_type.
#
# Acquires a new reference to ptr.
# creates a native BT component class of the corresponding type and
# associates it with this user-defined class. The metaclass also defines
# class methods like the `name` and `description` properties to match
-# the _GenericComponentClass interface.
+# the _ComponentClass interface.
#
# The component class name which is used is either:
#
)
elif _UserSinkComponent in bases:
if not hasattr(cls, '_consume'):
- raise bt2.IncompleteUserClass(
+ raise bt2._IncompleteUserClass(
"cannot create component class '{}': missing a _consume() method".format(
class_name
)
cls, comp_cls_name, comp_cls_descr, comp_cls_help
)
else:
- raise bt2.IncompleteUserClass(
+ raise bt2._IncompleteUserClass(
"cannot find a known component class base in the bases of '{}'".format(
class_name
)
)
if cc_ptr is None:
- raise bt2.CreationError(
+ raise bt2._MemoryError(
"cannot create component class '{}'".format(class_name)
)
return self
def __call__(cls, *args, **kwargs):
- raise bt2.Error(
+ raise RuntimeError(
'cannot directly instantiate a user component from a Python module'
)
@staticmethod
def _bt_set_iterator_class(cls, iter_cls):
if iter_cls is None:
- raise bt2.IncompleteUserClass(
+ raise bt2._IncompleteUserClass(
"cannot create component class '{}': missing message iterator class".format(
cls.__name__
)
)
if not issubclass(iter_cls, bt2.message_iterator._UserMessageIterator):
- raise bt2.IncompleteUserClass(
+ raise bt2._IncompleteUserClass(
"cannot create component class '{}': message iterator class does not inherit bt2._UserMessageIterator".format(
cls.__name__
)
)
if not hasattr(iter_cls, '__next__'):
- raise bt2.IncompleteUserClass(
+ raise bt2._IncompleteUserClass(
"cannot create component class '{}': message iterator class is missing a __next__() method".format(
cls.__name__
)
tc_ptr = native_bt.trace_class_create(ptr)
if tc_ptr is None:
- raise bt2.CreationError('could not create trace class')
+ raise bt2._MemoryError('could not create trace class')
- tc = bt2._TraceClass._create_from_ptr(tc_ptr)
+ tc = bt2.trace_class._TraceClass._create_from_ptr(tc_ptr)
tc._assigns_automatic_stream_class_id = assigns_automatic_stream_class_id
return tc
cc_ptr = native_bt.clock_class_create(ptr)
if cc_ptr is None:
- raise bt2.CreationError('could not create clock class')
+ raise bt2._MemoryError('could not create clock class')
cc = bt2.clock_class._ClockClass._create_from_ptr(cc_ptr)
)
assert self_port_ptr
return bt2.port._UserComponentInputPort._create_from_ptr(self_port_ptr)
+
+ def _create_input_port_message_iterator(self, input_port):
+ utils._check_type(input_port, bt2.port._UserComponentInputPort)
+
+ msg_iter_ptr = native_bt.self_component_port_input_message_iterator_create_from_sink_component(
+ self._bt_ptr, input_port._ptr
+ )
+
+ if msg_iter_ptr is None:
+ raise bt2.CreationError('cannot create message iterator object')
+
+ return bt2.message_iterator._UserComponentInputPortMessageIterator(msg_iter_ptr)
+
+ @property
+ def _is_interrupted(self):
+ return bool(native_bt.self_component_sink_is_interrupted(self._bt_ptr))