X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;ds=sidebyside;f=bindings%2Fpython%2Fbt2%2Fconnection.py;fp=bindings%2Fpython%2Fbt2%2Fconnection.py;h=0000000000000000000000000000000000000000;hb=1b8fb86234d51aff255b8e97435d4dbb3316eaec;hp=09092af087c1505396704673d56d11261ae32434;hpb=375d5f6a40f8021639019995c8dc8e6f190b7b55;p=babeltrace.git diff --git a/bindings/python/bt2/connection.py b/bindings/python/bt2/connection.py deleted file mode 100644 index 09092af0..00000000 --- a/bindings/python/bt2/connection.py +++ /dev/null @@ -1,110 +0,0 @@ -# The MIT License (MIT) -# -# Copyright (c) 2017 Philippe Proulx -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. - -from bt2 import native_bt, object, utils -import bt2.notification_iterator -import collections.abc -import bt2.port -import copy -import bt2 - - -def _handle_status(status, gen_error_msg): - if status == native_bt.CONNECTION_STATUS_GRAPH_IS_CANCELED: - raise bt2.GraphCanceled - elif status == native_bt.CONNECTION_STATUS_IS_ENDED: - raise bt2.ConnectionEnded - elif status < 0: - raise bt2.Error(gen_error_msg) - - -def _create_private_from_ptr(ptr): - obj = _PrivateConnection._create_from_ptr(ptr) - obj._pub_ptr = native_bt.connection_from_private_connection(ptr) - assert(obj._pub_ptr) - return obj - - -class _Connection(object._Object): - @staticmethod - def _downstream_port(ptr): - port_ptr = native_bt.connection_get_downstream_port(ptr) - utils._handle_ptr(port_ptr, "cannot get connection object's downstream port object") - return bt2.port._create_from_ptr(port_ptr) - - @staticmethod - def _upstream_port(ptr): - port_ptr = native_bt.connection_get_upstream_port(ptr) - utils._handle_ptr(port_ptr, "cannot get connection object's upstream port object") - return bt2.port._create_from_ptr(port_ptr) - - @property - def downstream_port(self): - return self._downstream_port(self._ptr) - - @property - def upstream_port(self): - return self._upstream_port(self._ptr) - - @staticmethod - def _is_ended(ptr): - return native_bt.connection_is_ended(ptr) == 1 - - @property - def is_ended(self): - return self._is_ended(self._ptr) - - def __eq__(self, other): - if type(other) not in (_Connection, _PrivateConnection): - return False - - return self.addr == other.addr - - -class _PrivateConnection(object._PrivateObject, _Connection): - def create_notification_iterator(self, notification_types=None): - if notification_types is None: - notif_types = None - else: - for notif_cls in notification_types: - if notif_cls not in bt2.notification._NOTIF_TYPE_TO_CLS.values(): - raise ValueError("'{}' is not a notification class".format(notif_cls)) - - notif_types = [notif_cls._TYPE for notif_cls in notification_types] - - status, notif_iter_ptr = native_bt.py3_create_notif_iter(int(self._ptr), - notif_types) - _handle_status(status, 'cannot create notification iterator object') - assert(notif_iter_ptr) - return bt2.notification_iterator._GenericNotificationIterator._create_from_ptr(notif_iter_ptr) - - @property - def is_ended(self): - return self._is_ended(self._pub_ptr) - - @property - def downstream_port(self): - return self._downstream_port(self._pub_ptr) - - @property - def upstream_port(self): - return self._upstream_port(self._pub_ptr)