bt2c::Logger: remove unused cLevel() method
[babeltrace.git] / tests / bindings / python / bt2 / test_connection.py
Commit | Line | Data
0235b0db 1# SPDX-License-Identifier: GPL-2.0-only
d2d857a8
MJ
2#
3# Copyright (C) 2019 EfficiOS Inc.
4#
d2d857a8 5
811644b8 6import unittest
5995b304 7
811644b8 8import bt2
5813b3a3 9from bt2 import port as bt2_port
5995b304 10from bt2 import connection as bt2_connection
811644b8
PP
11
12
class ConnectionTestCase(unittest.TestCase):
    """Tests for the connection object returned by ``Graph.connect_ports()``."""

    def _connect_source_to_sink(self):
        """Build a one-source, one-sink graph and connect their ports.

        The source component has a single output port named "out" and the
        sink component a single input port named "in".

        Returns:
            A ``(graph, src, sink, conn)`` tuple. The graph is returned along
            with the other objects so that callers can keep it referenced
            for the duration of the test.
        """

        class MySource(
            bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator
        ):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_consume(self):
                # No test in this class runs the graph; raise bt2.Stop if
                # the sink is ever asked to consume.
                raise bt2.Stop

        graph = bt2.Graph()
        src = graph.add_component(MySource, "src")
        sink = graph.add_component(MySink, "sink")
        conn = graph.connect_ports(src.output_ports["out"], sink.input_ports["in"])
        return graph, src, sink, conn

    def test_create(self):
        """``connect_ports()`` returns a ``_ConnectionConst`` object."""
        _graph, _src, _sink, conn = self._connect_source_to_sink()
        self.assertIs(type(conn), bt2_connection._ConnectionConst)

    def test_downstream_port(self):
        """The downstream port of the connection is the sink's input port."""
        _graph, _src, sink, conn = self._connect_source_to_sink()
        self.assertEqual(conn.downstream_port.addr, sink.input_ports["in"].addr)
        self.assertIs(type(conn), bt2_connection._ConnectionConst)
        self.assertIs(type(conn.downstream_port), bt2_port._InputPortConst)

    def test_upstream_port(self):
        """The upstream port of the connection is the source's output port."""
        _graph, src, _sink, conn = self._connect_source_to_sink()
        self.assertEqual(conn.upstream_port.addr, src.output_ports["out"].addr)
        self.assertIs(type(conn.upstream_port), bt2_port._OutputPortConst)
d14ddbba
SM
76
77
# Run the test cases of this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()
This page took 0.075907 seconds and 5 git commands to generate.