@unittest.skip("this is broken")
class ConnectionTestCase(unittest.TestCase):
def test_create(self):
- class MyIter(bt2._UserNotificationIterator):
+ class MyIter(bt2._UserMessageIterator):
def __next__(self):
raise bt2.Stop
class MySource(bt2._UserSourceComponent,
- notification_iterator_class=MyIter):
+ message_iterator_class=MyIter):
def __init__(self, params):
self._add_output_port('out')
self.assertNotIsInstance(conn, bt2._PrivateConnection)
def test_is_ended_false(self):
- class MyIter(bt2._UserNotificationIterator):
+ class MyIter(bt2._UserMessageIterator):
def __next__(self):
raise bt2.Stop
class MySource(bt2._UserSourceComponent,
- notification_iterator_class=MyIter):
+ message_iterator_class=MyIter):
def __init__(self, params):
self._add_output_port('out')
self.assertFalse(conn.is_ended)
def test_is_ended_true(self):
- class MyIter(bt2._UserNotificationIterator):
+ class MyIter(bt2._UserMessageIterator):
def __next__(self):
raise bt2.Stop
class MySource(bt2._UserSourceComponent,
- notification_iterator_class=MyIter):
+ message_iterator_class=MyIter):
def __init__(self, params):
self._add_output_port('out')
self.assertTrue(conn.is_ended)
def test_downstream_port(self):
- class MyIter(bt2._UserNotificationIterator):
+ class MyIter(bt2._UserMessageIterator):
def __next__(self):
raise bt2.Stop
class MySource(bt2._UserSourceComponent,
- notification_iterator_class=MyIter):
+ message_iterator_class=MyIter):
def __init__(self, params):
self._add_output_port('out')
self.assertEqual(conn.downstream_port.addr, sink.input_ports['in'].addr)
def test_upstream_port(self):
- class MyIter(bt2._UserNotificationIterator):
+ class MyIter(bt2._UserMessageIterator):
def __next__(self):
raise bt2.Stop
class MySource(bt2._UserSourceComponent,
- notification_iterator_class=MyIter):
+ message_iterator_class=MyIter):
def __init__(self, params):
self._add_output_port('out')
self.assertEqual(conn.upstream_port.addr, src.output_ports['out'].addr)
def test_eq(self):
- class MyIter(bt2._UserNotificationIterator):
+ class MyIter(bt2._UserMessageIterator):
def __next__(self):
raise bt2.Stop
class MySource(bt2._UserSourceComponent,
- notification_iterator_class=MyIter):
+ message_iterator_class=MyIter):
def __init__(self, params):
self._add_output_port('out')
self.assertEqual(conn, conn)
def test_eq_invalid(self):
- class MyIter(bt2._UserNotificationIterator):
+ class MyIter(bt2._UserMessageIterator):
def __next__(self):
raise bt2.Stop
class MySource(bt2._UserSourceComponent,
- notification_iterator_class=MyIter):
+ message_iterator_class=MyIter):
def __init__(self, params):
self._add_output_port('out')
@unittest.skip("this is broken")
class PrivateConnectionTestCase(unittest.TestCase):
def test_create(self):
- class MyIter(bt2._UserNotificationIterator):
+ class MyIter(bt2._UserMessageIterator):
def __next__(self):
raise bt2.Stop
class MySource(bt2._UserSourceComponent,
- notification_iterator_class=MyIter):
+ message_iterator_class=MyIter):
def __init__(self, params):
self._add_output_port('out')
del priv_conn
def test_is_ended_false(self):
- class MyIter(bt2._UserNotificationIterator):
+ class MyIter(bt2._UserMessageIterator):
def __next__(self):
raise bt2.Stop
class MySource(bt2._UserSourceComponent,
- notification_iterator_class=MyIter):
+ message_iterator_class=MyIter):
def __init__(self, params):
self._add_output_port('out')
del priv_conn
def test_is_ended_true(self):
- class MyIter(bt2._UserNotificationIterator):
+ class MyIter(bt2._UserMessageIterator):
def __next__(self):
raise bt2.Stop
class MySource(bt2._UserSourceComponent,
- notification_iterator_class=MyIter):
+ message_iterator_class=MyIter):
def __init__(self, params):
self._add_output_port('out')
del priv_conn
def test_downstream_port(self):
- class MyIter(bt2._UserNotificationIterator):
+ class MyIter(bt2._UserMessageIterator):
def __next__(self):
raise bt2.Stop
class MySource(bt2._UserSourceComponent,
- notification_iterator_class=MyIter):
+ message_iterator_class=MyIter):
def __init__(self, params):
self._add_output_port('out')
del priv_port
def test_upstream_port(self):
- class MyIter(bt2._UserNotificationIterator):
+ class MyIter(bt2._UserMessageIterator):
def __next__(self):
raise bt2.Stop
class MySource(bt2._UserSourceComponent,
- notification_iterator_class=MyIter):
+ message_iterator_class=MyIter):
def __init__(self, params):
self._add_output_port('out')
del priv_port
def test_eq(self):
- class MyIter(bt2._UserNotificationIterator):
+ class MyIter(bt2._UserMessageIterator):
def __next__(self):
raise bt2.Stop
class MySource(bt2._UserSourceComponent,
- notification_iterator_class=MyIter):
+ message_iterator_class=MyIter):
def __init__(self, params):
self._add_output_port('out')
del priv_conn
def test_eq_invalid(self):
- class MyIter(bt2._UserNotificationIterator):
+ class MyIter(bt2._UserMessageIterator):
def __next__(self):
raise bt2.Stop
class MySource(bt2._UserSourceComponent,
- notification_iterator_class=MyIter):
+ message_iterator_class=MyIter):
def __init__(self, params):
self._add_output_port('out')