X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=tests%2Fbindings%2Fpython%2Fbt2%2Ftest_graph.py;h=bda08de2ec874aecb0c016c7e171fd14bf73e3e9;hb=6a91742b3a25285222551341c8a134b4b2b5aba9;hp=9021821a11c22d54cb18c43fd2a8d07c289946b3;hpb=b4f458518ad1872074286cdd6414b86ef1fb9fbd;p=babeltrace.git diff --git a/tests/bindings/python/bt2/test_graph.py b/tests/bindings/python/bt2/test_graph.py index 9021821a..bda08de2 100644 --- a/tests/bindings/python/bt2/test_graph.py +++ b/tests/bindings/python/bt2/test_graph.py @@ -1,11 +1,52 @@ -from bt2 import values +# +# Copyright (C) 2019 EfficiOS Inc. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; only version 2 +# of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# + +from bt2 import value import collections import unittest import copy import bt2 -@unittest.skip("this is broken") +class _MyIter(bt2._UserMessageIterator): + def __init__(self, self_output_port): + self._build_meta() + self._at = 0 + + def _build_meta(self): + self._tc = self._component._create_trace_class() + self._t = self._tc() + self._sc = self._tc.create_stream_class(supports_packets=True) + self._ec = self._sc.create_event_class(name='salut') + self._my_int_ft = self._tc.create_signed_integer_field_class(32) + payload_ft = self._tc.create_structure_field_class() + payload_ft += [('my_int', self._my_int_ft)] + self._ec.payload_field_type = payload_ft + self._stream = self._t.create_stream(self._sc) + self._packet = self._stream.create_packet() + + def _create_event(self, value): + ev = self._ec() + ev.payload_field['my_int'] = value + ev.packet = self._packet + return ev + + class GraphTestCase(unittest.TestCase): def setUp(self): self._graph = bt2.Graph() @@ -18,7 +59,7 @@ class GraphTestCase(unittest.TestCase): def test_add_component_user_cls(self): class MySink(bt2._UserSinkComponent): - def _consume(self): + def _user_consume(self): pass comp = self._graph.add_component(MySink, 'salut') @@ -26,12 +67,12 @@ class GraphTestCase(unittest.TestCase): def test_add_component_gen_cls(self): class MySink(bt2._UserSinkComponent): - def _consume(self): + def _user_consume(self): pass comp = self._graph.add_component(MySink, 'salut') - assert(comp) - comp2 = self._graph.add_component(comp.component_class, 'salut2') + assert comp + comp2 = self._graph.add_component(comp.cls, 'salut2') self.assertEqual(comp2.name, 'salut2') def test_add_component_params(self): @@ -42,7 +83,7 @@ class GraphTestCase(unittest.TestCase): nonlocal comp_params comp_params = params - def _consume(self): + def _user_consume(self): pass params = {'hello': 23, 'path': '/path/to/stuff'} @@ -54,39 +95,38 @@ class GraphTestCase(unittest.TestCase): with self.assertRaises(TypeError): self._graph.add_component(int, 'salut') - def test_connect_ports(self): - class MyIter(bt2._UserMessageIterator): - def __next__(self): - raise bt2.Stop + def test_add_component_invalid_logging_level_type(self): + class MySink(bt2._UserSinkComponent): + def _user_consume(self): + pass - class MySource(bt2._UserSourceComponent, - message_iterator_class=MyIter): - def __init__(self, params): - self._add_output_port('out') + with self.assertRaises(TypeError): + self._graph.add_component(MySink, 'salut', logging_level='yo') + def test_add_component_invalid_logging_level_value(self): class MySink(bt2._UserSinkComponent): - def __init__(self, params): - self._add_input_port('in') + def _user_consume(self): + pass - def _consume(self): - raise bt2.Stop + with self.assertRaises(ValueError): + self._graph.add_component(MySink, 'salut', logging_level=12345) - src = self._graph.add_component(MySource, 'src') - sink = self._graph.add_component(MySink, 'sink') - conn = self._graph.connect_ports(src.output_ports['out'], - sink.input_ports['in']) - self.assertTrue(src.output_ports['out'].is_connected) - self.assertTrue(sink.input_ports['in'].is_connected) - self.assertEqual(src.output_ports['out'].connection, conn) - self.assertEqual(sink.input_ports['in'].connection, conn) + def test_add_component_logging_level(self): + class MySink(bt2._UserSinkComponent): + def _user_consume(self): + pass - def test_connect_ports_invalid_direction(self): + comp = self._graph.add_component( + MySink, 'salut', logging_level=bt2.LoggingLevel.DEBUG + ) + self.assertEqual(comp.logging_level, bt2.LoggingLevel.DEBUG) + + def test_connect_ports(self): class MyIter(bt2._UserMessageIterator): def __next__(self): raise bt2.Stop - class MySource(bt2._UserSourceComponent, - message_iterator_class=MyIter): + class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter): def __init__(self, params): self._add_output_port('out') @@ -94,23 +134,26 @@ class GraphTestCase(unittest.TestCase): def __init__(self, params): self._add_input_port('in') - def _consume(self): + def _user_consume(self): raise bt2.Stop src = self._graph.add_component(MySource, 'src') sink = self._graph.add_component(MySink, 'sink') - with self.assertRaises(TypeError): - conn = self._graph.connect_ports(sink.input_ports['in'], - src.output_ports['out']) + conn = self._graph.connect_ports( + src.output_ports['out'], sink.input_ports['in'] + ) + self.assertTrue(src.output_ports['out'].is_connected) + self.assertTrue(sink.input_ports['in'].is_connected) + self.assertEqual(src.output_ports['out'].connection.addr, conn.addr) + self.assertEqual(sink.input_ports['in'].connection.addr, conn.addr) - def test_connect_ports_refused(self): + def test_connect_ports_invalid_direction(self): class MyIter(bt2._UserMessageIterator): def __next__(self): raise bt2.Stop - class MySource(bt2._UserSourceComponent, - message_iterator_class=MyIter): + class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter): def __init__(self, params): self._add_output_port('out') @@ -118,26 +161,23 @@ class GraphTestCase(unittest.TestCase): def __init__(self, params): self._add_input_port('in') - def _consume(self): + def _user_consume(self): raise bt2.Stop - def _accept_port_connection(self, port, other_port): - return False - src = self._graph.add_component(MySource, 'src') sink = self._graph.add_component(MySink, 'sink') - with self.assertRaises(bt2.PortConnectionRefused): - conn = self._graph.connect_ports(src.output_ports['out'], - sink.input_ports['in']) + with self.assertRaises(TypeError): + conn = self._graph.connect_ports( + sink.input_ports['in'], src.output_ports['out'] + ) - def test_connect_ports_canceled(self): + def test_add_interrupter(self): class MyIter(bt2._UserMessageIterator): def __next__(self): - raise bt2.Stop + raise TypeError - class MySource(bt2._UserSourceComponent, - message_iterator_class=MyIter): + class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter): def __init__(self, params): self._add_output_port('out') @@ -145,58 +185,44 @@ class GraphTestCase(unittest.TestCase): def __init__(self, params): self._add_input_port('in') - def _consume(self): - raise bt2.Stop + def _user_consume(self): + next(self._msg_iter) + def _user_graph_is_configured(self): + self._msg_iter = self._create_input_port_message_iterator( + self._input_ports['in'] + ) + + # add two interrupters, set one of them + interrupter1 = bt2.Interrupter() + interrupter2 = bt2.Interrupter() + self._graph.add_interrupter(interrupter1) src = self._graph.add_component(MySource, 'src') sink = self._graph.add_component(MySink, 'sink') - self._graph.cancel() - - with self.assertRaises(bt2.GraphCanceled): - conn = self._graph.connect_ports(src.output_ports['out'], - sink.input_ports['in']) + self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in']) + self._graph.add_interrupter(interrupter2) - def test_connect_ports_cannot_consume_accept(self): - class MyIter(bt2._UserMessageIterator): - def __next__(self): - raise bt2.Stop - - class MySource(bt2._UserSourceComponent, - message_iterator_class=MyIter): - def __init__(self, params): - self._add_output_port('out') - - class MySink(bt2._UserSinkComponent): - def __init__(self, params): - self._add_input_port('in') - - def _consume(self): - raise bt2.Stop + with self.assertRaises(bt2._Error): + self._graph.run() - def _accept_port_connection(self, port, other_port): - nonlocal exc + interrupter2.set() - try: - self.graph.run() - except Exception as e: - exc = e + with self.assertRaises(bt2.TryAgain): + self._graph.run() - return True + interrupter2.reset() - exc = None - src = self._graph.add_component(MySource, 'src') - sink = self._graph.add_component(MySink, 'sink') - self._graph.connect_ports(src.output_ports['out'], - sink.input_ports['in']) - self.assertIs(type(exc), bt2.CannotConsumeGraph) + with self.assertRaises(bt2._Error): + self._graph.run() - def test_connect_ports_cannot_consume_connected(self): - class MyIter(bt2._UserMessageIterator): + # Test that Graph.run() raises bt2.Interrupted if the graph gets + # interrupted during execution. + def test_interrupt_while_running(self): + class MyIter(_MyIter): def __next__(self): - raise bt2.Stop + return self._create_stream_beginning_message(self._stream) - class MySource(bt2._UserSourceComponent, - message_iterator_class=MyIter): + class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter): def __init__(self, params): self._add_output_port('out') @@ -204,372 +230,344 @@ class GraphTestCase(unittest.TestCase): def __init__(self, params): self._add_input_port('in') - def _consume(self): - raise bt2.Stop - - def _port_connected(self, port, other_port): - nonlocal exc - - try: - self.graph.run() - except Exception as e: - exc = e + def _user_consume(self): + # Pretend that somebody asynchronously interrupted the graph. + nonlocal graph + graph.interrupt() + return next(self._msg_iter) - return True + def _user_graph_is_configured(self): + self._msg_iter = self._create_input_port_message_iterator( + self._input_ports['in'] + ) - exc = None - src = self._graph.add_component(MySource, 'src') - sink = self._graph.add_component(MySink, 'sink') - self._graph.connect_ports(src.output_ports['out'], - sink.input_ports['in']) - self._graph.run() - self.assertIs(type(exc), bt2.CannotConsumeGraph) + graph = self._graph + up = self._graph.add_component(MySource, 'down') + down = self._graph.add_component(MySink, 'up') + self._graph.connect_ports(up.output_ports['out'], down.input_ports['in']) - def test_cancel(self): - self.assertFalse(self._graph.is_canceled) - self._graph.cancel() - self.assertTrue(self._graph.is_canceled) + with self.assertRaises(bt2.TryAgain): + self._graph.run() def test_run(self): - class MyIter(bt2._UserMessageIterator): - def __init__(self): - self._build_meta() - self._at = 0 - - def _build_meta(self): - self._trace = bt2.Trace() - self._sc = bt2.StreamClass() - self._ec = bt2.EventClass('salut') - self._my_int_fc = bt2.IntegerFieldClass(32) - self._ec.payload_field_class = bt2.StructureFieldClass() - self._ec.payload_field_class += collections.OrderedDict([ - ('my_int', self._my_int_fc), - ]) - self._sc.add_event_class(self._ec) - self._trace.add_stream_class(self._sc) - self._stream = self._sc() - self._packet = self._stream.create_packet() - - def _create_event(self, value): - ev = self._ec() - ev.payload_field['my_int'] = value - ev.packet = self._packet - return ev - + class MyIter(_MyIter): def __next__(self): - if self._at == 5: - raise bt2.Stop + if self._at == 9: + raise StopIteration + + if self._at == 0: + msg = self._create_stream_beginning_message(self._stream) + elif self._at == 1: + msg = self._create_packet_beginning_message(self._packet) + elif self._at == 7: + msg = self._create_packet_end_message(self._packet) + elif self._at == 8: + msg = self._create_stream_end_message(self._stream) + else: + msg = self._create_event_message(self._ec, self._packet) - msg = bt2.EventMessage(self._create_event(self._at * 3)) self._at += 1 return msg - class MySource(bt2._UserSourceComponent, - message_iterator_class=MyIter): + class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter): def __init__(self, params): self._add_output_port('out') class MySink(bt2._UserSinkComponent): def __init__(self, params): - self._add_input_port('in') + self._input_port = self._add_input_port('in') self._at = 0 - def _consume(comp_self): + def _user_consume(comp_self): msg = next(comp_self._msg_iter) if comp_self._at == 0: - self.assertIsInstance(msg, bt2.StreamBeginningMessage) + self.assertIsInstance(msg, bt2._StreamBeginningMessage) elif comp_self._at == 1: - self.assertIsInstance(msg, bt2.PacketBeginningMessage) + self.assertIsInstance(msg, bt2._PacketBeginningMessage) elif comp_self._at >= 2 and comp_self._at <= 6: - self.assertIsInstance(msg, bt2.EventMessage) - self.assertEqual(msg.event.event_class.name, 'salut') - field = msg.event.payload_field['my_int'] - self.assertEqual(field, (comp_self._at - 2) * 3) + self.assertIsInstance(msg, bt2._EventMessage) + self.assertEqual(msg.event.cls.name, 'salut') elif comp_self._at == 7: - self.assertIsInstance(msg, bt2.PacketEndMessage) + self.assertIsInstance(msg, bt2._PacketEndMessage) elif comp_self._at == 8: - self.assertIsInstance(msg, bt2.StreamEndMessage) + self.assertIsInstance(msg, bt2._StreamEndMessage) comp_self._at += 1 - def _port_connected(self, port, other_port): - self._msg_iter = port.connection.create_message_iterator() + def _user_graph_is_configured(self): + self._msg_iter = self._create_input_port_message_iterator( + self._input_port + ) src = self._graph.add_component(MySource, 'src') sink = self._graph.add_component(MySink, 'sink') - conn = self._graph.connect_ports(src.output_ports['out'], - sink.input_ports['in']) + conn = self._graph.connect_ports( + src.output_ports['out'], sink.input_ports['in'] + ) self._graph.run() def test_run_again(self): - class MyIter(bt2._UserMessageIterator): - def __init__(self): - self._build_meta() - self._at = 0 - - def _build_meta(self): - self._trace = bt2.Trace() - self._sc = bt2.StreamClass() - self._ec = bt2.EventClass('salut') - self._my_int_fc = bt2.IntegerFieldClass(32) - self._ec.payload_field_class = bt2.StructureFieldClass() - self._ec.payload_field_class += collections.OrderedDict([ - ('my_int', self._my_int_fc), - ]) - self._sc.add_event_class(self._ec) - self._trace.add_stream_class(self._sc) - self._stream = self._sc() - self._packet = self._stream.create_packet() - - def _create_event(self, value): - ev = self._ec() - ev.payload_field['my_int'] = value - ev.packet = self._packet - return ev - + class MyIter(_MyIter): def __next__(self): - if self._at == 1: + if self._at == 3: raise bt2.TryAgain - msg = bt2.EventMessage(self._create_event(self._at * 3)) + if self._at == 0: + msg = self._create_stream_beginning_message(self._stream) + elif self._at == 1: + msg = self._create_packet_beginning_message(self._packet) + elif self._at == 2: + msg = self._create_event_message(self._ec, self._packet) + self._at += 1 return msg - class MySource(bt2._UserSourceComponent, - message_iterator_class=MyIter): + class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter): def __init__(self, params): self._add_output_port('out') class MySink(bt2._UserSinkComponent): def __init__(self, params): - self._add_input_port('in') + self._input_port = self._add_input_port('in') self._at = 0 - def _consume(comp_self): + def _user_consume(comp_self): + msg = next(comp_self._msg_iter) if comp_self._at == 0: - msg = next(comp_self._msg_iter) - self.assertIsInstance(msg, bt2.EventMessage) + self.assertIsInstance(msg, bt2._StreamBeginningMessage) elif comp_self._at == 1: - with self.assertRaises(bt2.TryAgain): - msg = next(comp_self._msg_iter) - + self.assertIsInstance(msg, bt2._PacketBeginningMessage) + elif comp_self._at == 2: + self.assertIsInstance(msg, bt2._EventMessage) raise bt2.TryAgain + else: + pass comp_self._at += 1 - def _port_connected(self, port, other_port): - types = [bt2.EventMessage] - self._msg_iter = port.connection.create_message_iterator(types) + def _user_graph_is_configured(self): + self._msg_iter = self._create_input_port_message_iterator( + self._input_port + ) src = self._graph.add_component(MySource, 'src') sink = self._graph.add_component(MySink, 'sink') - conn = self._graph.connect_ports(src.output_ports['out'], - sink.input_ports['in']) + conn = self._graph.connect_ports( + src.output_ports['out'], sink.input_ports['in'] + ) with self.assertRaises(bt2.TryAgain): self._graph.run() - def test_run_no_sink(self): - class MyIter(bt2._UserMessageIterator): - pass - - class MySource(bt2._UserSourceComponent, - message_iterator_class=MyIter): - def __init__(self, params): - self._add_output_port('out') - - class MyFilter(bt2._UserFilterComponent, - message_iterator_class=MyIter): - def __init__(self, params): - self._add_output_port('out') - self._add_input_port('in') - - src = self._graph.add_component(MySource, 'src') - flt = self._graph.add_component(MyFilter, 'flt') - conn = self._graph.connect_ports(src.output_ports['out'], - flt.input_ports['in']) - - with self.assertRaises(bt2.NoSinkComponent): - self._graph.run() - def test_run_error(self): - class MyIter(bt2._UserMessageIterator): - def __init__(self): - self._build_meta() - self._at = 0 - - def _build_meta(self): - self._trace = bt2.Trace() - self._sc = bt2.StreamClass() - self._ec = bt2.EventClass('salut') - self._my_int_fc = bt2.IntegerFieldClass(32) - self._ec.payload_field_class = bt2.StructureFieldClass() - self._ec.payload_field_class += collections.OrderedDict([ - ('my_int', self._my_int_fc), - ]) - self._sc.add_event_class(self._ec) - self._trace.add_stream_class(self._sc) - self._stream = self._sc() - self._packet = self._stream.create_packet() - - def _create_event(self, value): - ev = self._ec() - ev.payload_field['my_int'] = value - ev.packet = self._packet - return ev + raised_in_sink = False + class MyIter(_MyIter): def __next__(self): - if self._at == 1: + # If this gets called after the sink raised an exception, it is + # an error. + nonlocal raised_in_sink + assert raised_in_sink is False + + if self._at == 0: + msg = self._create_stream_beginning_message(self._stream) + elif self._at == 1: + msg = self._create_packet_beginning_message(self._packet) + elif self._at == 2 or self._at == 3: + msg = self._create_event_message(self._ec, self._packet) + else: raise bt2.TryAgain - - msg = bt2.EventMessage(self._create_event(self._at * 3)) self._at += 1 return msg - class MySource(bt2._UserSourceComponent, - message_iterator_class=MyIter): + class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter): def __init__(self, params): self._add_output_port('out') class MySink(bt2._UserSinkComponent): def __init__(self, params): - self._add_input_port('in') + self._input_port = self._add_input_port('in') self._at = 0 - def _consume(comp_self): + def _user_consume(comp_self): + msg = next(comp_self._msg_iter) if comp_self._at == 0: - msg = next(comp_self._msg_iter) - self.assertIsInstance(msg, bt2.EventMessage) + self.assertIsInstance(msg, bt2._StreamBeginningMessage) elif comp_self._at == 1: + self.assertIsInstance(msg, bt2._PacketBeginningMessage) + elif comp_self._at == 2: + self.assertIsInstance(msg, bt2._EventMessage) + elif comp_self._at == 3: + nonlocal raised_in_sink + raised_in_sink = True raise RuntimeError('error!') comp_self._at += 1 - def _port_connected(self, port, other_port): - types = [bt2.EventMessage] - self._msg_iter = port.connection.create_message_iterator(types) + def _user_graph_is_configured(self): + self._msg_iter = self._create_input_port_message_iterator( + self._input_port + ) src = self._graph.add_component(MySource, 'src') sink = self._graph.add_component(MySink, 'sink') - conn = self._graph.connect_ports(src.output_ports['out'], - sink.input_ports['in']) + conn = self._graph.connect_ports( + src.output_ports['out'], sink.input_ports['in'] + ) - with self.assertRaises(bt2.Error): + with self.assertRaises(bt2._Error): self._graph.run() - def test_run_cannot_consume(self): + def test_listeners(self): class MyIter(bt2._UserMessageIterator): - pass + def __next__(self): + raise bt2.Stop - class MySource(bt2._UserSourceComponent, - message_iterator_class=MyIter): + class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter): def __init__(self, params): self._add_output_port('out') + self._add_output_port('zero') class MySink(bt2._UserSinkComponent): def __init__(self, params): self._add_input_port('in') - self._at = 0 - def _consume(comp_self): - nonlocal exc + def _user_consume(self): + raise bt2.Stop - try: - print('going in') - comp_self.graph.run() - print('going out') - except Exception as e: - exc = e + def _user_port_connected(self, port, other_port): + self._add_input_port('taste') - raise bt2.Stop + def port_added_listener(component, port): + nonlocal calls + calls.append((port_added_listener, component, port)) + + def ports_connected_listener( + upstream_component, upstream_port, downstream_component, downstream_port + ): + nonlocal calls + calls.append( + ( + ports_connected_listener, + upstream_component, + upstream_port, + downstream_component, + downstream_port, + ) + ) - exc = None + calls = [] + self._graph.add_port_added_listener(port_added_listener) + self._graph.add_ports_connected_listener(ports_connected_listener) src = self._graph.add_component(MySource, 'src') sink = self._graph.add_component(MySink, 'sink') - conn = self._graph.connect_ports(src.output_ports['out'], - sink.input_ports['in']) - self._graph.run() - self.assertIs(type(exc), bt2.CannotConsumeGraph) + self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in']) - def test_listeners(self): + self.assertEqual(len(calls), 5) + + self.assertIs(calls[0][0], port_added_listener) + self.assertEqual(calls[0][1].name, 'src') + self.assertEqual(calls[0][2].name, 'out') + + self.assertIs(calls[1][0], port_added_listener) + self.assertEqual(calls[1][1].name, 'src') + self.assertEqual(calls[1][2].name, 'zero') + + self.assertIs(calls[2][0], port_added_listener) + self.assertEqual(calls[2][1].name, 'sink') + self.assertEqual(calls[2][2].name, 'in') + + self.assertIs(calls[3][0], port_added_listener) + self.assertEqual(calls[3][1].name, 'sink') + self.assertEqual(calls[3][2].name, 'taste') + + self.assertIs(calls[4][0], ports_connected_listener) + self.assertEqual(calls[4][1].name, 'src') + self.assertEqual(calls[4][2].name, 'out') + self.assertEqual(calls[4][3].name, 'sink') + self.assertEqual(calls[4][4].name, 'in') + + def test_invalid_listeners(self): class MyIter(bt2._UserMessageIterator): def __next__(self): raise bt2.Stop - class MySource(bt2._UserSourceComponent, - message_iterator_class=MyIter): + class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter): def __init__(self, params): self._add_output_port('out') self._add_output_port('zero') - def _port_connected(self, port, other_port): - self._output_ports['zero'].remove_from_component() - class MySink(bt2._UserSinkComponent): def __init__(self, params): self._add_input_port('in') - def _consume(self): + def _user_consume(self): raise bt2.Stop - def _port_connected(self, port, other_port): + def _user_port_connected(self, port, other_port): self._add_input_port('taste') - def _port_disconnected(self, port): - port.remove_from_component() + with self.assertRaises(TypeError): + self._graph.add_port_added_listener(1234) + with self.assertRaises(TypeError): + self._graph.add_ports_connected_listener(1234) - def port_added_listener(port): - nonlocal calls - calls.append((port_added_listener, port)) + def test_raise_in_component_init(self): + class MySink(bt2._UserSinkComponent): + def __init__(self, params): + raise ValueError('oops!') - def port_removed_listener(port): - nonlocal calls - calls.append((port_removed_listener, port)) + def _user_consume(self): + raise bt2.Stop - def ports_connected_listener(upstream_port, downstream_port): - nonlocal calls - calls.append((ports_connected_listener, upstream_port, - downstream_port)) + graph = bt2.Graph() - def ports_disconnected_listener(upstream_comp, downstream_comp, - upstream_port, downstream_port): - nonlocal calls - calls.append((ports_disconnected_listener, upstream_comp, - downstream_comp, upstream_port, downstream_port)) + with self.assertRaises(bt2._Error): + graph.add_component(MySink, 'comp') - calls = [] - self._graph.add_listener(bt2.GraphListenerType.PORT_ADDED, - port_added_listener) - self._graph.add_listener(bt2.GraphListenerType.PORT_REMOVED, - port_removed_listener) - self._graph.add_listener(bt2.GraphListenerType.PORTS_CONNECTED, - ports_connected_listener) - self._graph.add_listener(bt2.GraphListenerType.PORTS_DISCONNECTED, - ports_disconnected_listener) - src = self._graph.add_component(MySource, 'src') - sink = self._graph.add_component(MySink, 'sink') - self._graph.connect_ports(src.output_ports['out'], - sink.input_ports['in']) - sink.input_ports['in'].disconnect() - self.assertIs(calls[0][0], port_added_listener) - self.assertEqual(calls[0][1].name, 'out') - self.assertIs(calls[1][0], port_added_listener) - self.assertEqual(calls[1][1].name, 'zero') - self.assertIs(calls[2][0], port_added_listener) - self.assertEqual(calls[2][1].name, 'in') - self.assertIs(calls[3][0], port_removed_listener) - self.assertEqual(calls[3][1].name, 'zero') - self.assertIs(calls[4][0], port_added_listener) - self.assertEqual(calls[4][1].name, 'taste') - self.assertIs(calls[5][0], ports_connected_listener) - self.assertEqual(calls[5][1].name, 'out') - self.assertEqual(calls[5][2].name, 'in') - self.assertIs(calls[6][0], port_removed_listener) - self.assertEqual(calls[6][1].name, 'in') - self.assertIs(calls[7][0], ports_disconnected_listener) - self.assertEqual(calls[7][1].name, 'src') - self.assertEqual(calls[7][2].name, 'sink') - self.assertEqual(calls[7][3].name, 'out') - self.assertEqual(calls[7][4].name, 'in') - del calls + def test_raise_in_port_added_listener(self): + class MySink(bt2._UserSinkComponent): + def __init__(self, params): + self._add_input_port('in') + + def _user_consume(self): + raise bt2.Stop + + def port_added_listener(component, port): + raise ValueError('oh noes!') + + graph = bt2.Graph() + graph.add_port_added_listener(port_added_listener) + + with self.assertRaises(bt2._Error): + graph.add_component(MySink, 'comp') + + def test_raise_in_ports_connected_listener(self): + class MyIter(bt2._UserMessageIterator): + def __next__(self): + raise bt2.Stop + + class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter): + def __init__(self, params): + self._add_output_port('out') + + class MySink(bt2._UserSinkComponent): + def __init__(self, params): + self._add_input_port('in') + + def _user_consume(self): + raise bt2.Stop + + def ports_connected_listener( + upstream_component, upstream_port, downstream_component, downstream_port + ): + raise ValueError('oh noes!') + + graph = bt2.Graph() + graph.add_ports_connected_listener(ports_connected_listener) + up = graph.add_component(MySource, 'down') + down = graph.add_component(MySink, 'up') + + with self.assertRaises(bt2._Error): + graph.connect_ports(up.output_ports['out'], down.input_ports['in'])