- def test_run_no_sink(self):
- class MyIter(bt2._UserMessageIterator):
- pass
-
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
- self._add_output_port('out')
-
- class MyFilter(bt2._UserFilterComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
- self._add_output_port('out')
- self._add_input_port('in')
-
- src = self._graph.add_component(MySource, 'src')
- flt = self._graph.add_component(MyFilter, 'flt')
- conn = self._graph.connect_ports(src.output_ports['out'],
- flt.input_ports['in'])
-
- with self.assertRaises(bt2.NoSinkComponent):
- self._graph.run()
-