X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=tests%2Fbindings%2Fpython%2Fbt2%2Ftest_message.py;h=d9e2fdbc186d5243edbfd7d6545efeae7ef8d962;hb=e5a41ca3bf74f80d8decfe22f7916be4566838ec;hp=012c851f0ea5e64f95bbb72e47dff9fc5e5b9426;hpb=8d8b141db4c46135a35be19e4a1c192f6a36d67b;p=babeltrace.git diff --git a/tests/bindings/python/bt2/test_message.py b/tests/bindings/python/bt2/test_message.py index 012c851f..d9e2fdbc 100644 --- a/tests/bindings/python/bt2/test_message.py +++ b/tests/bindings/python/bt2/test_message.py @@ -1,20 +1,7 @@ +# SPDX-License-Identifier: GPL-2.0-only # # Copyright (C) 2019 EfficiOS Inc. # -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; only version 2 -# of the License. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. -# import unittest import bt2 @@ -215,10 +202,9 @@ class AllMessagesTestCase(unittest.TestCase): elif i == 3: self.assertIs(type(msg), bt2._MessageIteratorInactivityMessageConst) self.assertIs( - type(msg.default_clock_snapshot), - bt2_clock_snapshot._ClockSnapshotConst, + type(msg.clock_snapshot), bt2_clock_snapshot._ClockSnapshotConst ) - self.assertEqual(msg.default_clock_snapshot.value, i) + self.assertEqual(msg.clock_snapshot.value, i) elif i == 4: self.assertIs(type(msg), bt2._DiscardedEventsMessageConst) self.assertIs(type(msg.stream), bt2_stream._StreamConst) @@ -407,5 +393,298 @@ class AllMessagesTestCase(unittest.TestCase): self.assertIs(type(msg.event), bt2_event._Event) +class CreateDiscardedEventMessageTestCase(unittest.TestCase): + # Most basic case. + def test_create(self): + def create_stream_class(tc, cc): + return tc.create_stream_class(supports_discarded_events=True) + + def msg_iter_next(msg_iter, stream): + return msg_iter._create_discarded_events_message(stream) + + msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertIs(type(msg), bt2._DiscardedEventsMessage) + self.assertIs(msg.count, None) + + # With event count. + def test_create_with_count(self): + def create_stream_class(tc, cc): + return tc.create_stream_class(supports_discarded_events=True) + + def msg_iter_next(msg_iter, stream): + return msg_iter._create_discarded_events_message(stream, count=242) + + msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertIs(type(msg), bt2._DiscardedEventsMessage) + self.assertEqual(msg.count, 242) + + # With event count == 0. + def test_create_with_count_zero_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class(supports_discarded_events=True) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, 'discarded event count is 0', + ): + msg_iter._create_discarded_events_message(stream, count=0) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # With clock snapshots. + def test_create_with_clock_snapshots(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + default_clock_class=cc, + supports_discarded_events=True, + discarded_events_have_default_clock_snapshots=True, + ) + + def msg_iter_next(msg_iter, stream): + return msg_iter._create_discarded_events_message( + stream, beg_clock_snapshot=10, end_clock_snapshot=20 + ) + + msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertIs(type(msg), bt2._DiscardedEventsMessage) + self.assertEqual(msg.beginning_default_clock_snapshot, 10) + self.assertEqual(msg.end_default_clock_snapshot, 20) + + # Trying to create when the stream does not support discarded events. + def test_create_unsupported_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class() + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, 'stream class does not support discarded events' + ): + msg_iter._create_discarded_events_message(stream) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # Trying to create with clock snapshots when the stream does not support + # them. + def test_create_unsupported_clock_snapshots_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class(supports_discarded_events=True) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + 'discarded events have no default clock snapshots for this stream class', + ): + msg_iter._create_discarded_events_message( + stream, beg_clock_snapshot=10, end_clock_snapshot=20 + ) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # Trying to create without clock snapshots when the stream requires them. + def test_create_missing_clock_snapshots_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + default_clock_class=cc, + supports_discarded_events=True, + discarded_events_have_default_clock_snapshots=True, + ) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + 'discarded events have default clock snapshots for this stream class', + ): + msg_iter._create_discarded_events_message(stream) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # End clock snapshot greater than beginning clock snapshot. + def test_create_clock_snapshots_end_gt_begin_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + default_clock_class=cc, + supports_discarded_events=True, + discarded_events_have_default_clock_snapshots=True, + ) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + r'beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)', + ): + msg_iter._create_discarded_events_message( + stream, beg_clock_snapshot=20, end_clock_snapshot=10 + ) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + +class CreateDiscardedPacketMessageTestCase(unittest.TestCase): + # Most basic case. + def test_create(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + supports_packets=True, supports_discarded_packets=True + ) + + def msg_iter_next(msg_iter, stream): + return msg_iter._create_discarded_packets_message(stream) + + msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertIs(type(msg), bt2._DiscardedPacketsMessage) + self.assertIs(msg.count, None) + + # With packet count. + def test_create_with_count(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + supports_packets=True, supports_discarded_packets=True + ) + + def msg_iter_next(msg_iter, stream): + return msg_iter._create_discarded_packets_message(stream, count=242) + + msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertIs(type(msg), bt2._DiscardedPacketsMessage) + self.assertEqual(msg.count, 242) + + # With packet count == 0. + def test_create_with_count_zero_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + supports_packets=True, supports_discarded_packets=True + ) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, 'discarded packet count is 0', + ): + msg_iter._create_discarded_packets_message(stream, count=0) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # With clock snapshots. + def test_create_with_clock_snapshots(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + default_clock_class=cc, + supports_packets=True, + supports_discarded_packets=True, + discarded_packets_have_default_clock_snapshots=True, + ) + + def msg_iter_next(msg_iter, stream): + return msg_iter._create_discarded_packets_message( + stream, beg_clock_snapshot=10, end_clock_snapshot=20 + ) + + msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertIs(type(msg), bt2._DiscardedPacketsMessage) + self.assertEqual(msg.beginning_default_clock_snapshot, 10) + self.assertEqual(msg.end_default_clock_snapshot, 20) + + # Trying to create when the stream does not support discarded packets. + def test_create_unsupported_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class(supports_packets=True,) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, 'stream class does not support discarded packets' + ): + msg_iter._create_discarded_packets_message(stream) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # Trying to create with clock snapshots when the stream does not support + # them. + def test_create_unsupported_clock_snapshots_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + supports_packets=True, supports_discarded_packets=True + ) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + 'discarded packets have no default clock snapshots for this stream class', + ): + msg_iter._create_discarded_packets_message( + stream, beg_clock_snapshot=10, end_clock_snapshot=20 + ) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # Trying to create without clock snapshots when the stream requires them. + def test_create_missing_clock_snapshots_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + default_clock_class=cc, + supports_packets=True, + supports_discarded_packets=True, + discarded_packets_have_default_clock_snapshots=True, + ) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + 'discarded packets have default clock snapshots for this stream class', + ): + msg_iter._create_discarded_packets_message(stream) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # End clock snapshot greater than beginning clock snapshot. + def test_create_clock_snapshots_end_gt_begin_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + default_clock_class=cc, + supports_packets=True, + supports_discarded_packets=True, + discarded_packets_have_default_clock_snapshots=True, + ) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + r'beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)', + ): + msg_iter._create_discarded_packets_message( + stream, beg_clock_snapshot=20, end_clock_snapshot=10 + ) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + if __name__ == '__main__': unittest.main()