-from bt2 import value
-import collections
+#
+# Copyright (C) 2019 EfficiOS Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; only version 2
+# of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+
import unittest
-import copy
import bt2
-@unittest.skip("this is broken")
+class _MyIter(bt2._UserMessageIterator):
+ def __init__(self, config, self_output_port):
+ self._build_meta()
+ self._at = 0
+
+ def _build_meta(self):
+ self._tc = self._component._create_trace_class()
+ self._t = self._tc()
+ self._sc = self._tc.create_stream_class(supports_packets=True)
+ self._ec = self._sc.create_event_class(name='salut')
+ self._my_int_ft = self._tc.create_signed_integer_field_class(32)
+ payload_ft = self._tc.create_structure_field_class()
+ payload_ft += [('my_int', self._my_int_ft)]
+ self._ec.payload_field_type = payload_ft
+ self._stream = self._t.create_stream(self._sc)
+ self._packet = self._stream.create_packet()
+
+ def _create_event(self, value):
+ ev = self._ec()
+ ev.payload_field['my_int'] = value
+ ev.packet = self._packet
+ return ev
+
+
class GraphTestCase(unittest.TestCase):
def setUp(self):
self._graph = bt2.Graph()
def tearDown(self):
del self._graph
- def test_create_empty(self):
- graph = bt2.Graph()
+ def test_create_default(self):
+ bt2.Graph()
+
+ def test_create_known_mip_version(self):
+ bt2.Graph(0)
+
+ def test_create_invalid_mip_version_type(self):
+ with self.assertRaises(TypeError):
+ bt2.Graph('')
+
+ def test_create_unknown_mip_version(self):
+ with self.assertRaisesRegex(ValueError, 'unknown MIP version'):
+ bt2.Graph(1)
def test_add_component_user_cls(self):
class MySink(bt2._UserSinkComponent):
- def _consume(self):
+ def _user_consume(self):
pass
comp = self._graph.add_component(MySink, 'salut')
def test_add_component_gen_cls(self):
class MySink(bt2._UserSinkComponent):
- def _consume(self):
+ def _user_consume(self):
pass
comp = self._graph.add_component(MySink, 'salut')
- assert(comp)
- comp2 = self._graph.add_component(comp.component_class, 'salut2')
+ assert comp
+ comp2 = self._graph.add_component(comp.cls, 'salut2')
self.assertEqual(comp2.name, 'salut2')
def test_add_component_params(self):
comp_params = None
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, config, params, obj):
nonlocal comp_params
comp_params = params
- def _consume(self):
+ def _user_consume(self):
pass
params = {'hello': 23, 'path': '/path/to/stuff'}
- comp = self._graph.add_component(MySink, 'salut', params)
+ self._graph.add_component(MySink, 'salut', params)
self.assertEqual(params, comp_params)
del comp_params
+ def test_add_component_obj_python_comp_cls(self):
+ comp_obj = None
+
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, config, params, obj):
+ nonlocal comp_obj
+ comp_obj = obj
+
+ def _user_consume(self):
+ pass
+
+ obj = object()
+ self._graph.add_component(MySink, 'salut', obj=obj)
+ self.assertIs(comp_obj, obj)
+ del comp_obj
+
+ def test_add_component_obj_none_python_comp_cls(self):
+ comp_obj = None
+
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, config, params, obj):
+ nonlocal comp_obj
+ comp_obj = obj
+
+ def _user_consume(self):
+ pass
+
+ self._graph.add_component(MySink, 'salut')
+ self.assertIsNone(comp_obj)
+ del comp_obj
+
+ def test_add_component_obj_non_python_comp_cls(self):
+ plugin = bt2.find_plugin('text', find_in_user_dir=False, find_in_sys_dir=False)
+ assert plugin is not None
+ cc = plugin.source_component_classes['dmesg']
+ assert cc is not None
+
+ with self.assertRaises(ValueError):
+ self._graph.add_component(cc, 'salut', obj=57)
+
def test_add_component_invalid_cls_type(self):
with self.assertRaises(TypeError):
self._graph.add_component(int, 'salut')
- def test_connect_ports(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
+ def test_add_component_invalid_logging_level_type(self):
+ class MySink(bt2._UserSinkComponent):
+ def _user_consume(self):
+ pass
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
- self._add_output_port('out')
+ with self.assertRaises(TypeError):
+ self._graph.add_component(MySink, 'salut', logging_level='yo')
+ def test_add_component_invalid_logging_level_value(self):
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
- self._add_input_port('in')
+ def _user_consume(self):
+ pass
- def _consume(self):
- raise bt2.Stop
+ with self.assertRaises(ValueError):
+ self._graph.add_component(MySink, 'salut', logging_level=12345)
- src = self._graph.add_component(MySource, 'src')
- sink = self._graph.add_component(MySink, 'sink')
- conn = self._graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
- self.assertTrue(src.output_ports['out'].is_connected)
- self.assertTrue(sink.input_ports['in'].is_connected)
- self.assertEqual(src.output_ports['out'].connection, conn)
- self.assertEqual(sink.input_ports['in'].connection, conn)
+ def test_add_component_invalid_params_type(self):
+ class MySink(bt2._UserSinkComponent):
+ def _user_consume(self):
+ pass
- def test_connect_ports_invalid_direction(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
+ with self.assertRaises(TypeError):
+ self._graph.add_component(MySink, 'salut', params=12)
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
- self._add_output_port('out')
+ def test_add_component_params_dict(self):
+ params_obj = None
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
- self._add_input_port('in')
+ def __init__(self, config, params, obj):
+ nonlocal params_obj
+ params_obj = params
- def _consume(self):
- raise bt2.Stop
+ def _user_consume(self):
+ pass
- src = self._graph.add_component(MySource, 'src')
- sink = self._graph.add_component(MySink, 'sink')
+ params = {'plage': 12312}
+ self._graph.add_component(MySink, 'salut', params=params)
- with self.assertRaises(TypeError):
- conn = self._graph.connect_ports(sink.input_ports['in'],
- src.output_ports['out'])
+ # Check equality and not identity because `add_component()` method
+ # converts the Python `dict` to a `bt2.MapValue`.
+ self.assertEqual(params, params_obj)
+
+ def test_add_component_params_mapvalue(self):
+ params_obj = None
+
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, config, params, obj):
+ nonlocal params_obj
+ params_obj = params
+
+ def _user_consume(self):
+ pass
- def test_connect_ports_refused(self):
+ params = bt2.MapValue({'beachclub': '121'})
+ self._graph.add_component(MySink, 'salut', params=params)
+
+ self.assertEqual(params, params_obj)
+
+ def test_add_component_logging_level(self):
+ class MySink(bt2._UserSinkComponent):
+ def _user_consume(self):
+ pass
+
+ comp = self._graph.add_component(
+ MySink, 'salut', logging_level=bt2.LoggingLevel.DEBUG
+ )
+ self.assertEqual(comp.logging_level, bt2.LoggingLevel.DEBUG)
+
+ def test_connect_ports(self):
class MyIter(bt2._UserMessageIterator):
def __next__(self):
raise bt2.Stop
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
+ class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ def __init__(self, config, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, config, params, obj):
self._add_input_port('in')
- def _consume(self):
+ def _user_consume(self):
raise bt2.Stop
- def _accept_port_connection(self, port, other_port):
- return False
-
src = self._graph.add_component(MySource, 'src')
sink = self._graph.add_component(MySink, 'sink')
- with self.assertRaises(bt2.PortConnectionRefused):
- conn = self._graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
+ conn = self._graph.connect_ports(
+ src.output_ports['out'], sink.input_ports['in']
+ )
+ self.assertTrue(src.output_ports['out'].is_connected)
+ self.assertTrue(sink.input_ports['in'].is_connected)
+ self.assertEqual(src.output_ports['out'].connection.addr, conn.addr)
+ self.assertEqual(sink.input_ports['in'].connection.addr, conn.addr)
- def test_connect_ports_canceled(self):
+ def test_connect_ports_invalid_direction(self):
class MyIter(bt2._UserMessageIterator):
def __next__(self):
raise bt2.Stop
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
+ class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ def __init__(self, config, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, config, params, obj):
self._add_input_port('in')
- def _consume(self):
+ def _user_consume(self):
raise bt2.Stop
src = self._graph.add_component(MySource, 'src')
sink = self._graph.add_component(MySink, 'sink')
- self._graph.cancel()
- with self.assertRaises(bt2.GraphCanceled):
- conn = self._graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
+ with self.assertRaises(TypeError):
+ self._graph.connect_ports(sink.input_ports['in'], src.output_ports['out'])
- def test_connect_ports_cannot_consume_accept(self):
+ def test_add_interrupter(self):
class MyIter(bt2._UserMessageIterator):
def __next__(self):
- raise bt2.Stop
+ raise TypeError
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
+ class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ def __init__(self, config, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, config, params, obj):
self._add_input_port('in')
- def _consume(self):
- raise bt2.Stop
-
- def _accept_port_connection(self, port, other_port):
- nonlocal exc
-
- try:
- self.graph.run()
- except Exception as e:
- exc = e
+ def _user_consume(self):
+ next(self._msg_iter)
- return True
+ def _user_graph_is_configured(self):
+ self._msg_iter = self._create_input_port_message_iterator(
+ self._input_ports['in']
+ )
- exc = None
+ # add two interrupters, set one of them
+ interrupter1 = bt2.Interrupter()
+ interrupter2 = bt2.Interrupter()
+ self._graph.add_interrupter(interrupter1)
src = self._graph.add_component(MySource, 'src')
sink = self._graph.add_component(MySink, 'sink')
- self._graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
- self.assertIs(type(exc), bt2.CannotConsumeGraph)
+ self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
+ self._graph.add_interrupter(interrupter2)
- def test_connect_ports_cannot_consume_connected(self):
- class MyIter(bt2._UserMessageIterator):
+ with self.assertRaises(bt2._Error):
+ self._graph.run()
+
+ interrupter2.set()
+
+ with self.assertRaises(bt2.TryAgain):
+ self._graph.run()
+
+ interrupter2.reset()
+
+ with self.assertRaises(bt2._Error):
+ self._graph.run()
+
+ # Test that Graph.run() raises bt2.Interrupted if the graph gets
+ # interrupted during execution.
+ def test_interrupt_while_running(self):
+ class MyIter(_MyIter):
def __next__(self):
- raise bt2.Stop
+ return self._create_stream_beginning_message(self._stream)
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
+ class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ def __init__(self, config, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, config, params, obj):
self._add_input_port('in')
- def _consume(self):
- raise bt2.Stop
+ def _user_consume(self):
+ # Pretend that somebody asynchronously interrupted the graph.
+ nonlocal graph
+ graph.interrupt()
+ return next(self._msg_iter)
- def _port_connected(self, port, other_port):
- nonlocal exc
+ def _user_graph_is_configured(self):
+ self._msg_iter = self._create_input_port_message_iterator(
+ self._input_ports['in']
+ )
- try:
- self.graph.run()
- except Exception as e:
- exc = e
-
- return True
-
- exc = None
- src = self._graph.add_component(MySource, 'src')
- sink = self._graph.add_component(MySink, 'sink')
- self._graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
- self._graph.run()
- self.assertIs(type(exc), bt2.CannotConsumeGraph)
+ graph = self._graph
+ up = self._graph.add_component(MySource, 'down')
+ down = self._graph.add_component(MySink, 'up')
+ self._graph.connect_ports(up.output_ports['out'], down.input_ports['in'])
- def test_cancel(self):
- self.assertFalse(self._graph.is_canceled)
- self._graph.cancel()
- self.assertTrue(self._graph.is_canceled)
+ with self.assertRaises(bt2.TryAgain):
+ self._graph.run()
def test_run(self):
- class MyIter(bt2._UserMessageIterator):
- def __init__(self):
- self._build_meta()
- self._at = 0
-
- def _build_meta(self):
- self._trace = bt2.Trace()
- self._sc = bt2.StreamClass()
- self._ec = bt2.EventClass('salut')
- self._my_int_fc = bt2.IntegerFieldClass(32)
- self._ec.payload_field_class = bt2.StructureFieldClass()
- self._ec.payload_field_class += collections.OrderedDict([
- ('my_int', self._my_int_fc),
- ])
- self._sc.add_event_class(self._ec)
- self._trace.add_stream_class(self._sc)
- self._stream = self._sc()
- self._packet = self._stream.create_packet()
-
- def _create_event(self, value):
- ev = self._ec()
- ev.payload_field['my_int'] = value
- ev.packet = self._packet
- return ev
-
+ class MyIter(_MyIter):
def __next__(self):
- if self._at == 5:
- raise bt2.Stop
+ if self._at == 9:
+ raise StopIteration
+
+ if self._at == 0:
+ msg = self._create_stream_beginning_message(self._stream)
+ elif self._at == 1:
+ msg = self._create_packet_beginning_message(self._packet)
+ elif self._at == 7:
+ msg = self._create_packet_end_message(self._packet)
+ elif self._at == 8:
+ msg = self._create_stream_end_message(self._stream)
+ else:
+ msg = self._create_event_message(self._ec, self._packet)
- msg = bt2.EventMessage(self._create_event(self._at * 3))
self._at += 1
return msg
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
+ class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ def __init__(self, config, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
- self._add_input_port('in')
+ def __init__(self, config, params, obj):
+ self._input_port = self._add_input_port('in')
self._at = 0
- def _consume(comp_self):
+ def _user_consume(comp_self):
msg = next(comp_self._msg_iter)
if comp_self._at == 0:
- self.assertIsInstance(msg, bt2.StreamBeginningMessage)
+ self.assertIs(type(msg), bt2._StreamBeginningMessageConst)
elif comp_self._at == 1:
- self.assertIsInstance(msg, bt2.PacketBeginningMessage)
+ self.assertIs(type(msg), bt2._PacketBeginningMessageConst)
elif comp_self._at >= 2 and comp_self._at <= 6:
- self.assertIsInstance(msg, bt2.EventMessage)
- self.assertEqual(msg.event.event_class.name, 'salut')
- field = msg.event.payload_field['my_int']
- self.assertEqual(field, (comp_self._at - 2) * 3)
+ self.assertIs(type(msg), bt2._EventMessageConst)
+ self.assertEqual(msg.event.cls.name, 'salut')
elif comp_self._at == 7:
- self.assertIsInstance(msg, bt2.PacketEndMessage)
+ self.assertIs(type(msg), bt2._PacketEndMessageConst)
elif comp_self._at == 8:
- self.assertIsInstance(msg, bt2.StreamEndMessage)
+ self.assertIs(type(msg), bt2._StreamEndMessageConst)
comp_self._at += 1
- def _port_connected(self, port, other_port):
- self._msg_iter = port.connection.create_message_iterator()
+ def _user_graph_is_configured(self):
+ self._msg_iter = self._create_input_port_message_iterator(
+ self._input_port
+ )
src = self._graph.add_component(MySource, 'src')
sink = self._graph.add_component(MySink, 'sink')
- conn = self._graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
+ self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
self._graph.run()
- def test_run_again(self):
- class MyIter(bt2._UserMessageIterator):
- def __init__(self):
- self._build_meta()
- self._at = 0
+ def test_run_once(self):
+ class MyIter(_MyIter):
+ pass
- def _build_meta(self):
- self._trace = bt2.Trace()
- self._sc = bt2.StreamClass()
- self._ec = bt2.EventClass('salut')
- self._my_int_fc = bt2.IntegerFieldClass(32)
- self._ec.payload_field_class = bt2.StructureFieldClass()
- self._ec.payload_field_class += collections.OrderedDict([
- ('my_int', self._my_int_fc),
- ])
- self._sc.add_event_class(self._ec)
- self._trace.add_stream_class(self._sc)
- self._stream = self._sc()
- self._packet = self._stream.create_packet()
-
- def _create_event(self, value):
- ev = self._ec()
- ev.payload_field['my_int'] = value
- ev.packet = self._packet
- return ev
+ class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ def __init__(self, config, params, obj):
+ self._add_output_port('out')
+
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, config, params, obj):
+ self._input_port = self._add_input_port('in')
+
+ def _user_consume(comp_self):
+ nonlocal run_count
+ run_count += 1
+ raise bt2.TryAgain
+
+ run_count = 0
+ src = self._graph.add_component(MySource, 'src')
+ sink = self._graph.add_component(MySink, 'sink')
+ self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
+
+ with self.assertRaises(bt2.TryAgain):
+ self._graph.run_once()
+
+ self.assertEqual(run_count, 1)
+
+ def test_run_once_stops(self):
+ class MyIter(_MyIter):
+ pass
+ class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ def __init__(self, config, params, obj):
+ self._add_output_port('out')
+
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, config, params, obj):
+ self._input_port = self._add_input_port('in')
+
+ def _user_consume(comp_self):
+ raise bt2.Stop
+
+ src = self._graph.add_component(MySource, 'src')
+ sink = self._graph.add_component(MySink, 'sink')
+ self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
+
+ with self.assertRaises(bt2.Stop):
+ self._graph.run_once()
+
+ def test_run_again(self):
+ class MyIter(_MyIter):
def __next__(self):
- if self._at == 1:
+ if self._at == 3:
raise bt2.TryAgain
- msg = bt2.EventMessage(self._create_event(self._at * 3))
+ if self._at == 0:
+ msg = self._create_stream_beginning_message(self._stream)
+ elif self._at == 1:
+ msg = self._create_packet_beginning_message(self._packet)
+ elif self._at == 2:
+ msg = self._create_event_message(self._ec, self._packet)
+
self._at += 1
return msg
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
+ class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ def __init__(self, config, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
- self._add_input_port('in')
+ def __init__(self, config, params, obj):
+ self._input_port = self._add_input_port('in')
self._at = 0
- def _consume(comp_self):
+ def _user_consume(comp_self):
+ msg = next(comp_self._msg_iter)
if comp_self._at == 0:
- msg = next(comp_self._msg_iter)
- self.assertIsInstance(msg, bt2.EventMessage)
+ self.assertIs(type(msg), bt2._StreamBeginningMessageConst)
elif comp_self._at == 1:
- with self.assertRaises(bt2.TryAgain):
- msg = next(comp_self._msg_iter)
-
+ self.assertIs(type(msg), bt2._PacketBeginningMessageConst)
+ elif comp_self._at == 2:
+ self.assertIs(type(msg), bt2._EventMessageConst)
raise bt2.TryAgain
+ else:
+ pass
comp_self._at += 1
- def _port_connected(self, port, other_port):
- types = [bt2.EventMessage]
- self._msg_iter = port.connection.create_message_iterator(types)
+ def _user_graph_is_configured(self):
+ self._msg_iter = self._create_input_port_message_iterator(
+ self._input_port
+ )
src = self._graph.add_component(MySource, 'src')
sink = self._graph.add_component(MySink, 'sink')
- conn = self._graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
+ self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
with self.assertRaises(bt2.TryAgain):
self._graph.run()
def test_run_error(self):
- class MyIter(bt2._UserMessageIterator):
- def __init__(self):
- self._build_meta()
- self._at = 0
-
- def _build_meta(self):
- self._trace = bt2.Trace()
- self._sc = bt2.StreamClass()
- self._ec = bt2.EventClass('salut')
- self._my_int_fc = bt2.IntegerFieldClass(32)
- self._ec.payload_field_class = bt2.StructureFieldClass()
- self._ec.payload_field_class += collections.OrderedDict([
- ('my_int', self._my_int_fc),
- ])
- self._sc.add_event_class(self._ec)
- self._trace.add_stream_class(self._sc)
- self._stream = self._sc()
- self._packet = self._stream.create_packet()
-
- def _create_event(self, value):
- ev = self._ec()
- ev.payload_field['my_int'] = value
- ev.packet = self._packet
- return ev
+ raised_in_sink = False
+ class MyIter(_MyIter):
def __next__(self):
- if self._at == 1:
+ # If this gets called after the sink raised an exception, it is
+ # an error.
+ nonlocal raised_in_sink
+ assert raised_in_sink is False
+
+ if self._at == 0:
+ msg = self._create_stream_beginning_message(self._stream)
+ elif self._at == 1:
+ msg = self._create_packet_beginning_message(self._packet)
+ elif self._at == 2 or self._at == 3:
+ msg = self._create_event_message(self._ec, self._packet)
+ else:
raise bt2.TryAgain
-
- msg = bt2.EventMessage(self._create_event(self._at * 3))
self._at += 1
return msg
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
+ class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ def __init__(self, config, params, obj):
self._add_output_port('out')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
- self._add_input_port('in')
+ def __init__(self, config, params, obj):
+ self._input_port = self._add_input_port('in')
self._at = 0
- def _consume(comp_self):
+ def _user_consume(comp_self):
+ msg = next(comp_self._msg_iter)
if comp_self._at == 0:
- msg = next(comp_self._msg_iter)
- self.assertIsInstance(msg, bt2.EventMessage)
+ self.assertIs(type(msg), bt2._StreamBeginningMessageConst)
elif comp_self._at == 1:
+ self.assertIs(type(msg), bt2._PacketBeginningMessageConst)
+ elif comp_self._at == 2:
+ self.assertIs(type(msg), bt2._EventMessageConst)
+ elif comp_self._at == 3:
+ nonlocal raised_in_sink
+ raised_in_sink = True
raise RuntimeError('error!')
comp_self._at += 1
- def _port_connected(self, port, other_port):
- types = [bt2.EventMessage]
- self._msg_iter = port.connection.create_message_iterator(types)
+ def _user_graph_is_configured(self):
+ self._msg_iter = self._create_input_port_message_iterator(
+ self._input_port
+ )
src = self._graph.add_component(MySource, 'src')
sink = self._graph.add_component(MySink, 'sink')
- conn = self._graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
+ self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
- with self.assertRaises(bt2.Error):
+ with self.assertRaises(bt2._Error):
self._graph.run()
- def test_run_cannot_consume(self):
+ def test_listeners(self):
class MyIter(bt2._UserMessageIterator):
- pass
+ def __next__(self):
+ raise bt2.Stop
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
+ class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ def __init__(self, config, params, obj):
self._add_output_port('out')
+ self._add_output_port('zero')
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, config, params, obj):
self._add_input_port('in')
- self._at = 0
- def _consume(comp_self):
- nonlocal exc
+ def _user_consume(self):
+ raise bt2.Stop
- try:
- print('going in')
- comp_self.graph.run()
- print('going out')
- except Exception as e:
- exc = e
+ def _user_port_connected(self, port, other_port):
+ self._add_input_port('taste')
- raise bt2.Stop
+ def port_added_listener(component, port):
+ nonlocal calls
+ calls.append((port_added_listener, component, port))
- exc = None
+ calls = []
+ self._graph.add_port_added_listener(port_added_listener)
src = self._graph.add_component(MySource, 'src')
sink = self._graph.add_component(MySink, 'sink')
- conn = self._graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
- self._graph.run()
- self.assertIs(type(exc), bt2.CannotConsumeGraph)
+ self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
- def test_listeners(self):
+ self.assertEqual(len(calls), 4)
+
+ self.assertIs(calls[0][0], port_added_listener)
+ self.assertEqual(calls[0][1].name, 'src')
+ self.assertEqual(calls[0][2].name, 'out')
+
+ self.assertIs(calls[1][0], port_added_listener)
+ self.assertEqual(calls[1][1].name, 'src')
+ self.assertEqual(calls[1][2].name, 'zero')
+
+ self.assertIs(calls[2][0], port_added_listener)
+ self.assertEqual(calls[2][1].name, 'sink')
+ self.assertEqual(calls[2][2].name, 'in')
+
+ self.assertIs(calls[3][0], port_added_listener)
+ self.assertEqual(calls[3][1].name, 'sink')
+ self.assertEqual(calls[3][2].name, 'taste')
+
+ def test_invalid_listeners(self):
class MyIter(bt2._UserMessageIterator):
def __next__(self):
raise bt2.Stop
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
+ class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ def __init__(self, config, params, obj):
self._add_output_port('out')
self._add_output_port('zero')
- def _port_connected(self, port, other_port):
- self._output_ports['zero'].remove_from_component()
-
class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
+ def __init__(self, config, params, obj):
self._add_input_port('in')
- def _consume(self):
+ def _user_consume(self):
raise bt2.Stop
- def _port_connected(self, port, other_port):
+ def _user_port_connected(self, port, other_port):
self._add_input_port('taste')
- def _port_disconnected(self, port):
- port.remove_from_component()
+ with self.assertRaises(TypeError):
+ self._graph.add_port_added_listener(1234)
- def port_added_listener(port):
- nonlocal calls
- calls.append((port_added_listener, port))
+ def test_raise_in_component_init(self):
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, config, params, obj):
+ raise ValueError('oops!')
- def port_removed_listener(port):
- nonlocal calls
- calls.append((port_removed_listener, port))
+ def _user_consume(self):
+ raise bt2.Stop
- def ports_connected_listener(upstream_port, downstream_port):
- nonlocal calls
- calls.append((ports_connected_listener, upstream_port,
- downstream_port))
+ graph = bt2.Graph()
- def ports_disconnected_listener(upstream_comp, downstream_comp,
- upstream_port, downstream_port):
- nonlocal calls
- calls.append((ports_disconnected_listener, upstream_comp,
- downstream_comp, upstream_port, downstream_port))
+ with self.assertRaises(bt2._Error):
+ graph.add_component(MySink, 'comp')
- calls = []
- self._graph.add_listener(bt2.GraphListenerType.PORT_ADDED,
- port_added_listener)
- self._graph.add_listener(bt2.GraphListenerType.PORT_REMOVED,
- port_removed_listener)
- self._graph.add_listener(bt2.GraphListenerType.PORTS_CONNECTED,
- ports_connected_listener)
- self._graph.add_listener(bt2.GraphListenerType.PORTS_DISCONNECTED,
- ports_disconnected_listener)
- src = self._graph.add_component(MySource, 'src')
- sink = self._graph.add_component(MySink, 'sink')
- self._graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
- sink.input_ports['in'].disconnect()
- self.assertIs(calls[0][0], port_added_listener)
- self.assertEqual(calls[0][1].name, 'out')
- self.assertIs(calls[1][0], port_added_listener)
- self.assertEqual(calls[1][1].name, 'zero')
- self.assertIs(calls[2][0], port_added_listener)
- self.assertEqual(calls[2][1].name, 'in')
- self.assertIs(calls[3][0], port_removed_listener)
- self.assertEqual(calls[3][1].name, 'zero')
- self.assertIs(calls[4][0], port_added_listener)
- self.assertEqual(calls[4][1].name, 'taste')
- self.assertIs(calls[5][0], ports_connected_listener)
- self.assertEqual(calls[5][1].name, 'out')
- self.assertEqual(calls[5][2].name, 'in')
- self.assertIs(calls[6][0], port_removed_listener)
- self.assertEqual(calls[6][1].name, 'in')
- self.assertIs(calls[7][0], ports_disconnected_listener)
- self.assertEqual(calls[7][1].name, 'src')
- self.assertEqual(calls[7][2].name, 'sink')
- self.assertEqual(calls[7][3].name, 'out')
- self.assertEqual(calls[7][4].name, 'in')
- del calls
+ def test_raise_in_port_added_listener(self):
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, config, params, obj):
+ self._add_input_port('in')
+
+ def _user_consume(self):
+ raise bt2.Stop
+
+ def port_added_listener(component, port):
+ raise ValueError('oh noes!')
+
+ graph = bt2.Graph()
+ graph.add_port_added_listener(port_added_listener)
+
+ with self.assertRaises(bt2._Error):
+ graph.add_component(MySink, 'comp')
+
+
+if __name__ == '__main__':
+ unittest.main()