2 # Copyright (C) 2019 EfficiOS Inc.
4 # This program is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU General Public License
6 # as published by the Free Software Foundation; only version 2
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
23 class ConnectionTestCase(unittest
.TestCase
):
24 def test_create(self
):
25 class MyIter(bt2
._UserMessageIterator
):
29 class MySource(bt2
._UserSourceComponent
,
30 message_iterator_class
=MyIter
):
31 def __init__(self
, params
):
32 self
._add
_output
_port
('out')
34 class MySink(bt2
._UserSinkComponent
):
35 def __init__(self
, params
):
36 self
._add
_input
_port
('in')
41 def _graph_is_configured(self
):
45 src
= graph
.add_component(MySource
, 'src')
46 sink
= graph
.add_component(MySink
, 'sink')
47 conn
= graph
.connect_ports(src
.output_ports
['out'],
48 sink
.input_ports
['in'])
49 self
.assertIsInstance(conn
, bt2
._Connection
)
51 def test_downstream_port(self
):
52 class MyIter(bt2
._UserMessageIterator
):
56 class MySource(bt2
._UserSourceComponent
,
57 message_iterator_class
=MyIter
):
58 def __init__(self
, params
):
59 self
._add
_output
_port
('out')
61 class MySink(bt2
._UserSinkComponent
):
62 def __init__(self
, params
):
63 self
._add
_input
_port
('in')
68 def _graph_is_configured(self
):
72 src
= graph
.add_component(MySource
, 'src')
73 sink
= graph
.add_component(MySink
, 'sink')
74 conn
= graph
.connect_ports(src
.output_ports
['out'],
75 sink
.input_ports
['in'])
76 self
.assertEqual(conn
.downstream_port
.addr
, sink
.input_ports
['in'].addr
)
78 def test_upstream_port(self
):
79 class MyIter(bt2
._UserMessageIterator
):
83 class MySource(bt2
._UserSourceComponent
,
84 message_iterator_class
=MyIter
):
85 def __init__(self
, params
):
86 self
._add
_output
_port
('out')
88 class MySink(bt2
._UserSinkComponent
):
89 def __init__(self
, params
):
90 self
._add
_input
_port
('in')
95 def _graph_is_configured(self
):
99 src
= graph
.add_component(MySource
, 'src')
100 sink
= graph
.add_component(MySink
, 'sink')
101 conn
= graph
.connect_ports(src
.output_ports
['out'],
102 sink
.input_ports
['in'])
103 self
.assertEqual(conn
.upstream_port
.addr
, src
.output_ports
['out'].addr
)
This page took 0.034953 seconds and 4 git commands to generate.