Fix: add missing void param to bt_clock_class_priority_map_create
[babeltrace.git] / bindings / python / bt2 / connection.py
1 # The MIT License (MIT)
2 #
3 # Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com>
4 #
5 # Permission is hereby granted, free of charge, to any person obtaining a copy
6 # of this software and associated documentation files (the "Software"), to deal
7 # in the Software without restriction, including without limitation the rights
8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 # copies of the Software, and to permit persons to whom the Software is
10 # furnished to do so, subject to the following conditions:
11 #
12 # The above copyright notice and this permission notice shall be included in
13 # all copies or substantial portions of the Software.
14 #
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 # THE SOFTWARE.
22
23 from bt2 import native_bt, object, utils
24 import bt2.notification_iterator
25 import collections.abc
26 import bt2.port
27 import copy
28 import bt2
29
30
def _handle_status(status, gen_error_msg):
    """Raise the bt2 exception that corresponds to a connection status.

    Returns without raising when ``status`` equals neither of the two
    dedicated status constants checked below and is not negative.

    :param status: status value to check
    :param gen_error_msg: message used for the generic ``bt2.Error``
        raised on any other negative status
    :raises bt2.GraphCanceled: if ``status`` is
        ``CONNECTION_STATUS_GRAPH_IS_CANCELED``
    :raises bt2.ConnectionEnded: if ``status`` is
        ``CONNECTION_STATUS_IS_ENDED``
    :raises bt2.Error: if ``status`` is any other negative value
    """
    # The dedicated statuses are tested first, in this order, so they win
    # over the generic negative-status fallback.
    if status == native_bt.CONNECTION_STATUS_GRAPH_IS_CANCELED:
        raise bt2.GraphCanceled

    if status == native_bt.CONNECTION_STATUS_IS_ENDED:
        raise bt2.ConnectionEnded

    if status < 0:
        raise bt2.Error(gen_error_msg)
38
39
def _create_private_from_ptr(ptr):
    """Build a ``_PrivateConnection`` wrapper from a native pointer.

    The pointer returned by
    ``native_bt.connection_from_private_connection(ptr)`` is stored on the
    wrapper as ``_pub_ptr``.

    :param ptr: native private connection pointer
    :returns: the new ``_PrivateConnection`` object
    """
    private_conn = _PrivateConnection._create_from_ptr(ptr)
    public_ptr = native_bt.connection_from_private_connection(ptr)
    private_conn._pub_ptr = public_ptr

    # Internal invariant: the lookup above is expected to yield a truthy
    # pointer.
    assert private_conn._pub_ptr
    return private_conn
45
46
class _Connection(object._Object):
    """Python wrapper of a native connection object.

    The static helpers take the native pointer explicitly; the
    properties of this class call them with ``self._ptr``.
    """

    @staticmethod
    def _downstream_port(ptr):
        """Return the wrapped downstream port of the connection ``ptr``."""
        native_port = native_bt.connection_get_downstream_port(ptr)
        utils._handle_ptr(native_port,
                          "cannot get connection object's downstream port object")
        return bt2.port._create_from_ptr(native_port)

    @staticmethod
    def _upstream_port(ptr):
        """Return the wrapped upstream port of the connection ``ptr``."""
        native_port = native_bt.connection_get_upstream_port(ptr)
        utils._handle_ptr(native_port,
                          "cannot get connection object's upstream port object")
        return bt2.port._create_from_ptr(native_port)

    @property
    def downstream_port(self):
        """Downstream port object of this connection."""
        return self._downstream_port(self._ptr)

    @property
    def upstream_port(self):
        """Upstream port object of this connection."""
        return self._upstream_port(self._ptr)

    @staticmethod
    def _is_ended(ptr):
        """Return whether the native call reports ``ptr`` as ended."""
        ended_flag = native_bt.connection_is_ended(ptr)
        return ended_flag == 1

    @property
    def is_ended(self):
        """``True`` if this connection is ended."""
        return self._is_ended(self._ptr)

    def __eq__(self, other):
        # Only objects whose exact type is _Connection or
        # _PrivateConnection are comparable; those are compared through
        # their ``addr`` attribute.
        comparable = type(other) in (_Connection, _PrivateConnection)
        return comparable and self.addr == other.addr
81
82
class _PrivateConnection(object._PrivateObject, _Connection):
    """Private variant of a connection wrapper.

    The inherited ``is_ended``, ``downstream_port`` and ``upstream_port``
    properties are overridden here to operate on ``self._pub_ptr``
    instead of ``self._ptr``.
    """

    def create_notification_iterator(self, notification_types=None):
        """Create a notification iterator on this private connection.

        :param notification_types: optional iterable of notification
            classes; each item must be one of the values of
            ``bt2.notification._NOTIF_TYPE_TO_CLS``. When ``None``,
            ``None`` is forwarded to the native call (presumably meaning
            "no restriction" -- confirm against the native API).
        :returns: a ``_GenericNotificationIterator`` wrapping the
            created native iterator
        :raises ValueError: if an item of ``notification_types`` is not
            a known notification class
        :raises bt2.GraphCanceled: see ``_handle_status``
        :raises bt2.ConnectionEnded: see ``_handle_status``
        :raises bt2.Error: see ``_handle_status``
        """
        if notification_types is None:
            notif_types = None
        else:
            # Materialize the argument once: it is walked twice below
            # (validation, then conversion), and a single-pass iterable
            # such as a generator would be exhausted by the first walk,
            # silently producing an empty type list.
            notif_classes = tuple(notification_types)

            # Loop-invariant lookup, hoisted out of the validation loop.
            known_classes = bt2.notification._NOTIF_TYPE_TO_CLS.values()

            for notif_cls in notif_classes:
                if notif_cls not in known_classes:
                    raise ValueError("'{}' is not a notification class".format(notif_cls))

            notif_types = [notif_cls._TYPE for notif_cls in notif_classes]

        status, notif_iter_ptr = native_bt.py3_create_notif_iter(int(self._ptr),
                                                                 notif_types)
        _handle_status(status, 'cannot create notification iterator object')
        assert(notif_iter_ptr)
        return bt2.notification_iterator._GenericNotificationIterator._create_from_ptr(notif_iter_ptr)

    @property
    def is_ended(self):
        """``True`` if this connection is ended (checked on ``_pub_ptr``)."""
        return self._is_ended(self._pub_ptr)

    @property
    def downstream_port(self):
        """Downstream port object, looked up through ``_pub_ptr``."""
        return self._downstream_port(self._pub_ptr)

    @property
    def upstream_port(self):
        """Upstream port object, looked up through ``_pub_ptr``."""
        return self._upstream_port(self._pub_ptr)
This page took 0.030846 seconds and 4 git commands to generate.