return obj
-class _Connection(object._Object):
+class _Connection(object._SharedObject):
@staticmethod
def _downstream_port(ptr):
port_ptr = native_bt.connection_get_downstream_port(ptr)
return False
return self.addr == other.addr
-
-
-class _PrivateConnection(object._PrivateObject, _Connection):
- def create_message_iterator(self, message_types=None):
- msg_types = bt2.message._msg_types_from_msg_classes(message_types)
- status, msg_iter_ptr = native_bt.py3_create_priv_conn_msg_iter(int(self._ptr),
- msg_types)
- _handle_status(status, 'cannot create message iterator object')
- assert(msg_iter_ptr)
- return bt2.message_iterator._PrivateConnectionMessageIterator._create_from_ptr(msg_iter_ptr)
-
- @property
- def is_ended(self):
- return self._is_ended(self._pub_ptr)
-
- @property
- def downstream_port(self):
- return self._downstream_port(self._pub_ptr)
-
- @property
- def upstream_port(self):
- return self._upstream_port(self._pub_ptr)