# THE SOFTWARE.
from bt2 import native_bt, object, utils
-import bt2.message_iterator
+from bt2 import message_iterator as bt2_message_iterator
import collections.abc
-import bt2.value
-import bt2.trace_class
+from bt2 import value as bt2_value
+from bt2 import trace_class as bt2_trace_class
+from bt2 import clock_class as bt2_clock_class
+from bt2 import query_executor as bt2_query_executor
import traceback
-import bt2.port
+from bt2 import port as bt2_port
import sys
import bt2
import os
native_bt.component_source_borrow_output_port_by_name_const,
native_bt.component_source_borrow_output_port_by_index_const,
native_bt.component_source_get_output_port_count,
- bt2.port._OutputPort,
+ bt2_port._OutputPort,
)
native_bt.component_filter_borrow_output_port_by_name_const,
native_bt.component_filter_borrow_output_port_by_index_const,
native_bt.component_filter_get_output_port_count,
- bt2.port._OutputPort,
+ bt2_port._OutputPort,
)
@property
native_bt.component_filter_borrow_input_port_by_name_const,
native_bt.component_filter_borrow_input_port_by_index_const,
native_bt.component_filter_get_input_port_count,
- bt2.port._InputPort,
+ bt2_port._InputPort,
)
native_bt.component_sink_borrow_input_port_by_name_const,
native_bt.component_sink_borrow_input_port_by_index_const,
native_bt.component_sink_get_input_port_count,
- bt2.port._InputPort,
+ bt2_port._InputPort,
)
# call user's __init__() method
if params_ptr is not None:
- params = bt2.value._create_from_ptr_and_get_ref(params_ptr)
+ params = bt2_value._create_from_ptr_and_get_ref(params_ptr)
else:
params = None
)
)
- if not issubclass(iter_cls, bt2.message_iterator._UserMessageIterator):
+ if not issubclass(iter_cls, bt2_message_iterator._UserMessageIterator):
raise bt2._IncompleteUserClass(
"cannot create component class '{}': message iterator class does not inherit bt2._UserMessageIterator".format(
cls.__name__
# this can raise, in which case the native call to
# bt_component_class_query() returns NULL
if params_ptr is not None:
- params = bt2.value._create_from_ptr_and_get_ref(params_ptr)
+ params = bt2_value._create_from_ptr_and_get_ref(params_ptr)
else:
params = None
- query_exec = bt2.QueryExecutor._create_from_ptr_and_get_ref(query_exec_ptr)
+ query_exec = bt2_query_executor.QueryExecutor._create_from_ptr_and_get_ref(
+ query_exec_ptr
+ )
# this can raise, but the native side checks the exception
results = cls._query(query_exec, obj, params, log_level)
results_ptr = results._ptr
# We return a new reference.
- bt2.value._Value._get_ref(results_ptr)
+ bt2_value._Value._get_ref(results_ptr)
return int(results_ptr)
def _bt_port_connected_from_native(
self, self_port_ptr, self_port_type, other_port_ptr
):
- port = bt2.port._create_self_from_ptr_and_get_ref(self_port_ptr, self_port_type)
+ port = bt2_port._create_self_from_ptr_and_get_ref(self_port_ptr, self_port_type)
if self_port_type == native_bt.PORT_TYPE_OUTPUT:
other_port_type = native_bt.PORT_TYPE_INPUT
else:
other_port_type = native_bt.PORT_TYPE_OUTPUT
- other_port = bt2.port._create_from_ptr_and_get_ref(
+ other_port = bt2_port._create_from_ptr_and_get_ref(
other_port_ptr, other_port_type
)
self._port_connected(port, other_port)
if tc_ptr is None:
raise bt2._MemoryError('could not create trace class')
- tc = bt2.trace_class._TraceClass._create_from_ptr(tc_ptr)
+ tc = bt2_trace_class._TraceClass._create_from_ptr(tc_ptr)
tc._assigns_automatic_stream_class_id = assigns_automatic_stream_class_id
return tc
if cc_ptr is None:
raise bt2._MemoryError('could not create clock class')
- cc = bt2.clock_class._ClockClass._create_from_ptr(cc_ptr)
+ cc = bt2_clock_class._ClockClass._create_from_ptr(cc_ptr)
if frequency is not None:
cc._frequency = frequency
native_bt.self_component_source_borrow_output_port_by_name,
native_bt.self_component_source_borrow_output_port_by_index,
get_output_port_count,
- bt2.port._UserComponentOutputPort,
+ bt2_port._UserComponentOutputPort,
)
def _add_output_port(self, name, user_data=None):
comp_status, 'cannot add output port to source component object'
)
assert self_port_ptr is not None
- return bt2.port._UserComponentOutputPort._create_from_ptr(self_port_ptr)
+ return bt2_port._UserComponentOutputPort._create_from_ptr(self_port_ptr)
class _UserFilterComponent(_UserComponent, _FilterComponent):
native_bt.self_component_filter_borrow_output_port_by_name,
native_bt.self_component_filter_borrow_output_port_by_index,
get_output_port_count,
- bt2.port._UserComponentOutputPort,
+ bt2_port._UserComponentOutputPort,
)
@property
native_bt.self_component_filter_borrow_input_port_by_name,
native_bt.self_component_filter_borrow_input_port_by_index,
get_input_port_count,
- bt2.port._UserComponentInputPort,
+ bt2_port._UserComponentInputPort,
)
def _add_output_port(self, name, user_data=None):
comp_status, 'cannot add output port to filter component object'
)
assert self_port_ptr
- return bt2.port._UserComponentOutputPort._create_from_ptr(self_port_ptr)
+ return bt2_port._UserComponentOutputPort._create_from_ptr(self_port_ptr)
def _add_input_port(self, name, user_data=None):
utils._check_str(name)
comp_status, 'cannot add input port to filter component object'
)
assert self_port_ptr
- return bt2.port._UserComponentInputPort._create_from_ptr(self_port_ptr)
+ return bt2_port._UserComponentInputPort._create_from_ptr(self_port_ptr)
class _UserSinkComponent(_UserComponent, _SinkComponent):
native_bt.self_component_sink_borrow_input_port_by_name,
native_bt.self_component_sink_borrow_input_port_by_index,
get_input_port_count,
- bt2.port._UserComponentInputPort,
+ bt2_port._UserComponentInputPort,
)
def _add_input_port(self, name, user_data=None):
comp_status, 'cannot add input port to sink component object'
)
assert self_port_ptr
- return bt2.port._UserComponentInputPort._create_from_ptr(self_port_ptr)
+ return bt2_port._UserComponentInputPort._create_from_ptr(self_port_ptr)
def _create_input_port_message_iterator(self, input_port):
- utils._check_type(input_port, bt2.port._UserComponentInputPort)
+ utils._check_type(input_port, bt2_port._UserComponentInputPort)
msg_iter_ptr = native_bt.self_component_port_input_message_iterator_create_from_sink_component(
self._bt_ptr, input_port._ptr
if msg_iter_ptr is None:
raise bt2.CreationError('cannot create message iterator object')
- return bt2.message_iterator._UserComponentInputPortMessageIterator(msg_iter_ptr)
+ return bt2_message_iterator._UserComponentInputPortMessageIterator(msg_iter_ptr)
@property
def _is_interrupted(self):