X-Git-Url: https://git.efficios.com/?a=blobdiff_plain;f=tests%2Fbindings%2Fpython%2Fbt2%2Ftest_message.py;h=96367ac64592c5bf54038480cf2df6b1c2df2b58;hb=356ada2b3523dc7f056973111399915153c6294a;hp=7c26a935536e5e9b2fe3a9bec9c6fa922328c66d;hpb=bcc1b6cda412d8ea9ffc70dba6db12028b84a599;p=babeltrace.git diff --git a/tests/bindings/python/bt2/test_message.py b/tests/bindings/python/bt2/test_message.py index 7c26a935..96367ac6 100644 --- a/tests/bindings/python/bt2/test_message.py +++ b/tests/bindings/python/bt2/test_message.py @@ -406,5 +406,220 @@ class AllMessagesTestCase(unittest.TestCase): self.assertIs(type(msg.event), bt2_event._Event) +class CreateDiscardedEventMessageTestCase(unittest.TestCase): + # Most basic case. + def test_create(self): + def create_stream_class(tc, cc): + return tc.create_stream_class(supports_discarded_events=True) + + def msg_iter_next(msg_iter, stream): + return msg_iter._create_discarded_events_message(stream) + + msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertIs(type(msg), bt2._DiscardedEventsMessage) + # Broken at the moment. + # self.assertIs(msg.count, None) + + # With event count. + def test_create_with_count(self): + def create_stream_class(tc, cc): + return tc.create_stream_class(supports_discarded_events=True) + + def msg_iter_next(msg_iter, stream): + return msg_iter._create_discarded_events_message(stream, count=242) + + msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertIs(type(msg), bt2._DiscardedEventsMessage) + # Broken at the moment. + # self.assertEqual(msg.count, 242) + + # With clock snapshots. + def test_create_with_clock_snapshots(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + default_clock_class=cc, + supports_discarded_events=True, + discarded_events_have_default_clock_snapshots=True, + ) + + def msg_iter_next(msg_iter, stream): + return msg_iter._create_discarded_events_message( + stream, beg_clock_snapshot=10, end_clock_snapshot=20 + ) + + msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertIs(type(msg), bt2._DiscardedEventsMessage) + # Broken at the moment. + # self.assertEqual(msg.beginning_default_clock_snapshot, 10); + # self.assertEqual(msg.end_default_clock_snapshot, 20); + + # Trying to create when the stream does not support discarded events. + def test_create_unsupported_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class() + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, 'stream class does not support discarded events' + ): + msg_iter._create_discarded_events_message(stream) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # Trying to create with clock snapshots when the stream does not support + # them. + def test_create_unsupported_clock_snapshots_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class(supports_discarded_events=True) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + 'discarded events have no default clock snapshots for this stream class', + ): + msg_iter._create_discarded_events_message( + stream, beg_clock_snapshot=10, end_clock_snapshot=20 + ) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # Trying to create without clock snapshots when the stream requires them. + def test_create_missing_clock_snapshots_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + default_clock_class=cc, + supports_discarded_events=True, + discarded_events_have_default_clock_snapshots=True, + ) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + 'discarded events have default clock snapshots for this stream class', + ): + msg_iter._create_discarded_events_message(stream) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + +class CreateDiscardedPacketMessageTestCase(unittest.TestCase): + # Most basic case. + def test_create(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + supports_packets=True, supports_discarded_packets=True + ) + + def msg_iter_next(msg_iter, stream): + return msg_iter._create_discarded_packets_message(stream) + + msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertIs(type(msg), bt2._DiscardedPacketsMessage) + self.assertIs(msg.count, None) + + # With packet count. + def test_create_with_count(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + supports_packets=True, supports_discarded_packets=True + ) + + def msg_iter_next(msg_iter, stream): + return msg_iter._create_discarded_packets_message(stream, count=242) + + msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertIs(type(msg), bt2._DiscardedPacketsMessage) + self.assertEqual(msg.count, 242) + + # With clock snapshots. + def test_create_with_clock_snapshots(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + default_clock_class=cc, + supports_packets=True, + supports_discarded_packets=True, + discarded_packets_have_default_clock_snapshots=True, + ) + + def msg_iter_next(msg_iter, stream): + return msg_iter._create_discarded_packets_message( + stream, beg_clock_snapshot=10, end_clock_snapshot=20 + ) + + msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertIs(type(msg), bt2._DiscardedPacketsMessage) + self.assertEqual(msg.beginning_default_clock_snapshot, 10) + self.assertEqual(msg.end_default_clock_snapshot, 20) + + # Trying to create when the stream does not support discarded packets. + def test_create_unsupported_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class(supports_packets=True,) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, 'stream class does not support discarded packets' + ): + msg_iter._create_discarded_packets_message(stream) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # Trying to create with clock snapshots when the stream does not support + # them. + def test_create_unsupported_clock_snapshots_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + supports_packets=True, supports_discarded_packets=True + ) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + 'discarded packets have no default clock snapshots for this stream class', + ): + msg_iter._create_discarded_packets_message( + stream, beg_clock_snapshot=10, end_clock_snapshot=20 + ) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # Trying to create without clock snapshots when the stream requires them. + def test_create_missing_clock_snapshots_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + default_clock_class=cc, + supports_packets=True, + supports_discarded_packets=True, + discarded_packets_have_default_clock_snapshots=True, + ) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + 'discarded packets have default clock snapshots for this stream class', + ): + msg_iter._create_discarded_packets_message(stream) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + if __name__ == '__main__': unittest.main()