tests: add tests for discarded events/packets creation
[babeltrace.git] / tests / bindings / python / bt2 / test_message.py
index 6000a0fa972404e3b0cd55c0f38a8ddfef680bf1..96367ac64592c5bf54038480cf2df6b1c2df2b58 100644 (file)
@@ -34,7 +34,7 @@ from bt2 import trace_class as bt2_trace_class
 class AllMessagesTestCase(unittest.TestCase):
     def setUp(self):
         class MyIter(bt2._UserMessageIterator):
-            def __init__(self, self_port_output):
+            def __init__(self, config, self_port_output):
                 self._at = 0
                 self._with_stream_msgs_clock_snapshots = self_port_output.user_data.get(
                     'with_stream_msgs_clock_snapshots', False
@@ -119,7 +119,7 @@ class AllMessagesTestCase(unittest.TestCase):
                 return msg
 
         class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
-            def __init__(self, params, obj):
+            def __init__(self, config, params, obj):
                 self._add_output_port('out', params)
 
                 with_cc = bool(params['with_cc'])
@@ -215,10 +215,9 @@ class AllMessagesTestCase(unittest.TestCase):
             elif i == 3:
                 self.assertIs(type(msg), bt2._MessageIteratorInactivityMessageConst)
                 self.assertIs(
-                    type(msg.default_clock_snapshot),
-                    bt2_clock_snapshot._ClockSnapshotConst,
+                    type(msg.clock_snapshot), bt2_clock_snapshot._ClockSnapshotConst
                 )
-                self.assertEqual(msg.default_clock_snapshot.value, i)
+                self.assertEqual(msg.clock_snapshot.value, i)
             elif i == 4:
                 self.assertIs(type(msg), bt2._DiscardedEventsMessageConst)
                 self.assertIs(type(msg.stream), bt2_stream._StreamConst)
@@ -407,5 +406,220 @@ class AllMessagesTestCase(unittest.TestCase):
         self.assertIs(type(msg.event), bt2_event._Event)
 
 
+class CreateDiscardedEventMessageTestCase(unittest.TestCase):
+    # Most basic case.
+    def test_create(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(supports_discarded_events=True)
+
+        def msg_iter_next(msg_iter, stream):
+            return msg_iter._create_discarded_events_message(stream)
+
+        msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertIs(type(msg), bt2._DiscardedEventsMessage)
+        # Broken at the moment.
+        # self.assertIs(msg.count, None)
+
+    # With event count.
+    def test_create_with_count(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(supports_discarded_events=True)
+
+        def msg_iter_next(msg_iter, stream):
+            return msg_iter._create_discarded_events_message(stream, count=242)
+
+        msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertIs(type(msg), bt2._DiscardedEventsMessage)
+        # Broken at the moment.
+        # self.assertEqual(msg.count, 242)
+
+    # With clock snapshots.
+    def test_create_with_clock_snapshots(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(
+                default_clock_class=cc,
+                supports_discarded_events=True,
+                discarded_events_have_default_clock_snapshots=True,
+            )
+
+        def msg_iter_next(msg_iter, stream):
+            return msg_iter._create_discarded_events_message(
+                stream, beg_clock_snapshot=10, end_clock_snapshot=20
+            )
+
+        msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertIs(type(msg), bt2._DiscardedEventsMessage)
+        # Broken at the moment.
+        # self.assertEqual(msg.beginning_default_clock_snapshot, 10);
+        # self.assertEqual(msg.end_default_clock_snapshot, 20);
+
+    # Trying to create when the stream does not support discarded events.
+    def test_create_unsupported_raises(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class()
+
+        def msg_iter_next(msg_iter, stream):
+            with self.assertRaisesRegex(
+                ValueError, 'stream class does not support discarded events'
+            ):
+                msg_iter._create_discarded_events_message(stream)
+
+            return 123
+
+        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertEqual(res, 123)
+
+    # Trying to create with clock snapshots when the stream does not support
+    # them.
+    def test_create_unsupported_clock_snapshots_raises(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(supports_discarded_events=True)
+
+        def msg_iter_next(msg_iter, stream):
+            with self.assertRaisesRegex(
+                ValueError,
+                'discarded events have no default clock snapshots for this stream class',
+            ):
+                msg_iter._create_discarded_events_message(
+                    stream, beg_clock_snapshot=10, end_clock_snapshot=20
+                )
+
+            return 123
+
+        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertEqual(res, 123)
+
+    # Trying to create without clock snapshots when the stream requires them.
+    def test_create_missing_clock_snapshots_raises(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(
+                default_clock_class=cc,
+                supports_discarded_events=True,
+                discarded_events_have_default_clock_snapshots=True,
+            )
+
+        def msg_iter_next(msg_iter, stream):
+            with self.assertRaisesRegex(
+                ValueError,
+                'discarded events have default clock snapshots for this stream class',
+            ):
+                msg_iter._create_discarded_events_message(stream)
+
+            return 123
+
+        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertEqual(res, 123)
+
+
+class CreateDiscardedPacketMessageTestCase(unittest.TestCase):
+    # Most basic case.
+    def test_create(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(
+                supports_packets=True, supports_discarded_packets=True
+            )
+
+        def msg_iter_next(msg_iter, stream):
+            return msg_iter._create_discarded_packets_message(stream)
+
+        msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
+        self.assertIs(msg.count, None)
+
+    # With packet count.
+    def test_create_with_count(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(
+                supports_packets=True, supports_discarded_packets=True
+            )
+
+        def msg_iter_next(msg_iter, stream):
+            return msg_iter._create_discarded_packets_message(stream, count=242)
+
+        msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
+        self.assertEqual(msg.count, 242)
+
+    # With clock snapshots.
+    def test_create_with_clock_snapshots(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(
+                default_clock_class=cc,
+                supports_packets=True,
+                supports_discarded_packets=True,
+                discarded_packets_have_default_clock_snapshots=True,
+            )
+
+        def msg_iter_next(msg_iter, stream):
+            return msg_iter._create_discarded_packets_message(
+                stream, beg_clock_snapshot=10, end_clock_snapshot=20
+            )
+
+        msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
+        self.assertEqual(msg.beginning_default_clock_snapshot, 10)
+        self.assertEqual(msg.end_default_clock_snapshot, 20)
+
+    # Trying to create when the stream does not support discarded packets.
+    def test_create_unsupported_raises(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(supports_packets=True,)
+
+        def msg_iter_next(msg_iter, stream):
+            with self.assertRaisesRegex(
+                ValueError, 'stream class does not support discarded packets'
+            ):
+                msg_iter._create_discarded_packets_message(stream)
+
+            return 123
+
+        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertEqual(res, 123)
+
+    # Trying to create with clock snapshots when the stream does not support
+    # them.
+    def test_create_unsupported_clock_snapshots_raises(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(
+                supports_packets=True, supports_discarded_packets=True
+            )
+
+        def msg_iter_next(msg_iter, stream):
+            with self.assertRaisesRegex(
+                ValueError,
+                'discarded packets have no default clock snapshots for this stream class',
+            ):
+                msg_iter._create_discarded_packets_message(
+                    stream, beg_clock_snapshot=10, end_clock_snapshot=20
+                )
+
+            return 123
+
+        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertEqual(res, 123)
+
+    # Trying to create without clock snapshots when the stream requires them.
+    def test_create_missing_clock_snapshots_raises(self):
+        def create_stream_class(tc, cc):
+            return tc.create_stream_class(
+                default_clock_class=cc,
+                supports_packets=True,
+                supports_discarded_packets=True,
+                discarded_packets_have_default_clock_snapshots=True,
+            )
+
+        def msg_iter_next(msg_iter, stream):
+            with self.assertRaisesRegex(
+                ValueError,
+                'discarded packets have default clock snapshots for this stream class',
+            ):
+                msg_iter._create_discarded_packets_message(stream)
+
+            return 123
+
+        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+        self.assertEqual(res, 123)
+
+
 if __name__ == '__main__':
     unittest.main()
This page took 0.026442 seconds and 4 git commands to generate.