X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=tests%2Fbindings%2Fpython%2Fbt2%2Ftest_message.py;h=fc75e1c596f02c9d82f235ad32a5af76c26d80b6;hb=768f9bcbf4b5acd09dda85ab32c0ea30d8826136;hp=b2ad1a426d791ef6b677681dfecd2478ed6a5a62;hpb=f0a42b33ac3951cd5cb2ee0f66ac04437a681621;p=babeltrace.git diff --git a/tests/bindings/python/bt2/test_message.py b/tests/bindings/python/bt2/test_message.py index b2ad1a42..fc75e1c5 100644 --- a/tests/bindings/python/bt2/test_message.py +++ b/tests/bindings/python/bt2/test_message.py @@ -1,20 +1,7 @@ +# SPDX-License-Identifier: GPL-2.0-only # # Copyright (C) 2019 EfficiOS Inc. # -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; only version 2 -# of the License. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. -# import unittest import bt2 @@ -34,7 +21,7 @@ from bt2 import trace_class as bt2_trace_class class AllMessagesTestCase(unittest.TestCase): def setUp(self): class MyIter(bt2._UserMessageIterator): - def __init__(self, self_port_output): + def __init__(self, config, self_port_output): self._at = 0 self._with_stream_msgs_clock_snapshots = self_port_output.user_data.get( 'with_stream_msgs_clock_snapshots', False @@ -119,7 +106,7 @@ class AllMessagesTestCase(unittest.TestCase): return msg class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter): - def __init__(self, params, obj): + def __init__(self, config, params, obj): self._add_output_port('out', params) with_cc = bool(params['with_cc']) @@ -215,10 +202,9 @@ class AllMessagesTestCase(unittest.TestCase): elif i == 3: self.assertIs(type(msg), bt2._MessageIteratorInactivityMessageConst) self.assertIs( - type(msg.default_clock_snapshot), - bt2_clock_snapshot._ClockSnapshotConst, + type(msg.clock_snapshot), bt2_clock_snapshot._ClockSnapshotConst ) - self.assertEqual(msg.default_clock_snapshot.value, i) + self.assertEqual(msg.clock_snapshot.value, i) elif i == 4: self.assertIs(type(msg), bt2._DiscardedEventsMessageConst) self.assertIs(type(msg.stream), bt2_stream._StreamConst) @@ -405,3 +391,304 @@ class AllMessagesTestCase(unittest.TestCase): def test_event_msg(self): msg = utils.get_event_message() self.assertIs(type(msg.event), bt2_event._Event) + + +class CreateDiscardedEventMessageTestCase(unittest.TestCase): + # Most basic case. + def test_create(self): + def create_stream_class(tc, cc): + return tc.create_stream_class(supports_discarded_events=True) + + def msg_iter_next(msg_iter, stream): + return msg_iter._create_discarded_events_message(stream) + + msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertIs(type(msg), bt2._DiscardedEventsMessage) + self.assertIs(msg.count, None) + + # With event count. + def test_create_with_count(self): + def create_stream_class(tc, cc): + return tc.create_stream_class(supports_discarded_events=True) + + def msg_iter_next(msg_iter, stream): + return msg_iter._create_discarded_events_message(stream, count=242) + + msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertIs(type(msg), bt2._DiscardedEventsMessage) + self.assertEqual(msg.count, 242) + + # With event count == 0. + def test_create_with_count_zero_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class(supports_discarded_events=True) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + 'discarded event count is 0', + ): + msg_iter._create_discarded_events_message(stream, count=0) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # With clock snapshots. + def test_create_with_clock_snapshots(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + default_clock_class=cc, + supports_discarded_events=True, + discarded_events_have_default_clock_snapshots=True, + ) + + def msg_iter_next(msg_iter, stream): + return msg_iter._create_discarded_events_message( + stream, beg_clock_snapshot=10, end_clock_snapshot=20 + ) + + msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertIs(type(msg), bt2._DiscardedEventsMessage) + self.assertEqual(msg.beginning_default_clock_snapshot, 10) + self.assertEqual(msg.end_default_clock_snapshot, 20) + + # Trying to create when the stream does not support discarded events. + def test_create_unsupported_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class() + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, 'stream class does not support discarded events' + ): + msg_iter._create_discarded_events_message(stream) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # Trying to create with clock snapshots when the stream does not support + # them. + def test_create_unsupported_clock_snapshots_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class(supports_discarded_events=True) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + 'discarded events have no default clock snapshots for this stream class', + ): + msg_iter._create_discarded_events_message( + stream, beg_clock_snapshot=10, end_clock_snapshot=20 + ) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # Trying to create without clock snapshots when the stream requires them. + def test_create_missing_clock_snapshots_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + default_clock_class=cc, + supports_discarded_events=True, + discarded_events_have_default_clock_snapshots=True, + ) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + 'discarded events have default clock snapshots for this stream class', + ): + msg_iter._create_discarded_events_message(stream) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # End clock snapshot greater than beginning clock snapshot. + def test_create_clock_snapshots_end_gt_begin_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + default_clock_class=cc, + supports_discarded_events=True, + discarded_events_have_default_clock_snapshots=True, + ) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + r'beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)', + ): + msg_iter._create_discarded_events_message( + stream, beg_clock_snapshot=20, end_clock_snapshot=10 + ) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + +class CreateDiscardedPacketMessageTestCase(unittest.TestCase): + # Most basic case. + def test_create(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + supports_packets=True, supports_discarded_packets=True + ) + + def msg_iter_next(msg_iter, stream): + return msg_iter._create_discarded_packets_message(stream) + + msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertIs(type(msg), bt2._DiscardedPacketsMessage) + self.assertIs(msg.count, None) + + # With packet count. + def test_create_with_count(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + supports_packets=True, supports_discarded_packets=True + ) + + def msg_iter_next(msg_iter, stream): + return msg_iter._create_discarded_packets_message(stream, count=242) + + msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertIs(type(msg), bt2._DiscardedPacketsMessage) + self.assertEqual(msg.count, 242) + + # With packet count == 0. + def test_create_with_count_zero_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + supports_packets=True, supports_discarded_packets=True + ) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + 'discarded packet count is 0', + ): + msg_iter._create_discarded_packets_message(stream, count=0) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # With clock snapshots. + def test_create_with_clock_snapshots(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + default_clock_class=cc, + supports_packets=True, + supports_discarded_packets=True, + discarded_packets_have_default_clock_snapshots=True, + ) + + def msg_iter_next(msg_iter, stream): + return msg_iter._create_discarded_packets_message( + stream, beg_clock_snapshot=10, end_clock_snapshot=20 + ) + + msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertIs(type(msg), bt2._DiscardedPacketsMessage) + self.assertEqual(msg.beginning_default_clock_snapshot, 10) + self.assertEqual(msg.end_default_clock_snapshot, 20) + + # Trying to create when the stream does not support discarded packets. + def test_create_unsupported_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + supports_packets=True, + ) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, 'stream class does not support discarded packets' + ): + msg_iter._create_discarded_packets_message(stream) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # Trying to create with clock snapshots when the stream does not support + # them. + def test_create_unsupported_clock_snapshots_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + supports_packets=True, supports_discarded_packets=True + ) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + 'discarded packets have no default clock snapshots for this stream class', + ): + msg_iter._create_discarded_packets_message( + stream, beg_clock_snapshot=10, end_clock_snapshot=20 + ) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # Trying to create without clock snapshots when the stream requires them. + def test_create_missing_clock_snapshots_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + default_clock_class=cc, + supports_packets=True, + supports_discarded_packets=True, + discarded_packets_have_default_clock_snapshots=True, + ) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + 'discarded packets have default clock snapshots for this stream class', + ): + msg_iter._create_discarded_packets_message(stream) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # End clock snapshot greater than beginning clock snapshot. + def test_create_clock_snapshots_end_gt_begin_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + default_clock_class=cc, + supports_packets=True, + supports_discarded_packets=True, + discarded_packets_have_default_clock_snapshots=True, + ) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + r'beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)', + ): + msg_iter._create_discarded_packets_message( + stream, beg_clock_snapshot=20, end_clock_snapshot=10 + ) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + +if __name__ == '__main__': + unittest.main()