# THE SOFTWARE.
from bt2 import native_bt, object
-import bt2.component
-import bt2.connection
-import bt2.message_iterator
-import bt2.message
+from bt2 import component as bt2_component
+from bt2 import connection as bt2_connection
+from bt2 import message_iterator as bt2_message_iterator
+from bt2 import message as bt2_message
import bt2
if conn_ptr is None:
return
- return bt2.connection._Connection._create_from_ptr_and_get_ref(conn_ptr)
+ return bt2_connection._Connection._create_from_ptr_and_get_ref(conn_ptr)
@property
def is_connected(self):
if conn_ptr is None:
return
- return bt2.connection._Connection._create_from_ptr_and_get_ref(conn_ptr)
+ return bt2_connection._Connection._create_from_ptr_and_get_ref(conn_ptr)
@property
def user_data(self):
class _UserComponentInputPort(_UserComponentPort, _InputPort):
- _as_self_port_ptr = staticmethod(native_bt.self_component_port_input_as_self_component_port)
-
- def create_message_iterator(self):
- msg_iter_ptr = native_bt.self_component_port_input_message_iterator_create(self._ptr)
- if msg_iter_ptr is None:
- raise bt2.CreationError('cannot create message iterator object')
-
- return bt2.message_iterator._UserComponentInputPortMessageIterator(msg_iter_ptr)
+ _as_self_port_ptr = staticmethod(
+ native_bt.self_component_port_input_as_self_component_port
+ )
class _UserComponentOutputPort(_UserComponentPort, _OutputPort):
- _as_self_port_ptr = staticmethod(native_bt.self_component_port_output_as_self_component_port)
+ _as_self_port_ptr = staticmethod(
+ native_bt.self_component_port_output_as_self_component_port
+ )
_PORT_TYPE_TO_PYCLS = {