1 # SPDX-License-Identifier: MIT
3 # Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com>
5 from bt2
import native_bt
, object
9 from bt2
import connection
as bt2_connection
14 def _create_from_const_ptr_and_get_ref(ptr
, port_type
):
15 cls
= _PORT_TYPE_TO_PYCLS
.get(port_type
, None)
18 raise TypeError('unknown port type: {}'.format(port_type
))
20 return cls
._create
_from
_ptr
_and
_get
_ref
(ptr
)
23 def _create_self_from_ptr_and_get_ref(ptr
, port_type
):
24 cls
= _PORT_TYPE_TO_USER_PYCLS
.get(port_type
, None)
27 raise TypeError('unknown port type: {}'.format(port_type
))
29 return cls
._create
_from
_ptr
_and
_get
_ref
(ptr
)
32 class _PortConst(object._SharedObject
):
34 def _get_ref(cls
, ptr
):
35 ptr
= cls
._as
_port
_ptr
(ptr
)
36 return native_bt
.port_get_ref(ptr
)
39 def _put_ref(cls
, ptr
):
40 ptr
= cls
._as
_port
_ptr
(ptr
)
41 return native_bt
.port_put_ref(ptr
)
45 ptr
= self
._as
_port
_ptr
(self
._ptr
)
46 name
= native_bt
.port_get_name(ptr
)
47 assert name
is not None
52 ptr
= self
._as
_port
_ptr
(self
._ptr
)
53 conn_ptr
= native_bt
.port_borrow_connection_const(ptr
)
58 return _bt2_connection()._ConnectionConst
._create
_from
_ptr
_and
_get
_ref
(conn_ptr
)
61 def is_connected(self
):
62 return self
.connection
is not None
65 class _InputPortConst(_PortConst
):
66 _as_port_ptr
= staticmethod(native_bt
.port_input_as_port_const
)
69 class _OutputPortConst(_PortConst
):
70 _as_port_ptr
= staticmethod(native_bt
.port_output_as_port_const
)
73 class _UserComponentPort(_PortConst
):
75 def _as_port_ptr(cls
, ptr
):
76 ptr
= cls
._as
_self
_port
_ptr
(ptr
)
77 return native_bt
.self_component_port_as_port(ptr
)
81 ptr
= self
._as
_port
_ptr
(self
._ptr
)
82 conn_ptr
= native_bt
.port_borrow_connection_const(ptr
)
87 return _bt2_connection()._ConnectionConst
._create
_from
_ptr
_and
_get
_ref
(conn_ptr
)
91 ptr
= self
._as
_self
_port
_ptr
(self
._ptr
)
92 return native_bt
.self_component_port_get_data(ptr
)
95 class _UserComponentInputPort(_UserComponentPort
, _InputPortConst
):
96 _as_self_port_ptr
= staticmethod(
97 native_bt
.self_component_port_input_as_self_component_port
101 class _UserComponentOutputPort(_UserComponentPort
, _OutputPortConst
):
102 _as_self_port_ptr
= staticmethod(
103 native_bt
.self_component_port_output_as_self_component_port
107 _PORT_TYPE_TO_PYCLS
= {
108 native_bt
.PORT_TYPE_INPUT
: _InputPortConst
,
109 native_bt
.PORT_TYPE_OUTPUT
: _OutputPortConst
,
113 _PORT_TYPE_TO_USER_PYCLS
= {
114 native_bt
.PORT_TYPE_INPUT
: _UserComponentInputPort
,
115 native_bt
.PORT_TYPE_OUTPUT
: _UserComponentOutputPort
,