X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=tests%2Fbindings%2Fpython%2Fbt2%2Ftest_graph.py;h=acf283ceab5e41fae4c0392cdb6011825cc933e5;hb=f5567ea88d172767b34373bc6e402da8bfd85ef8;hp=1df1d32f9b76c82b422770245e76a83ad3f0ad48;hpb=d2d857a8c492de2cde82d191a20c50b43842bdd7;p=babeltrace.git diff --git a/tests/bindings/python/bt2/test_graph.py b/tests/bindings/python/bt2/test_graph.py index 1df1d32f..acf283ce 100644 --- a/tests/bindings/python/bt2/test_graph.py +++ b/tests/bindings/python/bt2/test_graph.py @@ -1,50 +1,32 @@ +# SPDX-License-Identifier: GPL-2.0-only # # Copyright (C) 2019 EfficiOS Inc. # -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; only version 2 -# of the License. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. -# -from bt2 import value -import collections import unittest -import copy import bt2 class _MyIter(bt2._UserMessageIterator): - def __init__(self, self_output_port): + def __init__(self, config, self_output_port): self._build_meta() self._at = 0 def _build_meta(self): self._tc = self._component._create_trace_class() self._t = self._tc() - self._sc = self._tc.create_stream_class() - self._ec = self._sc.create_event_class(name='salut') + self._sc = self._tc.create_stream_class(supports_packets=True) + self._ec = self._sc.create_event_class(name="salut") self._my_int_ft = self._tc.create_signed_integer_field_class(32) payload_ft = self._tc.create_structure_field_class() - payload_ft += collections.OrderedDict([ - ('my_int', self._my_int_ft), - ]) + payload_ft += [("my_int", self._my_int_ft)] self._ec.payload_field_type = payload_ft self._stream = self._t.create_stream(self._sc) self._packet = self._stream.create_packet() def _create_event(self, value): ev = self._ec() - ev.payload_field['my_int'] = value + ev.payload_field["my_int"] = value ev.packet = self._packet return ev @@ -56,187 +38,287 @@ class GraphTestCase(unittest.TestCase): def tearDown(self): del self._graph - def test_create_empty(self): - graph = bt2.Graph() + def test_create_default(self): + bt2.Graph() + + def test_create_known_mip_version(self): + bt2.Graph(0) + + def test_create_invalid_mip_version_type(self): + with self.assertRaises(TypeError): + bt2.Graph("") + + def test_create_unknown_mip_version(self): + with self.assertRaisesRegex(ValueError, "unknown MIP version"): + bt2.Graph(1) + + def test_default_interrupter(self): + interrupter = self._graph.default_interrupter + self.assertIs(type(interrupter), bt2.Interrupter) def test_add_component_user_cls(self): class MySink(bt2._UserSinkComponent): - def _consume(self): + def _user_consume(self): pass - comp = self._graph.add_component(MySink, 'salut') - self.assertEqual(comp.name, 'salut') + comp = self._graph.add_component(MySink, "salut") + self.assertEqual(comp.name, "salut") def test_add_component_gen_cls(self): class MySink(bt2._UserSinkComponent): - def _consume(self): + def _user_consume(self): pass - comp = self._graph.add_component(MySink, 'salut') + comp = self._graph.add_component(MySink, "salut") assert comp - comp2 = self._graph.add_component(comp.cls, 'salut2') - self.assertEqual(comp2.name, 'salut2') + comp2 = self._graph.add_component(comp.cls, "salut2") + self.assertEqual(comp2.name, "salut2") def test_add_component_params(self): comp_params = None class MySink(bt2._UserSinkComponent): - def __init__(self, params): + def __init__(self, config, params, obj): nonlocal comp_params comp_params = params - def _consume(self): + def _user_consume(self): pass - params = {'hello': 23, 'path': '/path/to/stuff'} - comp = self._graph.add_component(MySink, 'salut', params) + params = {"hello": 23, "path": "/path/to/stuff"} + self._graph.add_component(MySink, "salut", params) self.assertEqual(params, comp_params) del comp_params + def test_add_component_obj_python_comp_cls(self): + comp_obj = None + + class MySink(bt2._UserSinkComponent): + def __init__(self, config, params, obj): + nonlocal comp_obj + comp_obj = obj + + def _user_consume(self): + pass + + obj = object() + self._graph.add_component(MySink, "salut", obj=obj) + self.assertIs(comp_obj, obj) + del comp_obj + + def test_add_component_obj_none_python_comp_cls(self): + comp_obj = None + + class MySink(bt2._UserSinkComponent): + def __init__(self, config, params, obj): + nonlocal comp_obj + comp_obj = obj + + def _user_consume(self): + pass + + self._graph.add_component(MySink, "salut") + self.assertIsNone(comp_obj) + del comp_obj + + def test_add_component_obj_non_python_comp_cls(self): + plugin = bt2.find_plugin("text", find_in_user_dir=False, find_in_sys_dir=False) + assert plugin is not None + cc = plugin.source_component_classes["dmesg"] + assert cc is not None + + with self.assertRaises(ValueError): + self._graph.add_component(cc, "salut", obj=57) + def test_add_component_invalid_cls_type(self): with self.assertRaises(TypeError): - self._graph.add_component(int, 'salut') + self._graph.add_component(int, "salut") def test_add_component_invalid_logging_level_type(self): class MySink(bt2._UserSinkComponent): - def _consume(self): + def _user_consume(self): pass with self.assertRaises(TypeError): - self._graph.add_component(MySink, 'salut', logging_level='yo') + self._graph.add_component(MySink, "salut", logging_level="yo") def test_add_component_invalid_logging_level_value(self): class MySink(bt2._UserSinkComponent): - def _consume(self): + def _user_consume(self): pass with self.assertRaises(ValueError): - self._graph.add_component(MySink, 'salut', logging_level=12345) + self._graph.add_component(MySink, "salut", logging_level=12345) + + def test_add_component_invalid_params_type(self): + class MySink(bt2._UserSinkComponent): + def _user_consume(self): + pass + + with self.assertRaises(TypeError): + self._graph.add_component(MySink, "salut", params=12) + + def test_add_component_params_dict(self): + params_obj = None + + class MySink(bt2._UserSinkComponent): + def __init__(self, config, params, obj): + nonlocal params_obj + params_obj = params + + def _user_consume(self): + pass + + params = {"plage": 12312} + self._graph.add_component(MySink, "salut", params=params) + + # Check equality and not identity because `add_component()` method + # converts the Python `dict` to a `bt2.MapValue`. + self.assertEqual(params, params_obj) + + def test_add_component_params_mapvalue(self): + params_obj = None + + class MySink(bt2._UserSinkComponent): + def __init__(self, config, params, obj): + nonlocal params_obj + params_obj = params + + def _user_consume(self): + pass + + params = bt2.MapValue({"beachclub": "121"}) + self._graph.add_component(MySink, "salut", params=params) + + self.assertEqual(params, params_obj) def test_add_component_logging_level(self): class MySink(bt2._UserSinkComponent): - def _consume(self): + def _user_consume(self): pass - comp = self._graph.add_component(MySink, 'salut', - logging_level=bt2.LoggingLevel.DEBUG) + comp = self._graph.add_component( + MySink, "salut", logging_level=bt2.LoggingLevel.DEBUG + ) self.assertEqual(comp.logging_level, bt2.LoggingLevel.DEBUG) def test_connect_ports(self): - class MyIter(bt2._UserMessageIterator): - def __next__(self): - raise bt2.Stop - - class MySource(bt2._UserSourceComponent, - message_iterator_class=MyIter): - def __init__(self, params): - self._add_output_port('out') + class MySource( + bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator + ): + def __init__(self, config, params, obj): + self._add_output_port("out") class MySink(bt2._UserSinkComponent): - def __init__(self, params): - self._add_input_port('in') + def __init__(self, config, params, obj): + self._add_input_port("in") - def _consume(self): + def _user_consume(self): raise bt2.Stop - src = self._graph.add_component(MySource, 'src') - sink = self._graph.add_component(MySink, 'sink') + src = self._graph.add_component(MySource, "src") + sink = self._graph.add_component(MySink, "sink") - conn = self._graph.connect_ports(src.output_ports['out'], - sink.input_ports['in']) - self.assertTrue(src.output_ports['out'].is_connected) - self.assertTrue(sink.input_ports['in'].is_connected) - self.assertEqual(src.output_ports['out'].connection._ptr, conn._ptr) - self.assertEqual(sink.input_ports['in'].connection._ptr, conn._ptr) + conn = self._graph.connect_ports( + src.output_ports["out"], sink.input_ports["in"] + ) + self.assertTrue(src.output_ports["out"].is_connected) + self.assertTrue(sink.input_ports["in"].is_connected) + self.assertEqual(src.output_ports["out"].connection.addr, conn.addr) + self.assertEqual(sink.input_ports["in"].connection.addr, conn.addr) def test_connect_ports_invalid_direction(self): - class MyIter(bt2._UserMessageIterator): - def __next__(self): - raise bt2.Stop - - class MySource(bt2._UserSourceComponent, - message_iterator_class=MyIter): - def __init__(self, params): - self._add_output_port('out') + class MySource( + bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator + ): + def __init__(self, config, params, obj): + self._add_output_port("out") class MySink(bt2._UserSinkComponent): - def __init__(self, params): - self._add_input_port('in') + def __init__(self, config, params, obj): + self._add_input_port("in") - def _consume(self): + def _user_consume(self): raise bt2.Stop - src = self._graph.add_component(MySource, 'src') - sink = self._graph.add_component(MySink, 'sink') + src = self._graph.add_component(MySource, "src") + sink = self._graph.add_component(MySink, "sink") with self.assertRaises(TypeError): - conn = self._graph.connect_ports(sink.input_ports['in'], - src.output_ports['out']) + self._graph.connect_ports(sink.input_ports["in"], src.output_ports["out"]) - def test_connect_ports_refused(self): + def test_add_interrupter(self): class MyIter(bt2._UserMessageIterator): def __next__(self): - raise bt2.Stop + raise TypeError - class MySource(bt2._UserSourceComponent, - message_iterator_class=MyIter): - def __init__(self, params): - self._add_output_port('out') + class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter): + def __init__(self, config, params, obj): + self._add_output_port("out") class MySink(bt2._UserSinkComponent): - def __init__(self, params): - self._add_input_port('in') + def __init__(self, config, params, obj): + self._add_input_port("in") - def _consume(self): - raise bt2.Stop + def _user_consume(self): + next(self._msg_iter) - def _accept_port_connection(self, port, other_port): - return False + def _user_graph_is_configured(self): + self._msg_iter = self._create_message_iterator(self._input_ports["in"]) - src = self._graph.add_component(MySource, 'src') - sink = self._graph.add_component(MySink, 'sink') + # add two interrupters, set one of them + interrupter1 = bt2.Interrupter() + interrupter2 = bt2.Interrupter() + self._graph.add_interrupter(interrupter1) + src = self._graph.add_component(MySource, "src") + sink = self._graph.add_component(MySink, "sink") + self._graph.connect_ports(src.output_ports["out"], sink.input_ports["in"]) + self._graph.add_interrupter(interrupter2) - with self.assertRaises(bt2.PortConnectionRefused): - conn = self._graph.connect_ports(src.output_ports['out'], - sink.input_ports['in']) + with self.assertRaises(bt2._Error): + self._graph.run() - def test_cancel(self): - self.assertFalse(self._graph.is_canceled) - self._graph.cancel() - self.assertTrue(self._graph.is_canceled) + interrupter2.set() - # Test that Graph.run() raises bt2.GraphCanceled if the graph gets canceled - # during execution. - def test_cancel_while_running(self): + with self.assertRaises(bt2.TryAgain): + self._graph.run() + + interrupter2.reset() + + with self.assertRaises(bt2._Error): + self._graph.run() + + # Test that Graph.run() raises bt2.Interrupted if the graph gets + # interrupted during execution. + def test_interrupt_while_running(self): class MyIter(_MyIter): def __next__(self): return self._create_stream_beginning_message(self._stream) - class MySource(bt2._UserSourceComponent, - message_iterator_class=MyIter): - def __init__(self, params): - self._add_output_port('out') + class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter): + def __init__(self, config, params, obj): + self._add_output_port("out") class MySink(bt2._UserSinkComponent): - def __init__(self, params): - self._add_input_port('in') + def __init__(self, config, params, obj): + self._add_input_port("in") - def _consume(self): - # Pretend that somebody asynchronously cancelled the graph. + def _user_consume(self): + # Pretend that somebody asynchronously interrupted the graph. nonlocal graph - graph.cancel() - + graph.default_interrupter.set() return next(self._msg_iter) - def _graph_is_configured(self): - self._msg_iter = self._input_ports['in'].create_message_iterator() + def _user_graph_is_configured(self): + self._msg_iter = self._create_message_iterator(self._input_ports["in"]) - graph = bt2.Graph() - up = graph.add_component(MySource, 'down') - down = graph.add_component(MySink, 'up') - graph.connect_ports(up.output_ports['out'], down.input_ports['in']) - with self.assertRaises(bt2.GraphCanceled): - graph.run() + graph = self._graph + up = self._graph.add_component(MySource, "down") + down = self._graph.add_component(MySink, "up") + self._graph.connect_ports(up.output_ports["out"], down.input_ports["in"]) + + with self.assertRaises(bt2.TryAgain): + self._graph.run() def test_run(self): class MyIter(_MyIter): @@ -258,42 +340,89 @@ class GraphTestCase(unittest.TestCase): self._at += 1 return msg - class MySource(bt2._UserSourceComponent, - message_iterator_class=MyIter): - def __init__(self, params): - self._add_output_port('out') + class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter): + def __init__(self, config, params, obj): + self._add_output_port("out") class MySink(bt2._UserSinkComponent): - def __init__(self, params): - self._input_port = self._add_input_port('in') + def __init__(self, config, params, obj): + self._input_port = self._add_input_port("in") self._at = 0 - def _consume(comp_self): + def _user_consume(comp_self): msg = next(comp_self._msg_iter) if comp_self._at == 0: - self.assertIsInstance(msg, bt2.message._StreamBeginningMessage) + self.assertIs(type(msg), bt2._StreamBeginningMessageConst) elif comp_self._at == 1: - self.assertIsInstance(msg, bt2.message._PacketBeginningMessage) + self.assertIs(type(msg), bt2._PacketBeginningMessageConst) elif comp_self._at >= 2 and comp_self._at <= 6: - self.assertIsInstance(msg, bt2.message._EventMessage) - self.assertEqual(msg.event.cls.name, 'salut') + self.assertIs(type(msg), bt2._EventMessageConst) + self.assertEqual(msg.event.cls.name, "salut") elif comp_self._at == 7: - self.assertIsInstance(msg, bt2.message._PacketEndMessage) + self.assertIs(type(msg), bt2._PacketEndMessageConst) elif comp_self._at == 8: - self.assertIsInstance(msg, bt2.message._StreamEndMessage) + self.assertIs(type(msg), bt2._StreamEndMessageConst) comp_self._at += 1 - def _graph_is_configured(self): - self._msg_iter = self._input_port.create_message_iterator() + def _user_graph_is_configured(self): + self._msg_iter = self._create_message_iterator(self._input_port) - src = self._graph.add_component(MySource, 'src') - sink = self._graph.add_component(MySink, 'sink') - conn = self._graph.connect_ports(src.output_ports['out'], - sink.input_ports['in']) + src = self._graph.add_component(MySource, "src") + sink = self._graph.add_component(MySink, "sink") + self._graph.connect_ports(src.output_ports["out"], sink.input_ports["in"]) self._graph.run() + def test_run_once(self): + class MyIter(_MyIter): + pass + + class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter): + def __init__(self, config, params, obj): + self._add_output_port("out") + + class MySink(bt2._UserSinkComponent): + def __init__(self, config, params, obj): + self._input_port = self._add_input_port("in") + + def _user_consume(comp_self): + nonlocal run_count + run_count += 1 + raise bt2.TryAgain + + run_count = 0 + src = self._graph.add_component(MySource, "src") + sink = self._graph.add_component(MySink, "sink") + self._graph.connect_ports(src.output_ports["out"], sink.input_ports["in"]) + + with self.assertRaises(bt2.TryAgain): + self._graph.run_once() + + self.assertEqual(run_count, 1) + + def test_run_once_stops(self): + class MyIter(_MyIter): + pass + + class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter): + def __init__(self, config, params, obj): + self._add_output_port("out") + + class MySink(bt2._UserSinkComponent): + def __init__(self, config, params, obj): + self._input_port = self._add_input_port("in") + + def _user_consume(comp_self): + raise bt2.Stop + + src = self._graph.add_component(MySource, "src") + sink = self._graph.add_component(MySink, "sink") + self._graph.connect_ports(src.output_ports["out"], sink.input_ports["in"]) + + with self.assertRaises(bt2.Stop): + self._graph.run_once() + def test_run_again(self): class MyIter(_MyIter): def __next__(self): @@ -310,37 +439,35 @@ class GraphTestCase(unittest.TestCase): self._at += 1 return msg - class MySource(bt2._UserSourceComponent, - message_iterator_class=MyIter): - def __init__(self, params): - self._add_output_port('out') + class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter): + def __init__(self, config, params, obj): + self._add_output_port("out") class MySink(bt2._UserSinkComponent): - def __init__(self, params): - self._input_port = self._add_input_port('in') + def __init__(self, config, params, obj): + self._input_port = self._add_input_port("in") self._at = 0 - def _consume(comp_self): + def _user_consume(comp_self): msg = next(comp_self._msg_iter) if comp_self._at == 0: - self.assertIsInstance(msg, bt2.message._StreamBeginningMessage) + self.assertIs(type(msg), bt2._StreamBeginningMessageConst) elif comp_self._at == 1: - self.assertIsInstance(msg, bt2.message._PacketBeginningMessage) + self.assertIs(type(msg), bt2._PacketBeginningMessageConst) elif comp_self._at == 2: - self.assertIsInstance(msg, bt2.message._EventMessage) + self.assertIs(type(msg), bt2._EventMessageConst) raise bt2.TryAgain else: pass comp_self._at += 1 - def _graph_is_configured(self): - self._msg_iter = self._input_port.create_message_iterator() + def _user_graph_is_configured(self): + self._msg_iter = self._create_message_iterator(self._input_port) - src = self._graph.add_component(MySource, 'src') - sink = self._graph.add_component(MySink, 'sink') - conn = self._graph.connect_ports(src.output_ports['out'], - sink.input_ports['in']) + src = self._graph.add_component(MySource, "src") + sink = self._graph.add_component(MySink, "sink") + self._graph.connect_ports(src.output_ports["out"], sink.input_ports["in"]) with self.assertRaises(bt2.TryAgain): self._graph.run() @@ -366,187 +493,137 @@ class GraphTestCase(unittest.TestCase): self._at += 1 return msg - class MySource(bt2._UserSourceComponent, - message_iterator_class=MyIter): - def __init__(self, params): - self._add_output_port('out') + class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter): + def __init__(self, config, params, obj): + self._add_output_port("out") class MySink(bt2._UserSinkComponent): - def __init__(self, params): - self._input_port = self._add_input_port('in') + def __init__(self, config, params, obj): + self._input_port = self._add_input_port("in") self._at = 0 - def _consume(comp_self): + def _user_consume(comp_self): msg = next(comp_self._msg_iter) if comp_self._at == 0: - self.assertIsInstance(msg, bt2.message._StreamBeginningMessage) + self.assertIs(type(msg), bt2._StreamBeginningMessageConst) elif comp_self._at == 1: - self.assertIsInstance(msg, bt2.message._PacketBeginningMessage) + self.assertIs(type(msg), bt2._PacketBeginningMessageConst) elif comp_self._at == 2: - self.assertIsInstance(msg, bt2.message._EventMessage) + self.assertIs(type(msg), bt2._EventMessageConst) elif comp_self._at == 3: nonlocal raised_in_sink raised_in_sink = True - raise RuntimeError('error!') + raise RuntimeError("error!") comp_self._at += 1 - def _graph_is_configured(self): - self._msg_iter = self._input_port.create_message_iterator() + def _user_graph_is_configured(self): + self._msg_iter = self._create_message_iterator(self._input_port) - src = self._graph.add_component(MySource, 'src') - sink = self._graph.add_component(MySink, 'sink') - conn = self._graph.connect_ports(src.output_ports['out'], - sink.input_ports['in']) + src = self._graph.add_component(MySource, "src") + sink = self._graph.add_component(MySink, "sink") + self._graph.connect_ports(src.output_ports["out"], sink.input_ports["in"]) - with self.assertRaises(bt2.Error): + with self.assertRaises(bt2._Error): self._graph.run() def test_listeners(self): - class MyIter(bt2._UserMessageIterator): - def __next__(self): - raise bt2.Stop - - class MySource(bt2._UserSourceComponent, - message_iterator_class=MyIter): - def __init__(self, params): - self._add_output_port('out') - self._add_output_port('zero') + class MySource( + bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator + ): + def __init__(self, config, params, obj): + self._add_output_port("out") + self._add_output_port("zero") class MySink(bt2._UserSinkComponent): - def __init__(self, params): - self._add_input_port('in') + def __init__(self, config, params, obj): + self._add_input_port("in") - def _consume(self): + def _user_consume(self): raise bt2.Stop - def _port_connected(self, port, other_port): - self._add_input_port('taste') + def _user_port_connected(self, port, other_port): + self._add_input_port("taste") def port_added_listener(component, port): nonlocal calls calls.append((port_added_listener, component, port)) - def ports_connected_listener(upstream_component, upstream_port, - downstream_component, downstream_port): - nonlocal calls - calls.append((ports_connected_listener, - upstream_component, upstream_port, - downstream_component, downstream_port)) - calls = [] self._graph.add_port_added_listener(port_added_listener) - self._graph.add_ports_connected_listener(ports_connected_listener) - src = self._graph.add_component(MySource, 'src') - sink = self._graph.add_component(MySink, 'sink') - self._graph.connect_ports(src.output_ports['out'], - sink.input_ports['in']) + src = self._graph.add_component(MySource, "src") + sink = self._graph.add_component(MySink, "sink") + self._graph.connect_ports(src.output_ports["out"], sink.input_ports["in"]) - self.assertEqual(len(calls), 5) + self.assertEqual(len(calls), 4) self.assertIs(calls[0][0], port_added_listener) - self.assertEqual(calls[0][1].name, 'src') - self.assertEqual(calls[0][2].name, 'out') + self.assertEqual(calls[0][1].name, "src") + self.assertEqual(calls[0][2].name, "out") self.assertIs(calls[1][0], port_added_listener) - self.assertEqual(calls[1][1].name, 'src') - self.assertEqual(calls[1][2].name, 'zero') + self.assertEqual(calls[1][1].name, "src") + self.assertEqual(calls[1][2].name, "zero") self.assertIs(calls[2][0], port_added_listener) - self.assertEqual(calls[2][1].name, 'sink') - self.assertEqual(calls[2][2].name, 'in') + self.assertEqual(calls[2][1].name, "sink") + self.assertEqual(calls[2][2].name, "in") self.assertIs(calls[3][0], port_added_listener) - self.assertEqual(calls[3][1].name, 'sink') - self.assertEqual(calls[3][2].name, 'taste') - - self.assertIs(calls[4][0], ports_connected_listener) - self.assertEqual(calls[4][1].name, 'src') - self.assertEqual(calls[4][2].name, 'out') - self.assertEqual(calls[4][3].name, 'sink') - self.assertEqual(calls[4][4].name, 'in') + self.assertEqual(calls[3][1].name, "sink") + self.assertEqual(calls[3][2].name, "taste") def test_invalid_listeners(self): - class MyIter(bt2._UserMessageIterator): - def __next__(self): - raise bt2.Stop - - class MySource(bt2._UserSourceComponent, - message_iterator_class=MyIter): - def __init__(self, params): - self._add_output_port('out') - self._add_output_port('zero') + class MySource( + bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator + ): + def __init__(self, config, params, obj): + self._add_output_port("out") + self._add_output_port("zero") class MySink(bt2._UserSinkComponent): - def __init__(self, params): - self._add_input_port('in') + def __init__(self, config, params, obj): + self._add_input_port("in") - def _consume(self): + def _user_consume(self): raise bt2.Stop - def _port_connected(self, port, other_port): - self._add_input_port('taste') + def _user_port_connected(self, port, other_port): + self._add_input_port("taste") with self.assertRaises(TypeError): self._graph.add_port_added_listener(1234) - with self.assertRaises(TypeError): - self._graph.add_ports_connected_listener(1234) def test_raise_in_component_init(self): class MySink(bt2._UserSinkComponent): - def __init__(self, params): - raise ValueError('oops!') + def __init__(self, config, params, obj): + raise ValueError("oops!") - def _consume(self): + def _user_consume(self): raise bt2.Stop graph = bt2.Graph() - with self.assertRaises(bt2.Error): - graph.add_component(MySink, 'comp') + with self.assertRaises(bt2._Error): + graph.add_component(MySink, "comp") def test_raise_in_port_added_listener(self): class MySink(bt2._UserSinkComponent): - def __init__(self, params): - self._add_input_port('in') + def __init__(self, config, params, obj): + self._add_input_port("in") - def _consume(self): + def _user_consume(self): raise bt2.Stop def port_added_listener(component, port): - raise ValueError('oh noes!') + raise ValueError("oh noes!") graph = bt2.Graph() graph.add_port_added_listener(port_added_listener) - with self.assertRaises(bt2.Error): - graph.add_component(MySink, 'comp') - - def test_raise_in_ports_connected_listener(self): - class MyIter(bt2._UserMessageIterator): - def __next__(self): - raise bt2.Stop - - class MySource(bt2._UserSourceComponent, - message_iterator_class=MyIter): - def __init__(self, params): - self._add_output_port('out') + with self.assertRaises(bt2._Error): + graph.add_component(MySink, "comp") - class MySink(bt2._UserSinkComponent): - def __init__(self, params): - self._add_input_port('in') - - def _consume(self): - raise bt2.Stop - - def ports_connected_listener(upstream_component, upstream_port, - downstream_component, downstream_port): - raise ValueError('oh noes!') - - graph = bt2.Graph() - graph.add_ports_connected_listener(ports_connected_listener) - up = graph.add_component(MySource, 'down') - down = graph.add_component(MySink, 'up') - with self.assertRaises(bt2.Error): - graph.connect_ports(up.output_ports['out'], down.input_ports['in']) +if __name__ == "__main__": + unittest.main()