Commit | Line | Data |
---|---|---|
0235b0db | 1 | # SPDX-License-Identifier: GPL-2.0-only |
d2d857a8 MJ |
2 | # |
3 | # Copyright (C) 2019 EfficiOS Inc. | |
4 | # | |
d2d857a8 | 5 | |
811644b8 | 6 | import unittest |
5995b304 | 7 | |
811644b8 | 8 | import bt2 |
5813b3a3 | 9 | from bt2 import port as bt2_port |
5995b304 | 10 | from bt2 import connection as bt2_connection |
811644b8 PP |
11 | |
12 | ||
class ConnectionTestCase(unittest.TestCase):
    """Checks on the object returned by `bt2.Graph.connect_ports()`."""

    @staticmethod
    def _connect_source_to_sink():
        """Build a graph in which one source is connected to one sink.

        The source component adds a single output port named "out", the
        sink component adds a single input port named "in", and those two
        ports are connected together.

        Returns a `(graph, source, sink, connection)` tuple. The graph is
        part of the tuple so that callers can keep it referenced while
        they inspect the other objects.
        """

        class MySource(
            bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator
        ):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_consume(self):
                # The tests in this class do not run the graph themselves.
                raise bt2.Stop

        graph = bt2.Graph()
        source = graph.add_component(MySource, "src")
        sink = graph.add_component(MySink, "sink")
        out_port = source.output_ports["out"]
        in_port = sink.input_ports["in"]
        connection = graph.connect_ports(out_port, in_port)
        return graph, source, sink, connection

    def test_create(self):
        """Connecting two ports returns a `_ConnectionConst` object."""
        _graph, _source, _sink, connection = self._connect_source_to_sink()
        self.assertIs(type(connection), bt2_connection._ConnectionConst)

    def test_downstream_port(self):
        """The downstream port of the connection is the sink's input port."""
        _graph, _source, sink, connection = self._connect_source_to_sink()

        # Compare by address: the check is on the underlying port, not on
        # the identity of the Python objects.
        actual_addr = connection.downstream_port.addr
        expected_addr = sink.input_ports["in"].addr
        self.assertEqual(actual_addr, expected_addr)

        self.assertIs(type(connection), bt2_connection._ConnectionConst)
        self.assertIs(type(connection.downstream_port), bt2_port._InputPortConst)

    def test_upstream_port(self):
        """The upstream port of the connection is the source's output port."""
        _graph, source, _sink, connection = self._connect_source_to_sink()

        # Compare by address: the check is on the underlying port, not on
        # the identity of the Python objects.
        actual_addr = connection.upstream_port.addr
        expected_addr = source.output_ports["out"].addr
        self.assertEqual(actual_addr, expected_addr)

        self.assertIs(type(connection.upstream_port), bt2_port._OutputPortConst)
d14ddbba SM |
76 | |
77 | ||
# Run the test cases of this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()