def _accept_port_connection_from_native(self, self_port_ptr, self_port_type, other_port_ptr):
port = bt2.port._create_self_from_ptr_and_get_ref(
self_port_ptr, self_port_type)
- other_port_type = native_bt.PORT_TYPE_INPUT if self_port_type == native_bt.PORT_TYPE_OUTPUT else native_bt.PORT_TYPE_OUTPUT
+
+ if self_port_type == native_bt.PORT_TYPE_OUTPUT:
+ other_port_type = native_bt.PORT_TYPE_INPUT
+ else:
+ other_port_type = native_bt.PORT_TYPE_OUTPUT
+
other_port = bt2.port._create_from_ptr_and_get_ref(
other_port_ptr, other_port_type)
res = self._accept_port_connection(port, other_port_ptr)
def _port_connected_from_native(self, self_port_ptr, self_port_type, other_port_ptr):
port = bt2.port._create_self_from_ptr_and_get_ref(
self_port_ptr, self_port_type)
- other_port_type = native_bt.PORT_TYPE_INPUT if self_port_type == native_bt.PORT_TYPE_OUTPUT else native_bt.PORT_TYPE_OUTPUT
+
+ if self_port_type == native_bt.PORT_TYPE_OUTPUT:
+ other_port_type = native_bt.PORT_TYPE_INPUT
+ else:
+ other_port_type = native_bt.PORT_TYPE_OUTPUT
+
other_port = bt2.port._create_from_ptr_and_get_ref(
other_port_ptr, other_port_type)
self._port_connected(port, other_port)
+ def _create_trace_class(self, env=None, uuid=None,
+ assigns_automatic_stream_class_id=True):
+ ptr = self._as_self_component_ptr(self._ptr)
+ tc_ptr = native_bt.trace_class_create(ptr)
+
+ if tc_ptr is None:
+ raise bt2.CreationError('could not create trace class')
+
+ tc = bt2.TraceClass._create_from_ptr(tc_ptr)
+
+ if env is not None:
+ for key, value in env.items():
+ tc.env[key] = value
+
+ if uuid is not None:
+ tc._uuid = uuid
+
+ tc._assigns_automatic_stream_class_id = assigns_automatic_stream_class_id
+
+ return tc
+
+ def _create_clock_class(self, frequency=None):
+ ptr = self._as_self_component_ptr(self._ptr)
+ cc_ptr = native_bt.clock_class_create(ptr)
+
+ if cc_ptr is None:
+ raise bt2.CreationError('could not create clock class')
+
+ cc = bt2.ClockClass._create_from_ptr(cc_ptr)
+
+ if frequency is not None:
+ cc._frequency = frequency
+
+ return cc
+
class _UserSourceComponent(_UserComponent, _SourceComponent):
_as_not_self_specific_component_ptr = staticmethod(native_bt.self_component_source_as_component_source)
+ _as_self_component_ptr = staticmethod(native_bt.self_component_source_as_self_component)
@property
def _output_ports(self):
class _UserFilterComponent(_UserComponent, _FilterComponent):
_as_not_self_specific_component_ptr = staticmethod(native_bt.self_component_filter_as_component_filter)
+ _as_self_component_ptr = staticmethod(native_bt.self_component_filter_as_self_component)
@property
def _output_ports(self):
class _UserSinkComponent(_UserComponent, _SinkComponent):
_as_not_self_specific_component_ptr = staticmethod(native_bt.self_component_sink_as_component_sink)
+ _as_self_component_ptr = staticmethod(native_bt.self_component_sink_as_self_component)
@property
def _input_ports(self):