-@unittest.skip("this is broken")
-class PrivateConnectionMessageIteratorTestCase(unittest.TestCase):
- def test_component(self):
- class MyIter(bt2._UserMessageIterator):
- pass
-
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
- self._add_output_port('out')
-
- class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
- self._add_input_port('in')
-
- def _consume(self):
- next(self._msg_iter)
-
- def _port_connected(self, port, other_port):
- nonlocal upstream_comp
- self._msg_iter = port.connection.create_message_iterator()
- upstream_comp = self._msg_iter.component
-
- upstream_comp = None
- graph = bt2.Graph()
- src_comp = graph.add_component(MySource, 'src')
- sink_comp = graph.add_component(MySink, 'sink')
- graph.connect_ports(src_comp.output_ports['out'],
- sink_comp.input_ports['in'])
- self.assertEqual(src_comp, upstream_comp)
- del upstream_comp
-
-
-@unittest.skip("this is broken")