edeefd08df4635289c838ec524f52b9b7476e0c4
2 # Copyright (C) 2019 EfficiOS Inc.
4 # This program is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU General Public License
6 # as published by the Free Software Foundation; only version 2
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
21 from bt2
import connection
as bt2_connection
24 class ConnectionTestCase(unittest
.TestCase
):
25 def test_create(self
):
26 class MyIter(bt2
._UserMessageIterator
):
30 class MySource(bt2
._UserSourceComponent
, message_iterator_class
=MyIter
):
31 def __init__(self
, params
, obj
):
32 self
._add
_output
_port
('out')
34 class MySink(bt2
._UserSinkComponent
):
35 def __init__(self
, params
, obj
):
36 self
._add
_input
_port
('in')
38 def _user_consume(self
):
42 src
= graph
.add_component(MySource
, 'src')
43 sink
= graph
.add_component(MySink
, 'sink')
44 conn
= graph
.connect_ports(src
.output_ports
['out'], sink
.input_ports
['in'])
45 self
.assertIs(type(conn
), bt2_connection
._ConnectionConst
)
47 def test_downstream_port(self
):
48 class MyIter(bt2
._UserMessageIterator
):
52 class MySource(bt2
._UserSourceComponent
, message_iterator_class
=MyIter
):
53 def __init__(self
, params
, obj
):
54 self
._add
_output
_port
('out')
56 class MySink(bt2
._UserSinkComponent
):
57 def __init__(self
, params
, obj
):
58 self
._add
_input
_port
('in')
60 def _user_consume(self
):
64 src
= graph
.add_component(MySource
, 'src')
65 sink
= graph
.add_component(MySink
, 'sink')
66 conn
= graph
.connect_ports(src
.output_ports
['out'], sink
.input_ports
['in'])
67 self
.assertEqual(conn
.downstream_port
.addr
, sink
.input_ports
['in'].addr
)
68 self
.assertIs(type(conn
), bt2_connection
._ConnectionConst
)
70 def test_upstream_port(self
):
71 class MyIter(bt2
._UserMessageIterator
):
75 class MySource(bt2
._UserSourceComponent
, message_iterator_class
=MyIter
):
76 def __init__(self
, params
, obj
):
77 self
._add
_output
_port
('out')
79 class MySink(bt2
._UserSinkComponent
):
80 def __init__(self
, params
, obj
):
81 self
._add
_input
_port
('in')
83 def _user_consume(self
):
87 src
= graph
.add_component(MySource
, 'src')
88 sink
= graph
.add_component(MySink
, 'sink')
89 conn
= graph
.connect_ports(src
.output_ports
['out'], sink
.input_ports
['in'])
90 self
.assertEqual(conn
.upstream_port
.addr
, src
.output_ports
['out'].addr
)
This page took 0.031968 seconds and 3 git commands to generate.