native_bt.component_source_borrow_output_port_by_name_const,
native_bt.component_source_borrow_output_port_by_index_const,
native_bt.component_source_get_output_port_count,
- bt2_port._OutputPort,
+ bt2_port._OutputPortConst,
)
native_bt.component_filter_borrow_output_port_by_name_const,
native_bt.component_filter_borrow_output_port_by_index_const,
native_bt.component_filter_get_output_port_count,
- bt2_port._OutputPort,
+ bt2_port._OutputPortConst,
)
@property
native_bt.component_filter_borrow_input_port_by_name_const,
native_bt.component_filter_borrow_input_port_by_index_const,
native_bt.component_filter_get_input_port_count,
- bt2_port._InputPort,
+ bt2_port._InputPortConst,
)
native_bt.component_sink_borrow_input_port_by_name_const,
native_bt.component_sink_borrow_input_port_by_index_const,
native_bt.component_sink_get_input_port_count,
- bt2_port._InputPort,
+ bt2_port._InputPortConst,
)
else:
other_port_type = native_bt.PORT_TYPE_OUTPUT
- other_port = bt2_port._create_from_ptr_and_get_ref(
+ other_port = bt2_port._create_from_const_ptr_and_get_ref(
other_port_ptr, other_port_type
)
self._user_port_connected(port, other_port)
@property
def downstream_port(self):
port_ptr = native_bt.connection_borrow_downstream_port_const(self._ptr)
- return bt2_port._create_from_ptr_and_get_ref(
+ return bt2_port._create_from_const_ptr_and_get_ref(
port_ptr, native_bt.PORT_TYPE_INPUT
)
@property
def upstream_port(self):
port_ptr = native_bt.connection_borrow_upstream_port_const(self._ptr)
- return bt2_port._create_from_ptr_and_get_ref(
+ return bt2_port._create_from_const_ptr_and_get_ref(
port_ptr, native_bt.PORT_TYPE_OUTPUT
)
component = bt2_component._create_component_from_ptr_and_get_ref(
component_ptr, component_type
)
- port = bt2_port._create_from_ptr_and_get_ref(port_ptr, port_type)
+ port = bt2_port._create_from_const_ptr_and_get_ref(port_ptr, port_type)
user_listener(component, port)
upstream_component = bt2_component._create_component_from_ptr_and_get_ref(
upstream_component_ptr, upstream_component_type
)
- upstream_port = bt2_port._create_from_ptr_and_get_ref(
+ upstream_port = bt2_port._create_from_const_ptr_and_get_ref(
upstream_port_ptr, native_bt.PORT_TYPE_OUTPUT
)
downstream_component = bt2_component._create_component_from_ptr_and_get_ref(
downstream_component_ptr, downstream_component_type
)
- downstream_port = bt2_port._create_from_ptr_and_get_ref(
+ downstream_port = bt2_port._create_from_const_ptr_and_get_ref(
downstream_port_ptr, native_bt.PORT_TYPE_INPUT
)
user_listener(
return bt2_component._create_component_from_ptr(comp_ptr, cc_type)
def connect_ports(self, upstream_port, downstream_port):
- utils._check_type(upstream_port, bt2_port._OutputPort)
- utils._check_type(downstream_port, bt2_port._InputPort)
+ utils._check_type(upstream_port, bt2_port._OutputPortConst)
+ utils._check_type(downstream_port, bt2_port._InputPortConst)
status, conn_ptr = native_bt.graph_connect_ports(
self._ptr, upstream_port._ptr, downstream_port._ptr
)
from bt2 import connection as bt2_connection
-def _create_from_ptr_and_get_ref(ptr, port_type):
+def _create_from_const_ptr_and_get_ref(ptr, port_type):
cls = _PORT_TYPE_TO_PYCLS.get(port_type, None)
if cls is None:
return cls._create_from_ptr_and_get_ref(ptr)
-class _Port(object._SharedObject):
+class _PortConst(object._SharedObject):
@classmethod
def _get_ref(cls, ptr):
ptr = cls._as_port_ptr(ptr)
return self.connection is not None
-class _InputPort(_Port):
+class _InputPortConst(_PortConst):
_as_port_ptr = staticmethod(native_bt.port_input_as_port_const)
-class _OutputPort(_Port):
+class _OutputPortConst(_PortConst):
_as_port_ptr = staticmethod(native_bt.port_output_as_port_const)
-class _UserComponentPort(_Port):
+class _UserComponentPort(_PortConst):
@classmethod
def _as_port_ptr(cls, ptr):
ptr = cls._as_self_port_ptr(ptr)
return native_bt.self_component_port_get_data(ptr)
-class _UserComponentInputPort(_UserComponentPort, _InputPort):
+class _UserComponentInputPort(_UserComponentPort, _InputPortConst):
_as_self_port_ptr = staticmethod(
native_bt.self_component_port_input_as_self_component_port
)
-class _UserComponentOutputPort(_UserComponentPort, _OutputPort):
+class _UserComponentOutputPort(_UserComponentPort, _OutputPortConst):
_as_self_port_ptr = staticmethod(
native_bt.self_component_port_output_as_self_component_port
)
_PORT_TYPE_TO_PYCLS = {
- native_bt.PORT_TYPE_INPUT: _InputPort,
- native_bt.PORT_TYPE_OUTPUT: _OutputPort,
+ native_bt.PORT_TYPE_INPUT: _InputPortConst,
+ native_bt.PORT_TYPE_OUTPUT: _OutputPortConst,
}
if not self._connect_ports:
return
- if type(port) is bt2_port._InputPort:
+ if type(port) is bt2_port._InputPortConst:
return
if component not in [comp.comp for comp in self._src_comps_and_specs]:
import unittest
import bt2
from bt2 import connection as bt2_connection
+from bt2 import port as bt2_port
class ConnectionTestCase(unittest.TestCase):
conn = graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
self.assertEqual(conn.downstream_port.addr, sink.input_ports['in'].addr)
self.assertIs(type(conn), bt2_connection._ConnectionConst)
+ self.assertIs(type(conn.downstream_port), bt2_port._InputPortConst)
def test_upstream_port(self):
class MyIter(bt2._UserMessageIterator):
sink = graph.add_component(MySink, 'sink')
conn = graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
self.assertEqual(conn.upstream_port.addr, src.output_ports['out'].addr)
+ self.assertIs(type(conn.upstream_port), bt2_port._OutputPortConst)
import unittest
import bt2
+from bt2 import port as bt2_port
class PortTestCase(unittest.TestCase):
comp = self._create_comp(MySource)
self.assertEqual(len(comp.output_ports), 1)
+ self.assertIs(type(comp.output_ports['out']), bt2_port._OutputPortConst)
def test_flt_add_output_port(self):
class MyIter(bt2._UserMessageIterator):
comp = self._create_comp(MyFilter)
self.assertEqual(len(comp.input_ports), 1)
+ self.assertIs(type(comp.input_ports['in']), bt2_port._InputPortConst)
def test_sink_add_input_port(self):
class MySink(bt2._UserSinkComponent):