# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
-from bt2 import native_bt, object, utils
+from bt2 import native_bt, utils
import bt2.message_iterator
-import collections.abc
import bt2.port
-import copy
import bt2
-def _handle_status(status, gen_error_msg):
- if status == native_bt.CONNECTION_STATUS_GRAPH_IS_CANCELED:
- raise bt2.GraphCanceled
- elif status == native_bt.CONNECTION_STATUS_IS_ENDED:
- raise bt2.ConnectionEnded
- elif status < 0:
- raise bt2.Error(gen_error_msg)
-
-
-def _create_private_from_ptr(ptr):
- obj = _PrivateConnection._create_from_ptr(ptr)
- obj._pub_ptr = native_bt.connection_from_private(ptr)
- assert(obj._pub_ptr)
- return obj
-
-
-class _Connection(object._SharedObject):
- @staticmethod
- def _downstream_port(ptr):
- port_ptr = native_bt.connection_get_downstream_port(ptr)
- utils._handle_ptr(port_ptr, "cannot get connection object's downstream port object")
- return bt2.port._create_from_ptr(port_ptr)
-
- @staticmethod
- def _upstream_port(ptr):
- port_ptr = native_bt.connection_get_upstream_port(ptr)
- utils._handle_ptr(port_ptr, "cannot get connection object's upstream port object")
- return bt2.port._create_from_ptr(port_ptr)
+class _Connection(bt2.object._SharedObject):
+ _get_ref = staticmethod(native_bt.connection_get_ref)
+ _put_ref = staticmethod(native_bt.connection_put_ref)
@property
def downstream_port(self):
- return self._downstream_port(self._ptr)
+ port_ptr = native_bt.connection_borrow_downstream_port_const(self._ptr)
+ utils._handle_ptr(port_ptr, "cannot get connection object's downstream port object")
+ return bt2.port._create_from_ptr_and_get_ref(port_ptr, native_bt.PORT_TYPE_INPUT)
@property
def upstream_port(self):
- return self._upstream_port(self._ptr)
-
- @staticmethod
- def _is_ended(ptr):
- return native_bt.connection_is_ended(ptr) == 1
-
- @property
- def is_ended(self):
- return self._is_ended(self._ptr)
-
- def __eq__(self, other):
- if type(other) not in (_Connection, _PrivateConnection):
- return False
-
- return self.addr == other.addr
+ port_ptr = native_bt.connection_borrow_upstream_port_const(self._ptr)
+ utils._handle_ptr(port_ptr, "cannot get connection object's upstream port object")
+ return bt2.port._create_from_ptr_and_get_ref(port_ptr, native_bt.PORT_TYPE_OUTPUT)
-from bt2 import value
import unittest
-import copy
import bt2
-@unittest.skip("this is broken")
class ConnectionTestCase(unittest.TestCase):
def test_create(self):
class MyIter(bt2._UserMessageIterator):
conn = graph.connect_ports(src.output_ports['out'],
sink.input_ports['in'])
self.assertIsInstance(conn, bt2._Connection)
- self.assertNotIsInstance(conn, bt2._PrivateConnection)
-
- def test_is_ended_false(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
- self._add_output_port('out')
-
- class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
- self._add_input_port('in')
-
- def _consume(self):
- raise bt2.Stop
-
- graph = bt2.Graph()
- src = graph.add_component(MySource, 'src')
- sink = graph.add_component(MySink, 'sink')
- conn = graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
- self.assertFalse(conn.is_ended)
-
- def test_is_ended_true(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
- self._add_output_port('out')
-
- class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
- self._add_input_port('in')
-
- def _consume(self):
- raise bt2.Stop
-
- graph = bt2.Graph()
- src = graph.add_component(MySource, 'src')
- sink = graph.add_component(MySink, 'sink')
- conn = graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
- src.output_ports['out'].disconnect()
- self.assertTrue(conn.is_ended)
def test_downstream_port(self):
class MyIter(bt2._UserMessageIterator):
conn = graph.connect_ports(src.output_ports['out'],
sink.input_ports['in'])
self.assertEqual(conn.upstream_port.addr, src.output_ports['out'].addr)
-
- def test_eq(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
- self._add_output_port('out')
-
- class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
- self._add_input_port('in')
-
- def _consume(self):
- raise bt2.Stop
-
- graph = bt2.Graph()
- src = graph.add_component(MySource, 'src')
- sink = graph.add_component(MySink, 'sink')
- conn = graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
- self.assertEqual(conn, conn)
-
- def test_eq_invalid(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
- self._add_output_port('out')
-
- class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
- self._add_input_port('in')
-
- def _consume(self):
- raise bt2.Stop
-
- graph = bt2.Graph()
- src = graph.add_component(MySource, 'src')
- sink = graph.add_component(MySink, 'sink')
- conn = graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
- self.assertNotEqual(conn, 23)
-
-
-@unittest.skip("this is broken")
-class PrivateConnectionTestCase(unittest.TestCase):
- def test_create(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
- self._add_output_port('out')
-
- class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
- self._add_input_port('in')
-
- def _consume(self):
- raise bt2.Stop
-
- def _port_connected(self, port, other_port):
- nonlocal priv_conn
- priv_conn = port.connection
-
- priv_conn = None
- graph = bt2.Graph()
- src = graph.add_component(MySource, 'src')
- sink = graph.add_component(MySink, 'sink')
- conn = graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
- self.assertIsInstance(priv_conn, bt2._PrivateConnection)
- self.assertEqual(conn, priv_conn)
- del priv_conn
-
- def test_is_ended_false(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
- self._add_output_port('out')
-
- class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
- self._add_input_port('in')
-
- def _consume(self):
- raise bt2.Stop
-
- def _port_connected(self, port, other_port):
- nonlocal priv_conn
- priv_conn = port.connection
-
- priv_conn = None
- graph = bt2.Graph()
- src = graph.add_component(MySource, 'src')
- sink = graph.add_component(MySink, 'sink')
- conn = graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
- self.assertFalse(priv_conn.is_ended)
- del priv_conn
-
- def test_is_ended_true(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
- self._add_output_port('out')
-
- class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
- self._add_input_port('in')
-
- def _consume(self):
- raise bt2.Stop
-
- def _port_connected(self, port, other_port):
- nonlocal priv_conn
- priv_conn = port.connection
-
- priv_conn = None
- graph = bt2.Graph()
- src = graph.add_component(MySource, 'src')
- sink = graph.add_component(MySink, 'sink')
- conn = graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
- sink.input_ports['in'].disconnect()
- self.assertTrue(priv_conn.is_ended)
- del priv_conn
-
- def test_downstream_port(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
- self._add_output_port('out')
-
- class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
- self._add_input_port('in')
-
- def _consume(self):
- raise bt2.Stop
-
- def _port_connected(self, port, other_port):
- nonlocal priv_port
- priv_conn = port.connection
- priv_port = priv_conn.downstream_port
-
- priv_port = None
- graph = bt2.Graph()
- src = graph.add_component(MySource, 'src')
- sink = graph.add_component(MySink, 'sink')
- conn = graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
- self.assertEqual(priv_port.addr, sink.input_ports['in'].addr)
- del priv_port
-
- def test_upstream_port(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
- self._add_output_port('out')
-
- class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
- self._add_input_port('in')
-
- def _consume(self):
- raise bt2.Stop
-
- def _port_connected(self, port, other_port):
- nonlocal priv_port
- priv_conn = port.connection
- priv_port = priv_conn.upstream_port
-
- priv_port = None
- graph = bt2.Graph()
- src = graph.add_component(MySource, 'src')
- sink = graph.add_component(MySink, 'sink')
- conn = graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
- self.assertEqual(priv_port.addr, src.output_ports['out'].addr)
- del priv_port
-
- def test_eq(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
- self._add_output_port('out')
-
- class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
- self._add_input_port('in')
-
- def _consume(self):
- raise bt2.Stop
-
- def _port_connected(self, port, other_port):
- nonlocal priv_conn
- priv_conn = port.connection
-
- priv_conn = None
- graph = bt2.Graph()
- src = graph.add_component(MySource, 'src')
- sink = graph.add_component(MySink, 'sink')
- conn = graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
- self.assertEqual(priv_conn, conn)
- del priv_conn
-
- def test_eq_invalid(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
- self._add_output_port('out')
-
- class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
- self._add_input_port('in')
-
- def _consume(self):
- raise bt2.Stop
-
- def _port_connected(self, port, other_port):
- nonlocal priv_conn
- priv_conn = port.connection
-
- priv_conn = None
- graph = bt2.Graph()
- src = graph.add_component(MySource, 'src')
- sink = graph.add_component(MySink, 'sink')
- conn = graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
- self.assertNotEqual(priv_conn, 23)
- del priv_conn