Remove `skip-string-normalization` in Python formatter config
[babeltrace.git] / tests / bindings / python / bt2 / test_message.py
index 6000a0fa972404e3b0cd55c0f38a8ddfef680bf1..11d12a024b09526349698648d7d6db676e4fe42b 100644 (file)
@@ -1,20 +1,7 @@
+# SPDX-License-Identifier: GPL-2.0-only
 #
 # Copyright (C) 2019 EfficiOS Inc.
 #
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; only version 2
-# of the License.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
-#
 
 import unittest
 import bt2
@@ -34,10 +21,10 @@ from bt2 import trace_class as bt2_trace_class
 class AllMessagesTestCase(unittest.TestCase):
     def setUp(self):
         class MyIter(bt2._UserMessageIterator):
-            def __init__(self, self_port_output):
+            def __init__(self, config, self_port_output):
                 self._at = 0
                 self._with_stream_msgs_clock_snapshots = self_port_output.user_data.get(
-                    'with_stream_msgs_clock_snapshots', False
+                    "with_stream_msgs_clock_snapshots", False
                 )
 
             def __next__(self):
@@ -119,10 +106,10 @@ class AllMessagesTestCase(unittest.TestCase):
                 return msg
 
         class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
-            def __init__(self, params, obj):
-                self._add_output_port('out', params)
+            def __init__(self, config, params, obj):
+                self._add_output_port("out", params)
 
-                with_cc = bool(params['with_cc'])
+                with_cc = bool(params["with_cc"])
                 tc = self._create_trace_class()
                 if with_cc:
                     cc = self._create_clock_class()
@@ -143,15 +130,15 @@ class AllMessagesTestCase(unittest.TestCase):
                 # Create payload field class
                 my_int_fc = tc.create_signed_integer_field_class(32)
                 payload_fc = tc.create_structure_field_class()
-                payload_fc += [('my_int', my_int_fc)]
+                payload_fc += [("my_int", my_int_fc)]
 
                 # Create specific context field class
                 my_int_fc = tc.create_signed_integer_field_class(32)
                 specific_fc = tc.create_structure_field_class()
-                specific_fc += [('my_int', my_int_fc)]
+                specific_fc += [("my_int", my_int_fc)]
 
                 ec = sc.create_event_class(
-                    name='salut',
+                    name="salut",
                     payload_field_class=payload_fc,
                     specific_context_field_class=specific_fc,
                 )
@@ -172,10 +159,10 @@ class AllMessagesTestCase(unittest.TestCase):
         self._iter = MyIter
 
     def test_all_msg_with_cc(self):
-        params = {'with_cc': True}
-        self._src_comp = self._graph.add_component(self._src, 'my_source', params)
+        params = {"with_cc": True}
+        self._src_comp = self._graph.add_component(self._src, "my_source", params)
         self._msg_iter = TestOutputPortMessageIterator(
-            self._graph, self._src_comp.output_ports['out']
+            self._graph, self._src_comp.output_ports["out"]
         )
 
         for i, msg in enumerate(self._msg_iter):
@@ -206,7 +193,7 @@ class AllMessagesTestCase(unittest.TestCase):
                     type(msg.event.payload_field), bt2_field._StructureFieldConst
                 )
                 self.assertIs(
-                    type(msg.event.payload_field['my_int']),
+                    type(msg.event.payload_field["my_int"]),
                     bt2_field._SignedIntegerFieldConst,
                 )
 
@@ -215,10 +202,9 @@ class AllMessagesTestCase(unittest.TestCase):
             elif i == 3:
                 self.assertIs(type(msg), bt2._MessageIteratorInactivityMessageConst)
                 self.assertIs(
-                    type(msg.default_clock_snapshot),
-                    bt2_clock_snapshot._ClockSnapshotConst,
+                    type(msg.clock_snapshot), bt2_clock_snapshot._ClockSnapshotConst
                 )
-                self.assertEqual(msg.default_clock_snapshot.value, i)
+                self.assertEqual(msg.clock_snapshot.value, i)
             elif i == 4:
                 self.assertIs(type(msg), bt2._DiscardedEventsMessageConst)
                 self.assertIs(type(msg.stream), bt2_stream._StreamConst)
@@ -281,10 +267,10 @@ class AllMessagesTestCase(unittest.TestCase):
                 raise Exception
 
     def test_all_msg_without_cc(self):
-        params = {'with_cc': False}
-        self._src_comp = self._graph.add_component(self._src, 'my_source', params)
+        params = {"with_cc": False}
+        self._src_comp = self._graph.add_component(self._src, "my_source", params)
         self._msg_iter = TestOutputPortMessageIterator(
-            self._graph, self._src_comp.output_ports['out']
+            self._graph, self._src_comp.output_ports["out"]
         )
 
         for i, msg in enumerate(self._msg_iter):
@@ -293,7 +279,7 @@ class AllMessagesTestCase(unittest.TestCase):
                 self.assertIs(type(msg.stream), bt2_stream._StreamConst)
                 self.assertEqual(msg.stream.addr, self._stream.addr)
                 with self.assertRaisesRegex(
-                    ValueError, 'stream class has no default clock class'
+                    ValueError, "stream class has no default clock class"
                 ):
                     msg.default_clock_snapshot
             elif i == 1:
@@ -306,7 +292,7 @@ class AllMessagesTestCase(unittest.TestCase):
                 self.assertIs(type(msg.event.cls), bt2_event_class._EventClassConst)
                 self.assertEqual(msg.event.cls.addr, self._event_class.addr)
                 with self.assertRaisesRegex(
-                    ValueError, 'stream class has no default clock class'
+                    ValueError, "stream class has no default clock class"
                 ):
                     msg.default_clock_snapshot
             elif i == 3:
@@ -318,12 +304,12 @@ class AllMessagesTestCase(unittest.TestCase):
                 self.assertIsNone(msg.stream.cls.default_clock_class)
                 with self.assertRaisesRegex(
                     ValueError,
-                    'such a message has no clock snapshots for this stream class',
+                    "such a message has no clock snapshots for this stream class",
                 ):
                     msg.beginning_default_clock_snapshot
                 with self.assertRaisesRegex(
                     ValueError,
-                    'such a message has no clock snapshots for this stream class',
+                    "such a message has no clock snapshots for this stream class",
                 ):
                     msg.end_default_clock_snapshot
             elif i == 4:
@@ -342,12 +328,12 @@ class AllMessagesTestCase(unittest.TestCase):
                 self.assertIsNone(msg.stream.cls.default_clock_class)
                 with self.assertRaisesRegex(
                     ValueError,
-                    'such a message has no clock snapshots for this stream class',
+                    "such a message has no clock snapshots for this stream class",
                 ):
                     msg.beginning_default_clock_snapshot
                 with self.assertRaisesRegex(
                     ValueError,
-                    'such a message has no clock snapshots for this stream class',
+                    "such a message has no clock snapshots for this stream class",
                 ):
                     msg.end_default_clock_snapshot
             elif i == 6:
@@ -355,18 +341,18 @@ class AllMessagesTestCase(unittest.TestCase):
                 self.assertIs(type(msg.stream), bt2_stream._StreamConst)
                 self.assertEqual(msg.stream.addr, self._stream.addr)
                 with self.assertRaisesRegex(
-                    ValueError, 'stream class has no default clock class'
+                    ValueError, "stream class has no default clock class"
                 ):
                     msg.default_clock_snapshot
             else:
                 raise Exception
 
     def test_msg_stream_with_clock_snapshots(self):
-        params = {'with_cc': True, 'with_stream_msgs_clock_snapshots': True}
+        params = {"with_cc": True, "with_stream_msgs_clock_snapshots": True}
 
-        self._src_comp = self._graph.add_component(self._src, 'my_source', params)
+        self._src_comp = self._graph.add_component(self._src, "my_source", params)
         self._msg_iter = TestOutputPortMessageIterator(
-            self._graph, self._src_comp.output_ports['out']
+            self._graph, self._src_comp.output_ports["out"]
         )
         msgs = list(self._msg_iter)
 
@@ -407,5 +393,302 @@ class AllMessagesTestCase(unittest.TestCase):
         self.assertIs(type(msg.event), bt2_event._Event)
 
 
-if __name__ == '__main__':
+class CreateDiscardedEventMessageTestCase(unittest.TestCase):
+    # Most basic case.
+    def test_create(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(supports_discarded_events=True)
+
+        def msg_iter_next(msg_iter, stream):
+            return msg_iter._create_discarded_events_message(stream)
+
+        msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertIs(type(msg), bt2._DiscardedEventsMessage)
+        self.assertIs(msg.count, None)
+
+    # With event count.
+    def test_create_with_count(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(supports_discarded_events=True)
+
+        def msg_iter_next(msg_iter, stream):
+            return msg_iter._create_discarded_events_message(stream, count=242)
+
+        msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertIs(type(msg), bt2._DiscardedEventsMessage)
+        self.assertEqual(msg.count, 242)
+
+    # With event count == 0.
+    def test_create_with_count_zero_raises(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(supports_discarded_events=True)
+
+        def msg_iter_next(msg_iter, stream):
+            with self.assertRaisesRegex(
+                ValueError,
+                "discarded event count is 0",
+            ):
+                msg_iter._create_discarded_events_message(stream, count=0)
+
+            return 123
+
+        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertEqual(res, 123)
+
+    # With clock snapshots.
+    def test_create_with_clock_snapshots(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(
+                default_clock_class=cc,
+                supports_discarded_events=True,
+                discarded_events_have_default_clock_snapshots=True,
+            )
+
+        def msg_iter_next(msg_iter, stream):
+            return msg_iter._create_discarded_events_message(
+                stream, beg_clock_snapshot=10, end_clock_snapshot=20
+            )
+
+        msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertIs(type(msg), bt2._DiscardedEventsMessage)
+        self.assertEqual(msg.beginning_default_clock_snapshot, 10)
+        self.assertEqual(msg.end_default_clock_snapshot, 20)
+
+    # Trying to create when the stream does not support discarded events.
+    def test_create_unsupported_raises(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class()
+
+        def msg_iter_next(msg_iter, stream):
+            with self.assertRaisesRegex(
+                ValueError, "stream class does not support discarded events"
+            ):
+                msg_iter._create_discarded_events_message(stream)
+
+            return 123
+
+        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertEqual(res, 123)
+
+    # Trying to create with clock snapshots when the stream does not support
+    # them.
+    def test_create_unsupported_clock_snapshots_raises(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(supports_discarded_events=True)
+
+        def msg_iter_next(msg_iter, stream):
+            with self.assertRaisesRegex(
+                ValueError,
+                "discarded events have no default clock snapshots for this stream class",
+            ):
+                msg_iter._create_discarded_events_message(
+                    stream, beg_clock_snapshot=10, end_clock_snapshot=20
+                )
+
+            return 123
+
+        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertEqual(res, 123)
+
+    # Trying to create without clock snapshots when the stream requires them.
+    def test_create_missing_clock_snapshots_raises(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(
+                default_clock_class=cc,
+                supports_discarded_events=True,
+                discarded_events_have_default_clock_snapshots=True,
+            )
+
+        def msg_iter_next(msg_iter, stream):
+            with self.assertRaisesRegex(
+                ValueError,
+                "discarded events have default clock snapshots for this stream class",
+            ):
+                msg_iter._create_discarded_events_message(stream)
+
+            return 123
+
+        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertEqual(res, 123)
+
+    # End clock snapshot greater than beginning clock snapshot.
+    def test_create_clock_snapshots_end_gt_begin_raises(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(
+                default_clock_class=cc,
+                supports_discarded_events=True,
+                discarded_events_have_default_clock_snapshots=True,
+            )
+
+        def msg_iter_next(msg_iter, stream):
+            with self.assertRaisesRegex(
+                ValueError,
+                r"beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)",
+            ):
+                msg_iter._create_discarded_events_message(
+                    stream, beg_clock_snapshot=20, end_clock_snapshot=10
+                )
+
+            return 123
+
+        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertEqual(res, 123)
+
+
+class CreateDiscardedPacketMessageTestCase(unittest.TestCase):
+    # Most basic case.
+    def test_create(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(
+                supports_packets=True, supports_discarded_packets=True
+            )
+
+        def msg_iter_next(msg_iter, stream):
+            return msg_iter._create_discarded_packets_message(stream)
+
+        msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
+        self.assertIs(msg.count, None)
+
+    # With packet count.
+    def test_create_with_count(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(
+                supports_packets=True, supports_discarded_packets=True
+            )
+
+        def msg_iter_next(msg_iter, stream):
+            return msg_iter._create_discarded_packets_message(stream, count=242)
+
+        msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
+        self.assertEqual(msg.count, 242)
+
+    # With packet count == 0.
+    def test_create_with_count_zero_raises(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(
+                supports_packets=True, supports_discarded_packets=True
+            )
+
+        def msg_iter_next(msg_iter, stream):
+            with self.assertRaisesRegex(
+                ValueError,
+                "discarded packet count is 0",
+            ):
+                msg_iter._create_discarded_packets_message(stream, count=0)
+
+            return 123
+
+        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertEqual(res, 123)
+
+    # With clock snapshots.
+    def test_create_with_clock_snapshots(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(
+                default_clock_class=cc,
+                supports_packets=True,
+                supports_discarded_packets=True,
+                discarded_packets_have_default_clock_snapshots=True,
+            )
+
+        def msg_iter_next(msg_iter, stream):
+            return msg_iter._create_discarded_packets_message(
+                stream, beg_clock_snapshot=10, end_clock_snapshot=20
+            )
+
+        msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
+        self.assertEqual(msg.beginning_default_clock_snapshot, 10)
+        self.assertEqual(msg.end_default_clock_snapshot, 20)
+
+    # Trying to create when the stream does not support discarded packets.
+    def test_create_unsupported_raises(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(
+                supports_packets=True,
+            )
+
+        def msg_iter_next(msg_iter, stream):
+            with self.assertRaisesRegex(
+                ValueError, "stream class does not support discarded packets"
+            ):
+                msg_iter._create_discarded_packets_message(stream)
+
+            return 123
+
+        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertEqual(res, 123)
+
+    # Trying to create with clock snapshots when the stream does not support
+    # them.
+    def test_create_unsupported_clock_snapshots_raises(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(
+                supports_packets=True, supports_discarded_packets=True
+            )
+
+        def msg_iter_next(msg_iter, stream):
+            with self.assertRaisesRegex(
+                ValueError,
+                "discarded packets have no default clock snapshots for this stream class",
+            ):
+                msg_iter._create_discarded_packets_message(
+                    stream, beg_clock_snapshot=10, end_clock_snapshot=20
+                )
+
+            return 123
+
+        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertEqual(res, 123)
+
+    # Trying to create without clock snapshots when the stream requires them.
+    def test_create_missing_clock_snapshots_raises(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(
+                default_clock_class=cc,
+                supports_packets=True,
+                supports_discarded_packets=True,
+                discarded_packets_have_default_clock_snapshots=True,
+            )
+
+        def msg_iter_next(msg_iter, stream):
+            with self.assertRaisesRegex(
+                ValueError,
+                "discarded packets have default clock snapshots for this stream class",
+            ):
+                msg_iter._create_discarded_packets_message(stream)
+
+            return 123
+
+        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertEqual(res, 123)
+
+    # End clock snapshot greater than beginning clock snapshot.
+    def test_create_clock_snapshots_end_gt_begin_raises(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(
+                default_clock_class=cc,
+                supports_packets=True,
+                supports_discarded_packets=True,
+                discarded_packets_have_default_clock_snapshots=True,
+            )
+
+        def msg_iter_next(msg_iter, stream):
+            with self.assertRaisesRegex(
+                ValueError,
+                r"beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)",
+            ):
+                msg_iter._create_discarded_packets_message(
+                    stream, beg_clock_snapshot=20, end_clock_snapshot=10
+                )
+
+            return 123
+
+        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertEqual(res, 123)
+
+
+if __name__ == "__main__":
     unittest.main()
This page took 0.031895 seconds and 4 git commands to generate.