component = bt2_component._create_component_from_ptr_and_get_ref(
component_ptr, component_type
)
- port = bt2_port._create_from_ptr_and_get_ref(port_ptr, port_type)
+ port = bt2_port._create_from_const_ptr_and_get_ref(port_ptr, port_type)
user_listener(component, port)
upstream_component = bt2_component._create_component_from_ptr_and_get_ref(
upstream_component_ptr, upstream_component_type
)
- upstream_port = bt2_port._create_from_ptr_and_get_ref(
+ upstream_port = bt2_port._create_from_const_ptr_and_get_ref(
upstream_port_ptr, native_bt.PORT_TYPE_OUTPUT
)
downstream_component = bt2_component._create_component_from_ptr_and_get_ref(
downstream_component_ptr, downstream_component_type
)
- downstream_port = bt2_port._create_from_ptr_and_get_ref(
+ downstream_port = bt2_port._create_from_const_ptr_and_get_ref(
downstream_port_ptr, native_bt.PORT_TYPE_INPUT
)
user_listener(
return bt2_component._create_component_from_ptr(comp_ptr, cc_type)
def connect_ports(self, upstream_port, downstream_port):
- utils._check_type(upstream_port, bt2_port._OutputPort)
- utils._check_type(downstream_port, bt2_port._InputPort)
+ utils._check_type(upstream_port, bt2_port._OutputPortConst)
+ utils._check_type(downstream_port, bt2_port._InputPortConst)
status, conn_ptr = native_bt.graph_connect_ports(
self._ptr, upstream_port._ptr, downstream_port._ptr
)