lib: add precond. check for begin <= end on pkt./ev. disc. msg. creation
[babeltrace.git] / tests / bindings / python / bt2 / test_message.py
index b2ad1a426d791ef6b677681dfecd2478ed6a5a62..a99972aadc7d54f226f9de639e34f602a9c58bb2 100644 (file)
@@ -1,20 +1,7 @@
+# SPDX-License-Identifier: GPL-2.0-only
 #
 # Copyright (C) 2019 EfficiOS Inc.
 #
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; only version 2
-# of the License.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
-#
 
 import unittest
 import bt2
@@ -34,7 +21,7 @@ from bt2 import trace_class as bt2_trace_class
 class AllMessagesTestCase(unittest.TestCase):
     def setUp(self):
         class MyIter(bt2._UserMessageIterator):
-            def __init__(self, self_port_output):
+            def __init__(self, config, self_port_output):
                 self._at = 0
                 self._with_stream_msgs_clock_snapshots = self_port_output.user_data.get(
                     'with_stream_msgs_clock_snapshots', False
@@ -119,7 +106,7 @@ class AllMessagesTestCase(unittest.TestCase):
                 return msg
 
         class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
-            def __init__(self, params, obj):
+            def __init__(self, config, params, obj):
                 self._add_output_port('out', params)
 
                 with_cc = bool(params['with_cc'])
@@ -215,10 +202,9 @@ class AllMessagesTestCase(unittest.TestCase):
             elif i == 3:
                 self.assertIs(type(msg), bt2._MessageIteratorInactivityMessageConst)
                 self.assertIs(
-                    type(msg.default_clock_snapshot),
-                    bt2_clock_snapshot._ClockSnapshotConst,
+                    type(msg.clock_snapshot), bt2_clock_snapshot._ClockSnapshotConst
                 )
-                self.assertEqual(msg.default_clock_snapshot.value, i)
+                self.assertEqual(msg.clock_snapshot.value, i)
             elif i == 4:
                 self.assertIs(type(msg), bt2._DiscardedEventsMessageConst)
                 self.assertIs(type(msg.stream), bt2_stream._StreamConst)
@@ -405,3 +391,266 @@ class AllMessagesTestCase(unittest.TestCase):
     def test_event_msg(self):
         msg = utils.get_event_message()
         self.assertIs(type(msg.event), bt2_event._Event)
+
+
+class CreateDiscardedEventMessageTestCase(unittest.TestCase):
+    # Most basic case.
+    def test_create(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(supports_discarded_events=True)
+
+        def msg_iter_next(msg_iter, stream):
+            return msg_iter._create_discarded_events_message(stream)
+
+        msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertIs(type(msg), bt2._DiscardedEventsMessage)
+        self.assertIs(msg.count, None)
+
+    # With event count.
+    def test_create_with_count(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(supports_discarded_events=True)
+
+        def msg_iter_next(msg_iter, stream):
+            return msg_iter._create_discarded_events_message(stream, count=242)
+
+        msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertIs(type(msg), bt2._DiscardedEventsMessage)
+        self.assertEqual(msg.count, 242)
+
+    # With clock snapshots.
+    def test_create_with_clock_snapshots(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(
+                default_clock_class=cc,
+                supports_discarded_events=True,
+                discarded_events_have_default_clock_snapshots=True,
+            )
+
+        def msg_iter_next(msg_iter, stream):
+            return msg_iter._create_discarded_events_message(
+                stream, beg_clock_snapshot=10, end_clock_snapshot=20
+            )
+
+        msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertIs(type(msg), bt2._DiscardedEventsMessage)
+        self.assertEqual(msg.beginning_default_clock_snapshot, 10)
+        self.assertEqual(msg.end_default_clock_snapshot, 20)
+
+    # Trying to create when the stream does not support discarded events.
+    def test_create_unsupported_raises(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class()
+
+        def msg_iter_next(msg_iter, stream):
+            with self.assertRaisesRegex(
+                ValueError, 'stream class does not support discarded events'
+            ):
+                msg_iter._create_discarded_events_message(stream)
+
+            return 123
+
+        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertEqual(res, 123)
+
+    # Trying to create with clock snapshots when the stream does not support
+    # them.
+    def test_create_unsupported_clock_snapshots_raises(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(supports_discarded_events=True)
+
+        def msg_iter_next(msg_iter, stream):
+            with self.assertRaisesRegex(
+                ValueError,
+                'discarded events have no default clock snapshots for this stream class',
+            ):
+                msg_iter._create_discarded_events_message(
+                    stream, beg_clock_snapshot=10, end_clock_snapshot=20
+                )
+
+            return 123
+
+        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertEqual(res, 123)
+
+    # Trying to create without clock snapshots when the stream requires them.
+    def test_create_missing_clock_snapshots_raises(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(
+                default_clock_class=cc,
+                supports_discarded_events=True,
+                discarded_events_have_default_clock_snapshots=True,
+            )
+
+        def msg_iter_next(msg_iter, stream):
+            with self.assertRaisesRegex(
+                ValueError,
+                'discarded events have default clock snapshots for this stream class',
+            ):
+                msg_iter._create_discarded_events_message(stream)
+
+            return 123
+
+        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertEqual(res, 123)
+
+    # End clock snapshot greater than beginning clock snapshot.
+    def test_create_clock_snapshots_end_gt_begin_raises(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(
+                default_clock_class=cc,
+                supports_discarded_events=True,
+                discarded_events_have_default_clock_snapshots=True,
+            )
+
+        def msg_iter_next(msg_iter, stream):
+            with self.assertRaisesRegex(
+                ValueError,
+                r'beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)',
+            ):
+                msg_iter._create_discarded_events_message(
+                    stream, beg_clock_snapshot=20, end_clock_snapshot=10
+                )
+
+            return 123
+
+        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertEqual(res, 123)
+
+
+class CreateDiscardedPacketMessageTestCase(unittest.TestCase):
+    # Most basic case.
+    def test_create(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(
+                supports_packets=True, supports_discarded_packets=True
+            )
+
+        def msg_iter_next(msg_iter, stream):
+            return msg_iter._create_discarded_packets_message(stream)
+
+        msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
+        self.assertIs(msg.count, None)
+
+    # With packet count.
+    def test_create_with_count(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(
+                supports_packets=True, supports_discarded_packets=True
+            )
+
+        def msg_iter_next(msg_iter, stream):
+            return msg_iter._create_discarded_packets_message(stream, count=242)
+
+        msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
+        self.assertEqual(msg.count, 242)
+
+    # With clock snapshots.
+    def test_create_with_clock_snapshots(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(
+                default_clock_class=cc,
+                supports_packets=True,
+                supports_discarded_packets=True,
+                discarded_packets_have_default_clock_snapshots=True,
+            )
+
+        def msg_iter_next(msg_iter, stream):
+            return msg_iter._create_discarded_packets_message(
+                stream, beg_clock_snapshot=10, end_clock_snapshot=20
+            )
+
+        msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
+        self.assertEqual(msg.beginning_default_clock_snapshot, 10)
+        self.assertEqual(msg.end_default_clock_snapshot, 20)
+
+    # Trying to create when the stream does not support discarded packets.
+    def test_create_unsupported_raises(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(supports_packets=True,)
+
+        def msg_iter_next(msg_iter, stream):
+            with self.assertRaisesRegex(
+                ValueError, 'stream class does not support discarded packets'
+            ):
+                msg_iter._create_discarded_packets_message(stream)
+
+            return 123
+
+        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertEqual(res, 123)
+
+    # Trying to create with clock snapshots when the stream does not support
+    # them.
+    def test_create_unsupported_clock_snapshots_raises(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(
+                supports_packets=True, supports_discarded_packets=True
+            )
+
+        def msg_iter_next(msg_iter, stream):
+            with self.assertRaisesRegex(
+                ValueError,
+                'discarded packets have no default clock snapshots for this stream class',
+            ):
+                msg_iter._create_discarded_packets_message(
+                    stream, beg_clock_snapshot=10, end_clock_snapshot=20
+                )
+
+            return 123
+
+        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertEqual(res, 123)
+
+    # Trying to create without clock snapshots when the stream requires them.
+    def test_create_missing_clock_snapshots_raises(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(
+                default_clock_class=cc,
+                supports_packets=True,
+                supports_discarded_packets=True,
+                discarded_packets_have_default_clock_snapshots=True,
+            )
+
+        def msg_iter_next(msg_iter, stream):
+            with self.assertRaisesRegex(
+                ValueError,
+                'discarded packets have default clock snapshots for this stream class',
+            ):
+                msg_iter._create_discarded_packets_message(stream)
+
+            return 123
+
+        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertEqual(res, 123)
+
+    # End clock snapshot greater than beginning clock snapshot.
+    def test_create_clock_snapshots_end_gt_begin_raises(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(
+                default_clock_class=cc,
+                supports_packets=True,
+                supports_discarded_packets=True,
+                discarded_packets_have_default_clock_snapshots=True,
+            )
+
+        def msg_iter_next(msg_iter, stream):
+            with self.assertRaisesRegex(
+                ValueError,
+                r'beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)',
+            ):
+                msg_iter._create_discarded_packets_message(
+                    stream, beg_clock_snapshot=20, end_clock_snapshot=10
+                )
+
+            return 123
+
+        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertEqual(res, 123)
+
+
+if __name__ == '__main__':
+    unittest.main()
This page took 0.028617 seconds and 4 git commands to generate.