self._graph.add_component(int, 'salut')
def test_connect_ports(self):
- class MyIter(bt2._UserNotificationIterator):
+ class MyIter(bt2._UserMessageIterator):
def __next__(self):
raise bt2.Stop
class MySource(bt2._UserSourceComponent,
- notification_iterator_class=MyIter):
+ message_iterator_class=MyIter):
def __init__(self, params):
self._add_output_port('out')
self.assertEqual(sink.input_ports['in'].connection, conn)
def test_connect_ports_invalid_direction(self):
- class MyIter(bt2._UserNotificationIterator):
+ class MyIter(bt2._UserMessageIterator):
def __next__(self):
raise bt2.Stop
class MySource(bt2._UserSourceComponent,
- notification_iterator_class=MyIter):
+ message_iterator_class=MyIter):
def __init__(self, params):
self._add_output_port('out')
src.output_ports['out'])
def test_connect_ports_refused(self):
- class MyIter(bt2._UserNotificationIterator):
+ class MyIter(bt2._UserMessageIterator):
def __next__(self):
raise bt2.Stop
class MySource(bt2._UserSourceComponent,
- notification_iterator_class=MyIter):
+ message_iterator_class=MyIter):
def __init__(self, params):
self._add_output_port('out')
sink.input_ports['in'])
def test_connect_ports_canceled(self):
- class MyIter(bt2._UserNotificationIterator):
+ class MyIter(bt2._UserMessageIterator):
def __next__(self):
raise bt2.Stop
class MySource(bt2._UserSourceComponent,
- notification_iterator_class=MyIter):
+ message_iterator_class=MyIter):
def __init__(self, params):
self._add_output_port('out')
sink.input_ports['in'])
def test_connect_ports_cannot_consume_accept(self):
- class MyIter(bt2._UserNotificationIterator):
+ class MyIter(bt2._UserMessageIterator):
def __next__(self):
raise bt2.Stop
class MySource(bt2._UserSourceComponent,
- notification_iterator_class=MyIter):
+ message_iterator_class=MyIter):
def __init__(self, params):
self._add_output_port('out')
self.assertIs(type(exc), bt2.CannotConsumeGraph)
def test_connect_ports_cannot_consume_connected(self):
- class MyIter(bt2._UserNotificationIterator):
+ class MyIter(bt2._UserMessageIterator):
def __next__(self):
raise bt2.Stop
class MySource(bt2._UserSourceComponent,
- notification_iterator_class=MyIter):
+ message_iterator_class=MyIter):
def __init__(self, params):
self._add_output_port('out')
self.assertTrue(self._graph.is_canceled)
def test_run(self):
- class MyIter(bt2._UserNotificationIterator):
+ class MyIter(bt2._UserMessageIterator):
def __init__(self):
self._build_meta()
self._at = 0
if self._at == 5:
raise bt2.Stop
- notif = bt2.EventNotification(self._create_event(self._at * 3))
+ msg = bt2.EventMessage(self._create_event(self._at * 3))
self._at += 1
- return notif
+ return msg
class MySource(bt2._UserSourceComponent,
- notification_iterator_class=MyIter):
+ message_iterator_class=MyIter):
def __init__(self, params):
self._add_output_port('out')
self._at = 0
def _consume(comp_self):
- notif = next(comp_self._notif_iter)
+ msg = next(comp_self._msg_iter)
if comp_self._at == 0:
- self.assertIsInstance(notif, bt2.StreamBeginningNotification)
+ self.assertIsInstance(msg, bt2.StreamBeginningMessage)
elif comp_self._at == 1:
- self.assertIsInstance(notif, bt2.PacketBeginningNotification)
+ self.assertIsInstance(msg, bt2.PacketBeginningMessage)
elif comp_self._at >= 2 and comp_self._at <= 6:
- self.assertIsInstance(notif, bt2.EventNotification)
- self.assertEqual(notif.event.event_class.name, 'salut')
- field = notif.event.payload_field['my_int']
+ self.assertIsInstance(msg, bt2.EventMessage)
+ self.assertEqual(msg.event.event_class.name, 'salut')
+ field = msg.event.payload_field['my_int']
self.assertEqual(field, (comp_self._at - 2) * 3)
elif comp_self._at == 7:
- self.assertIsInstance(notif, bt2.PacketEndNotification)
+ self.assertIsInstance(msg, bt2.PacketEndMessage)
elif comp_self._at == 8:
- self.assertIsInstance(notif, bt2.StreamEndNotification)
+ self.assertIsInstance(msg, bt2.StreamEndMessage)
comp_self._at += 1
def _port_connected(self, port, other_port):
- self._notif_iter = port.connection.create_notification_iterator()
+ self._msg_iter = port.connection.create_message_iterator()
src = self._graph.add_component(MySource, 'src')
sink = self._graph.add_component(MySink, 'sink')
self._graph.run()
def test_run_again(self):
- class MyIter(bt2._UserNotificationIterator):
+ class MyIter(bt2._UserMessageIterator):
def __init__(self):
self._build_meta()
self._at = 0
if self._at == 1:
raise bt2.TryAgain
- notif = bt2.EventNotification(self._create_event(self._at * 3))
+ msg = bt2.EventMessage(self._create_event(self._at * 3))
self._at += 1
- return notif
+ return msg
class MySource(bt2._UserSourceComponent,
- notification_iterator_class=MyIter):
+ message_iterator_class=MyIter):
def __init__(self, params):
self._add_output_port('out')
def _consume(comp_self):
if comp_self._at == 0:
- notif = next(comp_self._notif_iter)
- self.assertIsInstance(notif, bt2.EventNotification)
+ msg = next(comp_self._msg_iter)
+ self.assertIsInstance(msg, bt2.EventMessage)
elif comp_self._at == 1:
with self.assertRaises(bt2.TryAgain):
- notif = next(comp_self._notif_iter)
+ msg = next(comp_self._msg_iter)
raise bt2.TryAgain
comp_self._at += 1
def _port_connected(self, port, other_port):
- types = [bt2.EventNotification]
- self._notif_iter = port.connection.create_notification_iterator(types)
+ types = [bt2.EventMessage]
+ self._msg_iter = port.connection.create_message_iterator(types)
src = self._graph.add_component(MySource, 'src')
sink = self._graph.add_component(MySink, 'sink')
self._graph.run()
def test_run_no_sink(self):
- class MyIter(bt2._UserNotificationIterator):
+ class MyIter(bt2._UserMessageIterator):
pass
class MySource(bt2._UserSourceComponent,
- notification_iterator_class=MyIter):
+ message_iterator_class=MyIter):
def __init__(self, params):
self._add_output_port('out')
class MyFilter(bt2._UserFilterComponent,
- notification_iterator_class=MyIter):
+ message_iterator_class=MyIter):
def __init__(self, params):
self._add_output_port('out')
self._add_input_port('in')
self._graph.run()
def test_run_error(self):
- class MyIter(bt2._UserNotificationIterator):
+ class MyIter(bt2._UserMessageIterator):
def __init__(self):
self._build_meta()
self._at = 0
if self._at == 1:
raise bt2.TryAgain
- notif = bt2.EventNotification(self._create_event(self._at * 3))
+ msg = bt2.EventMessage(self._create_event(self._at * 3))
self._at += 1
- return notif
+ return msg
class MySource(bt2._UserSourceComponent,
- notification_iterator_class=MyIter):
+ message_iterator_class=MyIter):
def __init__(self, params):
self._add_output_port('out')
def _consume(comp_self):
if comp_self._at == 0:
- notif = next(comp_self._notif_iter)
- self.assertIsInstance(notif, bt2.EventNotification)
+ msg = next(comp_self._msg_iter)
+ self.assertIsInstance(msg, bt2.EventMessage)
elif comp_self._at == 1:
raise RuntimeError('error!')
comp_self._at += 1
def _port_connected(self, port, other_port):
- types = [bt2.EventNotification]
- self._notif_iter = port.connection.create_notification_iterator(types)
+ types = [bt2.EventMessage]
+ self._msg_iter = port.connection.create_message_iterator(types)
src = self._graph.add_component(MySource, 'src')
sink = self._graph.add_component(MySink, 'sink')
self._graph.run()
def test_run_cannot_consume(self):
- class MyIter(bt2._UserNotificationIterator):
+ class MyIter(bt2._UserMessageIterator):
pass
class MySource(bt2._UserSourceComponent,
- notification_iterator_class=MyIter):
+ message_iterator_class=MyIter):
def __init__(self, params):
self._add_output_port('out')
self.assertIs(type(exc), bt2.CannotConsumeGraph)
def test_listeners(self):
- class MyIter(bt2._UserNotificationIterator):
+ class MyIter(bt2._UserMessageIterator):
def __next__(self):
raise bt2.Stop
class MySource(bt2._UserSourceComponent,
- notification_iterator_class=MyIter):
+ message_iterator_class=MyIter):
def __init__(self, params):
self._add_output_port('out')
self._add_output_port('zero')