bt2: Rename files to use singular form
[babeltrace.git] / bindings / python / bt2 / bt2 / connection.py
1 # The MIT License (MIT)
2 #
3 # Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com>
4 #
5 # Permission is hereby granted, free of charge, to any person obtaining a copy
6 # of this software and associated documentation files (the "Software"), to deal
7 # in the Software without restriction, including without limitation the rights
8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 # copies of the Software, and to permit persons to whom the Software is
10 # furnished to do so, subject to the following conditions:
11 #
12 # The above copyright notice and this permission notice shall be included in
13 # all copies or substantial portions of the Software.
14 #
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 # THE SOFTWARE.
22
23 from bt2 import native_bt, object, utils
24 import bt2.message_iterator
25 import collections.abc
26 import bt2.port
27 import copy
28 import bt2
29
30
def _handle_status(status, gen_error_msg):
    """Raise the bt2 exception that corresponds to a native status code.

    The two dedicated connection status codes are checked first, in this
    order: "graph is canceled" raises ``bt2.GraphCanceled`` and "is
    ended" raises ``bt2.ConnectionEnded``. Any other negative status
    raises ``bt2.Error`` built from `gen_error_msg`. Every remaining
    status falls through and the function returns ``None``.
    """
    if status == native_bt.CONNECTION_STATUS_GRAPH_IS_CANCELED:
        raise bt2.GraphCanceled

    if status == native_bt.CONNECTION_STATUS_IS_ENDED:
        raise bt2.ConnectionEnded

    # Generic failure: only reached once the dedicated codes are ruled out.
    if status < 0:
        raise bt2.Error(gen_error_msg)
38
39
def _create_private_from_ptr(ptr):
    """Wrap the private connection pointer `ptr` in a `_PrivateConnection`.

    The public connection pointer obtained from
    ``native_bt.connection_from_private(ptr)`` is stored on the new
    object as ``_pub_ptr``; the `_PrivateConnection` properties pass it
    to the public connection accessors.
    """
    priv_conn = _PrivateConnection._create_from_ptr(ptr)
    pub_ptr = native_bt.connection_from_private(ptr)
    priv_conn._pub_ptr = pub_ptr

    # Internal sanity check only: the public pointer is expected to be set.
    assert priv_conn._pub_ptr
    return priv_conn
45
46
class _Connection(object._Object):
    """Python wrapper around a native connection object.

    NOTE: ``object`` in the base class expression is the ``bt2.object``
    module imported at the top of this file, not the ``object`` builtin.

    The pointer-taking static helpers hold the actual native calls so
    that `_PrivateConnection` can reuse them with its public pointer
    (``_pub_ptr``) instead of ``_ptr``.
    """

    @staticmethod
    def _downstream_port(ptr):
        """Return the wrapped downstream port of the connection `ptr`.

        The native port pointer is validated by ``utils._handle_ptr()``
        (presumably raising when the pointer is missing -- confirm in
        ``bt2.utils``) before being wrapped by
        ``bt2.port._create_from_ptr()``.
        """
        port_ptr = native_bt.connection_get_downstream_port(ptr)
        utils._handle_ptr(port_ptr, "cannot get connection object's downstream port object")
        return bt2.port._create_from_ptr(port_ptr)

    @staticmethod
    def _upstream_port(ptr):
        """Return the wrapped upstream port of the connection `ptr`.

        Same validation and wrapping steps as `_downstream_port()`.
        """
        port_ptr = native_bt.connection_get_upstream_port(ptr)
        utils._handle_ptr(port_ptr, "cannot get connection object's upstream port object")
        return bt2.port._create_from_ptr(port_ptr)

    @property
    def downstream_port(self):
        """Downstream port of this connection."""
        return self._downstream_port(self._ptr)

    @property
    def upstream_port(self):
        """Upstream port of this connection."""
        return self._upstream_port(self._ptr)

    @staticmethod
    def _is_ended(ptr):
        """Return ``True`` if the native call reports 1 for `ptr`."""
        return native_bt.connection_is_ended(ptr) == 1

    @property
    def is_ended(self):
        """``True`` if this connection is ended."""
        return self._is_ended(self._ptr)

    def __eq__(self, other):
        """Compare two connection wrappers by their ``addr``.

        Any object which is not a `_Connection` (or a subclass, such as
        `_PrivateConnection`) compares unequal.
        """
        # isinstance() also accepts subclasses, which keeps the comparison
        # symmetric between a connection and a more derived wrapper.
        if not isinstance(other, _Connection):
            return False

        return self.addr == other.addr

    def __hash__(self):
        """Hash on ``addr`` so that equal wrappers hash equally.

        Defining ``__eq__()`` alone makes Python set ``__hash__`` to
        ``None``, which would make connection objects unusable in sets
        and as dict keys.
        """
        # assumes ``addr`` (inherited, not defined in this file) is a
        # hashable value such as an int -- TODO confirm in bt2.object
        return hash(self.addr)
81
82
class _PrivateConnection(object._PrivateObject, _Connection):
    """Private variant of a connection wrapper.

    ``_ptr`` is used for private-only operations, while the inherited
    public accessors are overridden to use ``_pub_ptr``, which is set by
    `_create_private_from_ptr()`.
    """

    def create_message_iterator(self, message_types=None):
        """Create a message iterator on this private connection.

        `message_types` is converted by
        ``bt2.message._msg_types_from_msg_classes()`` before being handed
        to the native function together with the integer value of
        ``_ptr``. The returned status goes through `_handle_status()`,
        which raises on the status codes it recognizes as failures.

        Returns the object created by
        ``_PrivateConnectionMessageIterator._create_from_ptr()``.
        """
        native_types = bt2.message._msg_types_from_msg_classes(message_types)
        result = native_bt.py3_create_priv_conn_msg_iter(int(self._ptr), native_types)
        status, iter_ptr = result
        _handle_status(status, 'cannot create message iterator object')

        # Internal sanity check only: a pointer is expected on success.
        assert iter_ptr
        iter_cls = bt2.message_iterator._PrivateConnectionMessageIterator
        return iter_cls._create_from_ptr(iter_ptr)

    @property
    def is_ended(self):
        """``True`` if the connection is ended (queried via ``_pub_ptr``)."""
        pub_ptr = self._pub_ptr
        return self._is_ended(pub_ptr)

    @property
    def downstream_port(self):
        """Downstream port of the connection (queried via ``_pub_ptr``)."""
        pub_ptr = self._pub_ptr
        return self._downstream_port(pub_ptr)

    @property
    def upstream_port(self):
        """Upstream port of the connection (queried via ``_pub_ptr``)."""
        pub_ptr = self._pub_ptr
        return self._upstream_port(pub_ptr)
This page took 0.037354 seconds and 4 git commands to generate.