lib: make packets and packet messages optional, disabled by default
[babeltrace.git] / tests / bindings / python / bt2 / test_graph.py
index 73788b48001f3b31dd2b3ce00171a7269fa2153e..852e3f01ce23e24c55f57fe15f64234fe9a6fa8d 100644 (file)
@@ -1,3 +1,21 @@
+#
+# Copyright (C) 2019 EfficiOS Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; only version 2
+# of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+#
+
 from bt2 import value
 import collections
 import unittest
@@ -5,7 +23,32 @@ import copy
 import bt2
 
 
-@unittest.skip("this is broken")
+class _MyIter(bt2._UserMessageIterator):
+    def __init__(self, self_output_port):
+        self._build_meta()
+        self._at = 0
+
+    def _build_meta(self):
+        self._tc = self._component._create_trace_class()
+        self._t = self._tc()
+        self._sc = self._tc.create_stream_class(supports_packets=True)
+        self._ec = self._sc.create_event_class(name='salut')
+        self._my_int_ft = self._tc.create_signed_integer_field_class(32)
+        payload_ft = self._tc.create_structure_field_class()
+        payload_ft += collections.OrderedDict([
+            ('my_int', self._my_int_ft),
+        ])
+        self._ec.payload_field_type = payload_ft
+        self._stream = self._t.create_stream(self._sc)
+        self._packet = self._stream.create_packet()
+
+    def _create_event(self, value):
+        ev = self._ec()
+        ev.payload_field['my_int'] = value
+        ev.packet = self._packet
+        return ev
+
+
 class GraphTestCase(unittest.TestCase):
     def setUp(self):
         self._graph = bt2.Graph()
@@ -30,8 +73,8 @@ class GraphTestCase(unittest.TestCase):
                 pass
 
         comp = self._graph.add_component(MySink, 'salut')
-        assert(comp)
-        comp2 = self._graph.add_component(comp.component_class, 'salut2')
+        assert comp
+        comp2 = self._graph.add_component(comp.cls, 'salut2')
         self.assertEqual(comp2.name, 'salut2')
 
     def test_add_component_params(self):
@@ -54,84 +97,32 @@ class GraphTestCase(unittest.TestCase):
         with self.assertRaises(TypeError):
             self._graph.add_component(int, 'salut')
 
-    def test_connect_ports(self):
-        class MyIter(bt2._UserMessageIterator):
-            def __next__(self):
-                raise bt2.Stop
-
-        class MySource(bt2._UserSourceComponent,
-                       message_iterator_class=MyIter):
-            def __init__(self, params):
-                self._add_output_port('out')
-
+    def test_add_component_invalid_logging_level_type(self):
         class MySink(bt2._UserSinkComponent):
-            def __init__(self, params):
-                self._add_input_port('in')
-
             def _consume(self):
-                raise bt2.Stop
-
-        src = self._graph.add_component(MySource, 'src')
-        sink = self._graph.add_component(MySink, 'sink')
-        conn = self._graph.connect_ports(src.output_ports['out'],
-                                         sink.input_ports['in'])
-        self.assertTrue(src.output_ports['out'].is_connected)
-        self.assertTrue(sink.input_ports['in'].is_connected)
-        self.assertEqual(src.output_ports['out'].connection, conn)
-        self.assertEqual(sink.input_ports['in'].connection, conn)
-
-    def test_connect_ports_invalid_direction(self):
-        class MyIter(bt2._UserMessageIterator):
-            def __next__(self):
-                raise bt2.Stop
+                pass
 
-        class MySource(bt2._UserSourceComponent,
-                       message_iterator_class=MyIter):
-            def __init__(self, params):
-                self._add_output_port('out')
+        with self.assertRaises(TypeError):
+            self._graph.add_component(MySink, 'salut', logging_level='yo')
 
+    def test_add_component_invalid_logging_level_value(self):
         class MySink(bt2._UserSinkComponent):
-            def __init__(self, params):
-                self._add_input_port('in')
-
             def _consume(self):
-                raise bt2.Stop
-
-        src = self._graph.add_component(MySource, 'src')
-        sink = self._graph.add_component(MySink, 'sink')
-
-        with self.assertRaises(TypeError):
-            conn = self._graph.connect_ports(sink.input_ports['in'],
-                                             src.output_ports['out'])
-
-    def test_connect_ports_refused(self):
-        class MyIter(bt2._UserMessageIterator):
-            def __next__(self):
-                raise bt2.Stop
+                pass
 
-        class MySource(bt2._UserSourceComponent,
-                       message_iterator_class=MyIter):
-            def __init__(self, params):
-                self._add_output_port('out')
+        with self.assertRaises(ValueError):
+            self._graph.add_component(MySink, 'salut', logging_level=12345)
 
+    def test_add_component_logging_level(self):
         class MySink(bt2._UserSinkComponent):
-            def __init__(self, params):
-                self._add_input_port('in')
-
             def _consume(self):
-                raise bt2.Stop
-
-            def _accept_port_connection(self, port, other_port):
-                return False
-
-        src = self._graph.add_component(MySource, 'src')
-        sink = self._graph.add_component(MySink, 'sink')
+                pass
 
-        with self.assertRaises(bt2.PortConnectionRefused):
-            conn = self._graph.connect_ports(src.output_ports['out'],
-                                             sink.input_ports['in'])
+        comp = self._graph.add_component(MySink, 'salut',
+                                         logging_level=bt2.LoggingLevel.DEBUG)
+        self.assertEqual(comp.logging_level, bt2.LoggingLevel.DEBUG)
 
-    def test_connect_ports_canceled(self):
+    def test_connect_ports(self):
         class MyIter(bt2._UserMessageIterator):
             def __next__(self):
                 raise bt2.Stop
@@ -150,13 +141,15 @@ class GraphTestCase(unittest.TestCase):
 
         src = self._graph.add_component(MySource, 'src')
         sink = self._graph.add_component(MySink, 'sink')
-        self._graph.cancel()
 
-        with self.assertRaises(bt2.GraphCanceled):
-            conn = self._graph.connect_ports(src.output_ports['out'],
-                                             sink.input_ports['in'])
+        conn = self._graph.connect_ports(src.output_ports['out'],
+                                         sink.input_ports['in'])
+        self.assertTrue(src.output_ports['out'].is_connected)
+        self.assertTrue(sink.input_ports['in'].is_connected)
+        self.assertEqual(src.output_ports['out'].connection.addr, conn.addr)
+        self.assertEqual(sink.input_ports['in'].connection.addr, conn.addr)
 
-    def test_connect_ports_cannot_consume_accept(self):
+    def test_connect_ports_invalid_direction(self):
         class MyIter(bt2._UserMessageIterator):
             def __next__(self):
                 raise bt2.Stop
@@ -173,27 +166,24 @@ class GraphTestCase(unittest.TestCase):
             def _consume(self):
                 raise bt2.Stop
 
-            def _accept_port_connection(self, port, other_port):
-                nonlocal exc
-
-                try:
-                    self.graph.run()
-                except Exception as e:
-                    exc = e
-
-                return True
-
-        exc = None
         src = self._graph.add_component(MySource, 'src')
         sink = self._graph.add_component(MySink, 'sink')
-        self._graph.connect_ports(src.output_ports['out'],
-                                  sink.input_ports['in'])
-        self.assertIs(type(exc), bt2.CannotConsumeGraph)
 
-    def test_connect_ports_cannot_consume_connected(self):
-        class MyIter(bt2._UserMessageIterator):
+        with self.assertRaises(TypeError):
+            conn = self._graph.connect_ports(sink.input_ports['in'],
+                                             src.output_ports['out'])
+
+    def test_cancel(self):
+        self.assertFalse(self._graph.is_canceled)
+        self._graph.cancel()
+        self.assertTrue(self._graph.is_canceled)
+
+    # Test that Graph.run() raises bt2.Canceled if the graph gets canceled
+    # during execution.
+    def test_cancel_while_running(self):
+        class MyIter(_MyIter):
             def __next__(self):
-                raise bt2.Stop
+                return self._create_stream_beginning_message(self._stream)
 
         class MySource(bt2._UserSourceComponent,
                        message_iterator_class=MyIter):
@@ -205,62 +195,39 @@ class GraphTestCase(unittest.TestCase):
                 self._add_input_port('in')
 
             def _consume(self):
-                raise bt2.Stop
-
-            def _port_connected(self, port, other_port):
-                nonlocal exc
-
-                try:
-                    self.graph.run()
-                except Exception as e:
-                    exc = e
+                # Pretend that somebody asynchronously cancelled the graph.
+                nonlocal graph
+                graph.cancel()
 
-                return True
+                return next(self._msg_iter)
 
-        exc = None
-        src = self._graph.add_component(MySource, 'src')
-        sink = self._graph.add_component(MySink, 'sink')
-        self._graph.connect_ports(src.output_ports['out'],
-                                  sink.input_ports['in'])
-        self._graph.run()
-        self.assertIs(type(exc), bt2.CannotConsumeGraph)
+            def _graph_is_configured(self):
+                self._msg_iter = self._input_ports['in'].create_message_iterator()
 
-    def test_cancel(self):
-        self.assertFalse(self._graph.is_canceled)
-        self._graph.cancel()
-        self.assertTrue(self._graph.is_canceled)
+        graph = bt2.Graph()
+        up = graph.add_component(MySource, 'down')
+        down = graph.add_component(MySink, 'up')
+        graph.connect_ports(up.output_ports['out'], down.input_ports['in'])
+        with self.assertRaises(bt2.Canceled):
+            graph.run()
 
     def test_run(self):
-        class MyIter(bt2._UserMessageIterator):
-            def __init__(self):
-                self._build_meta()
-                self._at = 0
-
-            def _build_meta(self):
-                self._trace = bt2.Trace()
-                self._sc = bt2.StreamClass()
-                self._ec = bt2.EventClass('salut')
-                self._my_int_fc = bt2.IntegerFieldClass(32)
-                self._ec.payload_field_class = bt2.StructureFieldClass()
-                self._ec.payload_field_class += collections.OrderedDict([
-                    ('my_int', self._my_int_fc),
-                ])
-                self._sc.add_event_class(self._ec)
-                self._trace.add_stream_class(self._sc)
-                self._stream = self._sc()
-                self._packet = self._stream.create_packet()
-
-            def _create_event(self, value):
-                ev = self._ec()
-                ev.payload_field['my_int'] = value
-                ev.packet = self._packet
-                return ev
-
+        class MyIter(_MyIter):
             def __next__(self):
-                if self._at == 5:
-                    raise bt2.Stop
+                if self._at == 9:
+                    raise StopIteration
+
+                if self._at == 0:
+                    msg = self._create_stream_beginning_message(self._stream)
+                elif self._at == 1:
+                    msg = self._create_packet_beginning_message(self._packet)
+                elif self._at == 7:
+                    msg = self._create_packet_end_message(self._packet)
+                elif self._at == 8:
+                    msg = self._create_stream_end_message(self._stream)
+                else:
+                    msg = self._create_event_message(self._ec, self._packet)
 
-                msg = bt2.EventMessage(self._create_event(self._at * 3))
                 self._at += 1
                 return msg
 
@@ -271,30 +238,28 @@ class GraphTestCase(unittest.TestCase):
 
         class MySink(bt2._UserSinkComponent):
             def __init__(self, params):
-                self._add_input_port('in')
+                self._input_port = self._add_input_port('in')
                 self._at = 0
 
             def _consume(comp_self):
                 msg = next(comp_self._msg_iter)
 
                 if comp_self._at == 0:
-                    self.assertIsInstance(msg, bt2.StreamBeginningMessage)
+                    self.assertIsInstance(msg, bt2.message._StreamBeginningMessage)
                 elif comp_self._at == 1:
-                    self.assertIsInstance(msg, bt2.PacketBeginningMessage)
+                    self.assertIsInstance(msg, bt2.message._PacketBeginningMessage)
                 elif comp_self._at >= 2 and comp_self._at <= 6:
-                    self.assertIsInstance(msg, bt2.EventMessage)
-                    self.assertEqual(msg.event.event_class.name, 'salut')
-                    field = msg.event.payload_field['my_int']
-                    self.assertEqual(field, (comp_self._at - 2) * 3)
+                    self.assertIsInstance(msg, bt2.message._EventMessage)
+                    self.assertEqual(msg.event.cls.name, 'salut')
                 elif comp_self._at == 7:
-                    self.assertIsInstance(msg, bt2.PacketEndMessage)
+                    self.assertIsInstance(msg, bt2.message._PacketEndMessage)
                 elif comp_self._at == 8:
-                    self.assertIsInstance(msg, bt2.StreamEndMessage)
+                    self.assertIsInstance(msg, bt2.message._StreamEndMessage)
 
                 comp_self._at += 1
 
-            def _port_connected(self, port, other_port):
-                self._msg_iter = port.connection.create_message_iterator()
+            def _graph_is_configured(self):
+                self._msg_iter = self._input_port.create_message_iterator()
 
         src = self._graph.add_component(MySource, 'src')
         sink = self._graph.add_component(MySink, 'sink')
@@ -303,36 +268,18 @@ class GraphTestCase(unittest.TestCase):
         self._graph.run()
 
     def test_run_again(self):
-        class MyIter(bt2._UserMessageIterator):
-            def __init__(self):
-                self._build_meta()
-                self._at = 0
-
-            def _build_meta(self):
-                self._trace = bt2.Trace()
-                self._sc = bt2.StreamClass()
-                self._ec = bt2.EventClass('salut')
-                self._my_int_fc = bt2.IntegerFieldClass(32)
-                self._ec.payload_field_class = bt2.StructureFieldClass()
-                self._ec.payload_field_class += collections.OrderedDict([
-                    ('my_int', self._my_int_fc),
-                ])
-                self._sc.add_event_class(self._ec)
-                self._trace.add_stream_class(self._sc)
-                self._stream = self._sc()
-                self._packet = self._stream.create_packet()
-
-            def _create_event(self, value):
-                ev = self._ec()
-                ev.payload_field['my_int'] = value
-                ev.packet = self._packet
-                return ev
-
+        class MyIter(_MyIter):
             def __next__(self):
-                if self._at == 1:
+                if self._at == 3:
                     raise bt2.TryAgain
 
-                msg = bt2.EventMessage(self._create_event(self._at * 3))
+                if self._at == 0:
+                    msg = self._create_stream_beginning_message(self._stream)
+                elif self._at == 1:
+                    msg = self._create_packet_beginning_message(self._packet)
+                elif self._at == 2:
+                    msg = self._create_event_message(self._ec, self._packet)
+
                 self._at += 1
                 return msg
 
@@ -343,24 +290,25 @@ class GraphTestCase(unittest.TestCase):
 
         class MySink(bt2._UserSinkComponent):
             def __init__(self, params):
-                self._add_input_port('in')
+                self._input_port = self._add_input_port('in')
                 self._at = 0
 
             def _consume(comp_self):
+                msg = next(comp_self._msg_iter)
                 if comp_self._at == 0:
-                    msg = next(comp_self._msg_iter)
-                    self.assertIsInstance(msg, bt2.EventMessage)
+                    self.assertIsInstance(msg, bt2.message._StreamBeginningMessage)
                 elif comp_self._at == 1:
-                    with self.assertRaises(bt2.TryAgain):
-                        msg = next(comp_self._msg_iter)
-
+                    self.assertIsInstance(msg, bt2.message._PacketBeginningMessage)
+                elif comp_self._at == 2:
+                    self.assertIsInstance(msg, bt2.message._EventMessage)
                     raise bt2.TryAgain
+                else:
+                    pass
 
                 comp_self._at += 1
 
-            def _port_connected(self, port, other_port):
-                types = [bt2.EventMessage]
-                self._msg_iter = port.connection.create_message_iterator(types)
+            def _graph_is_configured(self):
+                self._msg_iter = self._input_port.create_message_iterator()
 
         src = self._graph.add_component(MySource, 'src')
         sink = self._graph.add_component(MySink, 'sink')
@@ -371,36 +319,23 @@ class GraphTestCase(unittest.TestCase):
             self._graph.run()
 
     def test_run_error(self):
-        class MyIter(bt2._UserMessageIterator):
-            def __init__(self):
-                self._build_meta()
-                self._at = 0
-
-            def _build_meta(self):
-                self._trace = bt2.Trace()
-                self._sc = bt2.StreamClass()
-                self._ec = bt2.EventClass('salut')
-                self._my_int_fc = bt2.IntegerFieldClass(32)
-                self._ec.payload_field_class = bt2.StructureFieldClass()
-                self._ec.payload_field_class += collections.OrderedDict([
-                    ('my_int', self._my_int_fc),
-                ])
-                self._sc.add_event_class(self._ec)
-                self._trace.add_stream_class(self._sc)
-                self._stream = self._sc()
-                self._packet = self._stream.create_packet()
-
-            def _create_event(self, value):
-                ev = self._ec()
-                ev.payload_field['my_int'] = value
-                ev.packet = self._packet
-                return ev
+        raised_in_sink = False
 
+        class MyIter(_MyIter):
             def __next__(self):
-                if self._at == 1:
+                # If this gets called after the sink raised an exception, it is
+                # an error.
+                nonlocal raised_in_sink
+                assert raised_in_sink is False
+
+                if self._at == 0:
+                    msg = self._create_stream_beginning_message(self._stream)
+                elif self._at == 1:
+                    msg = self._create_packet_beginning_message(self._packet)
+                elif self._at == 2 or self._at == 3:
+                    msg = self._create_event_message(self._ec, self._packet)
+                else:
                     raise bt2.TryAgain
-
-                msg = bt2.EventMessage(self._create_event(self._at * 3))
                 self._at += 1
                 return msg
 
@@ -411,21 +346,26 @@ class GraphTestCase(unittest.TestCase):
 
         class MySink(bt2._UserSinkComponent):
             def __init__(self, params):
-                self._add_input_port('in')
+                self._input_port = self._add_input_port('in')
                 self._at = 0
 
             def _consume(comp_self):
+                msg = next(comp_self._msg_iter)
                 if comp_self._at == 0:
-                    msg = next(comp_self._msg_iter)
-                    self.assertIsInstance(msg, bt2.EventMessage)
+                    self.assertIsInstance(msg, bt2.message._StreamBeginningMessage)
                 elif comp_self._at == 1:
+                    self.assertIsInstance(msg, bt2.message._PacketBeginningMessage)
+                elif comp_self._at == 2:
+                    self.assertIsInstance(msg, bt2.message._EventMessage)
+                elif comp_self._at == 3:
+                    nonlocal raised_in_sink
+                    raised_in_sink = True
                     raise RuntimeError('error!')
 
                 comp_self._at += 1
 
-            def _port_connected(self, port, other_port):
-                types = [bt2.EventMessage]
-                self._msg_iter = port.connection.create_message_iterator(types)
+            def _graph_is_configured(self):
+                self._msg_iter = self._input_port.create_message_iterator()
 
         src = self._graph.add_component(MySource, 'src')
         sink = self._graph.add_component(MySink, 'sink')
@@ -435,41 +375,71 @@ class GraphTestCase(unittest.TestCase):
         with self.assertRaises(bt2.Error):
             self._graph.run()
 
-    def test_run_cannot_consume(self):
+    def test_listeners(self):
         class MyIter(bt2._UserMessageIterator):
-            pass
+            def __next__(self):
+                raise bt2.Stop
 
         class MySource(bt2._UserSourceComponent,
                        message_iterator_class=MyIter):
             def __init__(self, params):
                 self._add_output_port('out')
+                self._add_output_port('zero')
 
         class MySink(bt2._UserSinkComponent):
             def __init__(self, params):
                 self._add_input_port('in')
-                self._at = 0
 
-            def _consume(comp_self):
-                nonlocal exc
+            def _consume(self):
+                raise bt2.Stop
 
-                try:
-                    print('going in')
-                    comp_self.graph.run()
-                    print('going out')
-                except Exception as e:
-                    exc = e
+            def _port_connected(self, port, other_port):
+                self._add_input_port('taste')
 
-                raise bt2.Stop
+        def port_added_listener(component, port):
+            nonlocal calls
+            calls.append((port_added_listener, component, port))
+
+        def ports_connected_listener(upstream_component, upstream_port,
+                                     downstream_component, downstream_port):
+            nonlocal calls
+            calls.append((ports_connected_listener,
+                          upstream_component, upstream_port,
+                          downstream_component, downstream_port))
 
-        exc = None
+        calls = []
+        self._graph.add_port_added_listener(port_added_listener)
+        self._graph.add_ports_connected_listener(ports_connected_listener)
         src = self._graph.add_component(MySource, 'src')
         sink = self._graph.add_component(MySink, 'sink')
-        conn = self._graph.connect_ports(src.output_ports['out'],
-                                         sink.input_ports['in'])
-        self._graph.run()
-        self.assertIs(type(exc), bt2.CannotConsumeGraph)
+        self._graph.connect_ports(src.output_ports['out'],
+                                  sink.input_ports['in'])
 
-    def test_listeners(self):
+        self.assertEqual(len(calls), 5)
+
+        self.assertIs(calls[0][0], port_added_listener)
+        self.assertEqual(calls[0][1].name, 'src')
+        self.assertEqual(calls[0][2].name, 'out')
+
+        self.assertIs(calls[1][0], port_added_listener)
+        self.assertEqual(calls[1][1].name, 'src')
+        self.assertEqual(calls[1][2].name, 'zero')
+
+        self.assertIs(calls[2][0], port_added_listener)
+        self.assertEqual(calls[2][1].name, 'sink')
+        self.assertEqual(calls[2][2].name, 'in')
+
+        self.assertIs(calls[3][0], port_added_listener)
+        self.assertEqual(calls[3][1].name, 'sink')
+        self.assertEqual(calls[3][2].name, 'taste')
+
+        self.assertIs(calls[4][0], ports_connected_listener)
+        self.assertEqual(calls[4][1].name, 'src')
+        self.assertEqual(calls[4][2].name, 'out')
+        self.assertEqual(calls[4][3].name, 'sink')
+        self.assertEqual(calls[4][4].name, 'in')
+
+    def test_invalid_listeners(self):
         class MyIter(bt2._UserMessageIterator):
             def __next__(self):
                 raise bt2.Stop
@@ -480,9 +450,6 @@ class GraphTestCase(unittest.TestCase):
                 self._add_output_port('out')
                 self._add_output_port('zero')
 
-            def _port_connected(self, port, other_port):
-                self._output_ports['zero'].remove_from_component()
-
         class MySink(bt2._UserSinkComponent):
             def __init__(self, params):
                 self._add_input_port('in')
@@ -493,60 +460,66 @@ class GraphTestCase(unittest.TestCase):
             def _port_connected(self, port, other_port):
                 self._add_input_port('taste')
 
-            def _port_disconnected(self, port):
-                port.remove_from_component()
+        with self.assertRaises(TypeError):
+            self._graph.add_port_added_listener(1234)
+        with self.assertRaises(TypeError):
+            self._graph.add_ports_connected_listener(1234)
 
-        def port_added_listener(port):
-            nonlocal calls
-            calls.append((port_added_listener, port))
+    def test_raise_in_component_init(self):
+        class MySink(bt2._UserSinkComponent):
+            def __init__(self, params):
+                raise ValueError('oops!')
 
-        def port_removed_listener(port):
-            nonlocal calls
-            calls.append((port_removed_listener, port))
+            def _consume(self):
+                raise bt2.Stop
 
-        def ports_connected_listener(upstream_port, downstream_port):
-            nonlocal calls
-            calls.append((ports_connected_listener, upstream_port,
-                          downstream_port))
+        graph = bt2.Graph()
 
-        def ports_disconnected_listener(upstream_comp, downstream_comp,
-                                        upstream_port, downstream_port):
-            nonlocal calls
-            calls.append((ports_disconnected_listener, upstream_comp,
-                          downstream_comp, upstream_port, downstream_port))
+        with self.assertRaises(bt2.Error):
+            graph.add_component(MySink, 'comp')
 
-        calls = []
-        self._graph.add_listener(bt2.GraphListenerType.PORT_ADDED,
-                                 port_added_listener)
-        self._graph.add_listener(bt2.GraphListenerType.PORT_REMOVED,
-                                 port_removed_listener)
-        self._graph.add_listener(bt2.GraphListenerType.PORTS_CONNECTED,
-                                 ports_connected_listener)
-        self._graph.add_listener(bt2.GraphListenerType.PORTS_DISCONNECTED,
-                                 ports_disconnected_listener)
-        src = self._graph.add_component(MySource, 'src')
-        sink = self._graph.add_component(MySink, 'sink')
-        self._graph.connect_ports(src.output_ports['out'],
-                                  sink.input_ports['in'])
-        sink.input_ports['in'].disconnect()
-        self.assertIs(calls[0][0], port_added_listener)
-        self.assertEqual(calls[0][1].name, 'out')
-        self.assertIs(calls[1][0], port_added_listener)
-        self.assertEqual(calls[1][1].name, 'zero')
-        self.assertIs(calls[2][0], port_added_listener)
-        self.assertEqual(calls[2][1].name, 'in')
-        self.assertIs(calls[3][0], port_removed_listener)
-        self.assertEqual(calls[3][1].name, 'zero')
-        self.assertIs(calls[4][0], port_added_listener)
-        self.assertEqual(calls[4][1].name, 'taste')
-        self.assertIs(calls[5][0], ports_connected_listener)
-        self.assertEqual(calls[5][1].name, 'out')
-        self.assertEqual(calls[5][2].name, 'in')
-        self.assertIs(calls[6][0], port_removed_listener)
-        self.assertEqual(calls[6][1].name, 'in')
-        self.assertIs(calls[7][0], ports_disconnected_listener)
-        self.assertEqual(calls[7][1].name, 'src')
-        self.assertEqual(calls[7][2].name, 'sink')
-        self.assertEqual(calls[7][3].name, 'out')
-        self.assertEqual(calls[7][4].name, 'in')
-        del calls
+    def test_raise_in_port_added_listener(self):
+        class MySink(bt2._UserSinkComponent):
+            def __init__(self, params):
+                self._add_input_port('in')
+
+            def _consume(self):
+                raise bt2.Stop
+
+        def port_added_listener(component, port):
+            raise ValueError('oh noes!')
+
+        graph = bt2.Graph()
+        graph.add_port_added_listener(port_added_listener)
+
+        with self.assertRaises(bt2.Error):
+            graph.add_component(MySink, 'comp')
+
+    def test_raise_in_ports_connected_listener(self):
+        class MyIter(bt2._UserMessageIterator):
+            def __next__(self):
+                raise bt2.Stop
+
+        class MySource(bt2._UserSourceComponent,
+                       message_iterator_class=MyIter):
+            def __init__(self, params):
+                self._add_output_port('out')
+
+        class MySink(bt2._UserSinkComponent):
+            def __init__(self, params):
+                self._add_input_port('in')
+
+            def _consume(self):
+                raise bt2.Stop
+
+        def ports_connected_listener(upstream_component, upstream_port,
+                                     downstream_component, downstream_port):
+            raise ValueError('oh noes!')
+
+        graph = bt2.Graph()
+        graph.add_ports_connected_listener(ports_connected_listener)
+        up = graph.add_component(MySource, 'down')
+        down = graph.add_component(MySink, 'up')
+
+        with self.assertRaises(bt2.Error):
+            graph.connect_ports(up.output_ports['out'], down.input_ports['in'])
This page took 0.032147 seconds and 4 git commands to generate.