bt2: make `bt2._OverflowError` inherit `bt2._Error`
[babeltrace.git] / tests / bindings / python / bt2 / test_connection.py
CommitLineData
d2d857a8
MJ
1#
2# Copyright (C) 2019 EfficiOS Inc.
3#
4# This program is free software; you can redistribute it and/or
5# modify it under the terms of the GNU General Public License
6# as published by the Free Software Foundation; only version 2
7# of the License.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU General Public License for more details.
13#
14# You should have received a copy of the GNU General Public License
15# along with this program; if not, write to the Free Software
16# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17#
18
811644b8 19import unittest
811644b8
PP
20import bt2
21
22
class ConnectionTestCase(unittest.TestCase):
    """Tests for the connection object returned by `Graph.connect_ports()`.

    Every test wires the same minimal graph: one user source component
    with an output port named 'out' connected to one user sink component
    with an input port named 'in'.
    """

    @staticmethod
    def _create_connected_graph():
        """Build the minimal source -> sink graph shared by all tests.

        The component classes are defined on each call, so every test
        gets fresh classes, exactly as when each test declared its own.

        Returns:
            A `(graph, src, sink, conn)` tuple: the graph, the added
            source and sink components, and the connection returned by
            `connect_ports()`. The graph is part of the result so that
            callers can keep it referenced while they inspect the other
            objects, as the tests did when everything was built inline.
        """

        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                # Never produces a message: ends immediately.
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                raise bt2.Stop

            def _graph_is_configured(self):
                pass

        graph = bt2.Graph()
        src = graph.add_component(MySource, 'src')
        sink = graph.add_component(MySink, 'sink')
        conn = graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
        return graph, src, sink, conn

    def test_create(self):
        """Connecting two ports succeeds and yields a connection object."""
        graph, src, sink, conn = self._create_connected_graph()

        # Previously the connection was bound but never checked.
        self.assertIsNotNone(conn)

    def test_downstream_port(self):
        """The connection's downstream port is the sink's input port."""
        graph, src, sink, conn = self._create_connected_graph()
        self.assertEqual(conn.downstream_port.addr, sink.input_ports['in'].addr)

    def test_upstream_port(self):
        """The connection's upstream port is the source's output port."""
        graph, src, sink, conn = self._create_connected_graph()
        self.assertEqual(conn.upstream_port.addr, src.output_ports['out'].addr)
This page took 0.039844 seconds and 4 git commands to generate.