X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=tests%2Fbindings%2Fpython%2Fbt2%2Ftest_message.py;h=fc75e1c596f02c9d82f235ad32a5af76c26d80b6;hb=776a2a252c9875caa1e8b4f41cb8cc12c79611c3;hp=e58e52451d708648012ab2d514bbaf8a2d178eee;hpb=59225a3e0e13a9c674234755e55055d9ff68d635;p=babeltrace.git diff --git a/tests/bindings/python/bt2/test_message.py b/tests/bindings/python/bt2/test_message.py index e58e5245..fc75e1c5 100644 --- a/tests/bindings/python/bt2/test_message.py +++ b/tests/bindings/python/bt2/test_message.py @@ -1,20 +1,7 @@ +# SPDX-License-Identifier: GPL-2.0-only # # Copyright (C) 2019 EfficiOS Inc. # -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; only version 2 -# of the License. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. -# import unittest import bt2 @@ -34,7 +21,7 @@ from bt2 import trace_class as bt2_trace_class class AllMessagesTestCase(unittest.TestCase): def setUp(self): class MyIter(bt2._UserMessageIterator): - def __init__(self, self_port_output): + def __init__(self, config, self_port_output): self._at = 0 self._with_stream_msgs_clock_snapshots = self_port_output.user_data.get( 'with_stream_msgs_clock_snapshots', False @@ -215,10 +202,9 @@ class AllMessagesTestCase(unittest.TestCase): elif i == 3: self.assertIs(type(msg), bt2._MessageIteratorInactivityMessageConst) self.assertIs( - type(msg.default_clock_snapshot), - bt2_clock_snapshot._ClockSnapshotConst, + type(msg.clock_snapshot), bt2_clock_snapshot._ClockSnapshotConst ) - self.assertEqual(msg.default_clock_snapshot.value, i) + self.assertEqual(msg.clock_snapshot.value, i) elif i == 4: self.assertIs(type(msg), bt2._DiscardedEventsMessageConst) self.assertIs(type(msg.stream), bt2_stream._StreamConst) @@ -407,5 +393,302 @@ class AllMessagesTestCase(unittest.TestCase): self.assertIs(type(msg.event), bt2_event._Event) +class CreateDiscardedEventMessageTestCase(unittest.TestCase): + # Most basic case. + def test_create(self): + def create_stream_class(tc, cc): + return tc.create_stream_class(supports_discarded_events=True) + + def msg_iter_next(msg_iter, stream): + return msg_iter._create_discarded_events_message(stream) + + msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertIs(type(msg), bt2._DiscardedEventsMessage) + self.assertIs(msg.count, None) + + # With event count. + def test_create_with_count(self): + def create_stream_class(tc, cc): + return tc.create_stream_class(supports_discarded_events=True) + + def msg_iter_next(msg_iter, stream): + return msg_iter._create_discarded_events_message(stream, count=242) + + msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertIs(type(msg), bt2._DiscardedEventsMessage) + self.assertEqual(msg.count, 242) + + # With event count == 0. + def test_create_with_count_zero_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class(supports_discarded_events=True) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + 'discarded event count is 0', + ): + msg_iter._create_discarded_events_message(stream, count=0) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # With clock snapshots. + def test_create_with_clock_snapshots(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + default_clock_class=cc, + supports_discarded_events=True, + discarded_events_have_default_clock_snapshots=True, + ) + + def msg_iter_next(msg_iter, stream): + return msg_iter._create_discarded_events_message( + stream, beg_clock_snapshot=10, end_clock_snapshot=20 + ) + + msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertIs(type(msg), bt2._DiscardedEventsMessage) + self.assertEqual(msg.beginning_default_clock_snapshot, 10) + self.assertEqual(msg.end_default_clock_snapshot, 20) + + # Trying to create when the stream does not support discarded events. + def test_create_unsupported_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class() + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, 'stream class does not support discarded events' + ): + msg_iter._create_discarded_events_message(stream) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # Trying to create with clock snapshots when the stream does not support + # them. + def test_create_unsupported_clock_snapshots_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class(supports_discarded_events=True) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + 'discarded events have no default clock snapshots for this stream class', + ): + msg_iter._create_discarded_events_message( + stream, beg_clock_snapshot=10, end_clock_snapshot=20 + ) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # Trying to create without clock snapshots when the stream requires them. + def test_create_missing_clock_snapshots_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + default_clock_class=cc, + supports_discarded_events=True, + discarded_events_have_default_clock_snapshots=True, + ) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + 'discarded events have default clock snapshots for this stream class', + ): + msg_iter._create_discarded_events_message(stream) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # End clock snapshot greater than beginning clock snapshot. + def test_create_clock_snapshots_end_gt_begin_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + default_clock_class=cc, + supports_discarded_events=True, + discarded_events_have_default_clock_snapshots=True, + ) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + r'beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)', + ): + msg_iter._create_discarded_events_message( + stream, beg_clock_snapshot=20, end_clock_snapshot=10 + ) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + +class CreateDiscardedPacketMessageTestCase(unittest.TestCase): + # Most basic case. + def test_create(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + supports_packets=True, supports_discarded_packets=True + ) + + def msg_iter_next(msg_iter, stream): + return msg_iter._create_discarded_packets_message(stream) + + msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertIs(type(msg), bt2._DiscardedPacketsMessage) + self.assertIs(msg.count, None) + + # With packet count. + def test_create_with_count(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + supports_packets=True, supports_discarded_packets=True + ) + + def msg_iter_next(msg_iter, stream): + return msg_iter._create_discarded_packets_message(stream, count=242) + + msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertIs(type(msg), bt2._DiscardedPacketsMessage) + self.assertEqual(msg.count, 242) + + # With packet count == 0. + def test_create_with_count_zero_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + supports_packets=True, supports_discarded_packets=True + ) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + 'discarded packet count is 0', + ): + msg_iter._create_discarded_packets_message(stream, count=0) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # With clock snapshots. + def test_create_with_clock_snapshots(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + default_clock_class=cc, + supports_packets=True, + supports_discarded_packets=True, + discarded_packets_have_default_clock_snapshots=True, + ) + + def msg_iter_next(msg_iter, stream): + return msg_iter._create_discarded_packets_message( + stream, beg_clock_snapshot=10, end_clock_snapshot=20 + ) + + msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertIs(type(msg), bt2._DiscardedPacketsMessage) + self.assertEqual(msg.beginning_default_clock_snapshot, 10) + self.assertEqual(msg.end_default_clock_snapshot, 20) + + # Trying to create when the stream does not support discarded packets. + def test_create_unsupported_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + supports_packets=True, + ) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, 'stream class does not support discarded packets' + ): + msg_iter._create_discarded_packets_message(stream) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # Trying to create with clock snapshots when the stream does not support + # them. + def test_create_unsupported_clock_snapshots_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + supports_packets=True, supports_discarded_packets=True + ) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + 'discarded packets have no default clock snapshots for this stream class', + ): + msg_iter._create_discarded_packets_message( + stream, beg_clock_snapshot=10, end_clock_snapshot=20 + ) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # Trying to create without clock snapshots when the stream requires them. + def test_create_missing_clock_snapshots_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + default_clock_class=cc, + supports_packets=True, + supports_discarded_packets=True, + discarded_packets_have_default_clock_snapshots=True, + ) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + 'discarded packets have default clock snapshots for this stream class', + ): + msg_iter._create_discarded_packets_message(stream) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # End clock snapshot greater than beginning clock snapshot. + def test_create_clock_snapshots_end_gt_begin_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + default_clock_class=cc, + supports_packets=True, + supports_discarded_packets=True, + discarded_packets_have_default_clock_snapshots=True, + ) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + r'beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)', + ): + msg_iter._create_discarded_packets_message( + stream, beg_clock_snapshot=20, end_clock_snapshot=10 + ) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + if __name__ == '__main__': unittest.main()