import collections.abc
import bt2.component
import bt2.connection
+import bt2.notification_iterator
+import bt2.notification
import copy
import bt2
def _create_private_from_ptr(ptr):
- pub_ptr = native_bt.port_from_private_port(ptr)
+ pub_ptr = native_bt.port_from_private(ptr)
utils._handle_ptr(pub_ptr, 'cannot get port object from private port object')
port_type = native_bt.port_get_type(pub_ptr)
assert(port_type == native_bt.PORT_TYPE_INPUT or port_type == native_bt.PORT_TYPE_OUTPUT)
class _OutputPort(_Port):
- pass
+ def create_notification_iterator(self, notification_types=None,
+ colander_component_name=None):
+ notif_types = bt2.notification._notif_types_from_notif_classes(notification_types)
+
+ if colander_component_name is not None:
+ utils._check_str(colander_component_name)
+
+ notif_iter_ptr = native_bt.py3_create_output_port_notif_iter(int(self._ptr),
+ colander_component_name,
+ notif_types)
+
+ if notif_iter_ptr is None:
+ raise bt2.CreationError('cannot create output port notification iterator')
+
+ return bt2.notification_iterator._OutputPortNotificationIterator._create_from_ptr(notif_iter_ptr)
class _PrivatePort(object._PrivateObject, _Port):
if comp_ptr is None:
return
- pub_comp_ptr = native_bt.component_from_private_component(comp_ptr)
+ pub_comp_ptr = native_bt.component_from_private(comp_ptr)
assert(pub_comp_ptr)
comp = bt2.component._create_generic_component_from_ptr(pub_comp_ptr)
native_bt.put(comp_ptr)