Move to kernel style SPDX license identifiers
[babeltrace.git] / tests / bindings / python / bt2 / test_message.py
index 77a4ca15cfbfc6e660d7c0f6a1f35ba0127e301a..3cadfe1f77001ee8dd0e021e56d5e7fa83f8f550 100644 (file)
-from bt2 import value
-import collections
+# SPDX-License-Identifier: GPL-2.0-only
+#
+# Copyright (C) 2019 EfficiOS Inc.
+#
+
 import unittest
-import copy
 import bt2
-
-
-class _MessageTestCase(unittest.TestCase):
-    def setUp(self):
-        self._trace = bt2.Trace()
-        self._sc = bt2.StreamClass()
-        self._ec = bt2.EventClass('salut')
-        self._my_int_fc = bt2.IntegerFieldClass(32)
-        self._ec.payload_field_class = bt2.StructureFieldClass()
-        self._ec.payload_field_class += collections.OrderedDict([
-            ('my_int', self._my_int_fc),
-        ])
-        self._sc.add_event_class(self._ec)
-        self._clock_class = bt2.ClockClass('allo', 1000)
-        self._trace.add_clock_class(self._clock_class)
-        self._trace.packet_header_field_class = bt2.StructureFieldClass()
-        self._trace.packet_header_field_class += collections.OrderedDict([
-            ('hello', self._my_int_fc),
-        ])
-        self._trace.add_stream_class(self._sc)
-        self._cc_prio_map = bt2.ClockClassPriorityMap()
-        self._cc_prio_map[self._clock_class] = 231
-        self._stream = self._sc()
-        self._packet = self._stream.create_packet()
-        self._packet.header_field['hello'] = 19487
-        self._event = self._ec()
-        self._event.clock_snapshots.add(self._clock_class(1772))
-        self._event.payload_field['my_int'] = 23
-        self._event.packet = self._packet
-
-    def tearDown(self):
-        del self._trace
-        del self._sc
-        del self._ec
-        del self._my_int_fc
-        del self._clock_class
-        del self._cc_prio_map
-        del self._stream
-        del self._packet
-        del self._event
-
-
-@unittest.skip("this is broken")
-class EventMessageTestCase(_MessageTestCase):
-    def test_create_no_cc_prio_map(self):
-        msg = bt2.EventMessage(self._event)
-        self.assertEqual(msg.event.addr, self._event.addr)
-        self.assertEqual(len(msg.clock_class_priority_map), 0)
-
-    def test_create_with_cc_prio_map(self):
-        msg = bt2.EventMessage(self._event, self._cc_prio_map)
-        self.assertEqual(msg.event.addr, self._event.addr)
-        self.assertEqual(len(msg.clock_class_priority_map), 1)
-        self.assertEqual(msg.clock_class_priority_map.highest_priority_clock_class.addr,
-                         self._clock_class.addr)
-        self.assertEqual(msg.clock_class_priority_map[self._clock_class], 231)
-
-    def test_eq(self):
-        msg = bt2.EventMessage(self._event, self._cc_prio_map)
-        event_copy = copy.copy(self._event)
-        event_copy.packet = self._packet
-        cc_prio_map_copy = copy.copy(self._cc_prio_map)
-        msg2 = bt2.EventMessage(event_copy, cc_prio_map_copy)
-        self.assertEqual(msg, msg2)
-
-    def test_ne_event(self):
-        msg = bt2.EventMessage(self._event, self._cc_prio_map)
-        event_copy = copy.copy(self._event)
-        event_copy.payload_field['my_int'] = 17
-        event_copy.packet = self._packet
-        cc_prio_map_copy = copy.copy(self._cc_prio_map)
-        msg2 = bt2.EventMessage(event_copy, cc_prio_map_copy)
-        self.assertNotEqual(msg, msg2)
-
-    def test_ne_cc_prio_map(self):
-        msg = bt2.EventMessage(self._event)
-        event_copy = copy.copy(self._event)
-        event_copy.packet = self._packet
-        cc_prio_map_copy = copy.copy(self._cc_prio_map)
-        msg2 = bt2.EventMessage(event_copy, cc_prio_map_copy)
-        self.assertNotEqual(msg, msg2)
-
-    def test_eq_invalid(self):
-        msg = bt2.EventMessage(self._event)
-        self.assertNotEqual(msg, 23)
-
-    def test_copy(self):
-        msg = bt2.EventMessage(self._event, self._cc_prio_map)
-        msg2 = copy.copy(msg)
-        self.assertEqual(msg, msg2)
-
-    def test_deepcopy(self):
-        msg = bt2.EventMessage(self._event, self._cc_prio_map)
-        msg2 = copy.deepcopy(msg)
-        self.assertEqual(msg, msg2)
-
-
-@unittest.skip("this is broken")
-class PacketBeginningMessageTestCase(_MessageTestCase):
-    def test_create(self):
-        msg = bt2.PacketBeginningMessage(self._packet)
-        self.assertEqual(msg.packet.addr, self._packet.addr)
-
-    def test_eq(self):
-        msg = bt2.PacketBeginningMessage(self._packet)
-        packet_copy = copy.copy(self._packet)
-        msg2 = bt2.PacketBeginningMessage(packet_copy)
-        self.assertEqual(msg, msg2)
-
-    def test_ne_packet(self):
-        msg = bt2.PacketBeginningMessage(self._packet)
-        packet_copy = copy.copy(self._packet)
-        packet_copy.header_field['hello'] = 1847
-        msg2 = bt2.PacketBeginningMessage(packet_copy)
-        self.assertNotEqual(msg, msg2)
-
-    def test_eq_invalid(self):
-        msg = bt2.PacketBeginningMessage(self._packet)
-        self.assertNotEqual(msg, 23)
-
-    def test_copy(self):
-        msg = bt2.PacketBeginningMessage(self._packet)
-        msg2 = copy.copy(msg)
-        self.assertEqual(msg, msg2)
-
-    def test_deepcopy(self):
-        msg = bt2.PacketBeginningMessage(self._packet)
-        msg2 = copy.deepcopy(msg)
-        self.assertEqual(msg, msg2)
-
-
-@unittest.skip("this is broken")
-class PacketEndMessageTestCase(_MessageTestCase):
-    def test_create(self):
-        msg = bt2.PacketEndMessage(self._packet)
-        self.assertEqual(msg.packet.addr, self._packet.addr)
-
-    def test_eq(self):
-        msg = bt2.PacketEndMessage(self._packet)
-        packet_copy = copy.copy(self._packet)
-        msg2 = bt2.PacketEndMessage(packet_copy)
-        self.assertEqual(msg, msg2)
-
-    def test_ne_packet(self):
-        msg = bt2.PacketEndMessage(self._packet)
-        packet_copy = copy.copy(self._packet)
-        packet_copy.header_field['hello'] = 1847
-        msg2 = bt2.PacketEndMessage(packet_copy)
-        self.assertNotEqual(msg, msg2)
-
-    def test_eq_invalid(self):
-        msg = bt2.PacketEndMessage(self._packet)
-        self.assertNotEqual(msg, 23)
-
-    def test_copy(self):
-        msg = bt2.PacketEndMessage(self._packet)
-        msg2 = copy.copy(msg)
-        self.assertEqual(msg, msg2)
-
-    def test_deepcopy(self):
-        msg = bt2.PacketEndMessage(self._packet)
-        msg2 = copy.deepcopy(msg)
-        self.assertEqual(msg, msg2)
-
-
-@unittest.skip("this is broken")
-class StreamBeginningMessageTestCase(_MessageTestCase):
-    def test_create(self):
-        msg = bt2.StreamBeginningMessage(self._stream)
-        self.assertEqual(msg.stream.addr, self._stream.addr)
-
-    def test_eq(self):
-        msg = bt2.StreamBeginningMessage(self._stream)
-        stream_copy = copy.copy(self._stream)
-        msg2 = bt2.StreamBeginningMessage(stream_copy)
-        self.assertEqual(msg, msg2)
-
-    def test_ne_stream(self):
-        msg = bt2.StreamBeginningMessage(self._stream)
-        stream_copy = self._sc(name='salut')
-        msg2 = bt2.StreamBeginningMessage(stream_copy)
-        self.assertNotEqual(msg, msg2)
-
-    def test_eq_invalid(self):
-        msg = bt2.StreamBeginningMessage(self._stream)
-        self.assertNotEqual(msg, 23)
-
-    def test_copy(self):
-        msg = bt2.StreamBeginningMessage(self._stream)
-        msg2 = copy.copy(msg)
-        self.assertEqual(msg, msg2)
-
-    def test_deepcopy(self):
-        msg = bt2.StreamBeginningMessage(self._stream)
-        msg2 = copy.deepcopy(msg)
-        self.assertEqual(msg, msg2)
-
-
-@unittest.skip("this is broken")
-class StreamEndMessageTestCase(_MessageTestCase):
-    def test_create(self):
-        msg = bt2.StreamEndMessage(self._stream)
-        self.assertEqual(msg.stream.addr, self._stream.addr)
-
-    def test_eq(self):
-        msg = bt2.StreamEndMessage(self._stream)
-        stream_copy = copy.copy(self._stream)
-        msg2 = bt2.StreamEndMessage(stream_copy)
-        self.assertEqual(msg, msg2)
-
-    def test_ne_stream(self):
-        msg = bt2.StreamEndMessage(self._stream)
-        stream_copy = self._sc(name='salut')
-        msg2 = bt2.StreamEndMessage(stream_copy)
-        self.assertNotEqual(msg, msg2)
-
-    def test_eq_invalid(self):
-        msg = bt2.StreamEndMessage(self._stream)
-        self.assertNotEqual(msg, 23)
-
-    def test_copy(self):
-        msg = bt2.StreamEndMessage(self._stream)
-        msg2 = copy.copy(msg)
-        self.assertEqual(msg, msg2)
-
-    def test_deepcopy(self):
-        msg = bt2.StreamEndMessage(self._stream)
-        msg2 = copy.deepcopy(msg)
-        self.assertEqual(msg, msg2)
-
-
-@unittest.skip("this is broken")
-class InactivityMessageTestCase(unittest.TestCase):
+import utils
+from utils import TestOutputPortMessageIterator
+from bt2 import clock_snapshot as bt2_clock_snapshot
+from bt2 import event as bt2_event
+from bt2 import event_class as bt2_event_class
+from bt2 import field as bt2_field
+from bt2 import packet as bt2_packet
+from bt2 import stream as bt2_stream
+from bt2 import stream_class as bt2_stream_class
+from bt2 import trace as bt2_trace
+from bt2 import trace_class as bt2_trace_class
+
+
+class AllMessagesTestCase(unittest.TestCase):
     def setUp(self):
-        self._cc1 = bt2.ClockClass('cc1', 1000)
-        self._cc2 = bt2.ClockClass('cc2', 2000)
-        self._cc_prio_map = bt2.ClockClassPriorityMap()
-        self._cc_prio_map[self._cc1] = 25
-        self._cc_prio_map[self._cc2] = 50
-
-    def tearDown(self):
-        del self._cc1
-        del self._cc2
-        del self._cc_prio_map
-
-    def test_create_no_cc_prio_map(self):
-        msg = bt2.InactivityMessage()
-        self.assertEqual(len(msg.clock_class_priority_map), 0)
-
-    def test_create_with_cc_prio_map(self):
-        msg = bt2.InactivityMessage(self._cc_prio_map)
-        msg.clock_snapshots.add(self._cc1(123))
-        msg.clock_snapshots.add(self._cc2(19487))
-        self.assertEqual(len(msg.clock_class_priority_map), 2)
-        self.assertEqual(msg.clock_class_priority_map, self._cc_prio_map)
-        self.assertEqual(msg.clock_snapshots[self._cc1], 123)
-        self.assertEqual(msg.clock_snapshots[self._cc2], 19487)
-
-    def test_eq(self):
-        msg = bt2.InactivityMessage(self._cc_prio_map)
-        msg.clock_snapshots.add(self._cc1(123))
-        msg.clock_snapshots.add(self._cc2(19487))
-        cc_prio_map_copy = copy.copy(self._cc_prio_map)
-        msg2 = bt2.InactivityMessage(cc_prio_map_copy)
-        msg2.clock_snapshots.add(self._cc1(123))
-        msg2.clock_snapshots.add(self._cc2(19487))
-        self.assertEqual(msg, msg2)
-
-    def test_ne_cc_prio_map(self):
-        msg = bt2.InactivityMessage(self._cc_prio_map)
-        msg.clock_snapshots.add(self._cc1(123))
-        msg.clock_snapshots.add(self._cc2(19487))
-        cc_prio_map_copy = copy.copy(self._cc_prio_map)
-        cc_prio_map_copy[self._cc2] = 23
-        msg2 = bt2.InactivityMessage(cc_prio_map_copy)
-        self.assertNotEqual(msg, msg2)
-
-    def test_ne_clock_snapshot(self):
-        msg = bt2.InactivityMessage(self._cc_prio_map)
-        msg.clock_snapshots.add(self._cc1(123))
-        msg.clock_snapshots.add(self._cc2(19487))
-        msg2 = bt2.InactivityMessage(self._cc_prio_map)
-        msg.clock_snapshots.add(self._cc1(123))
-        msg.clock_snapshots.add(self._cc2(1847))
-        self.assertNotEqual(msg, msg2)
-
-    def test_eq_invalid(self):
-        msg = bt2.InactivityMessage(self._cc_prio_map)
-        self.assertNotEqual(msg, 23)
-
-    def test_copy(self):
-        msg = bt2.InactivityMessage(self._cc_prio_map)
-        msg.clock_snapshots.add(self._cc1(123))
-        msg.clock_snapshots.add(self._cc2(19487))
-        msg_copy = copy.copy(msg)
-        self.assertEqual(msg, msg_copy)
-        self.assertNotEqual(msg.addr, msg_copy.addr)
-        self.assertEqual(msg.clock_class_priority_map.addr,
-                         msg_copy.clock_class_priority_map.addr)
-        self.assertEqual(msg_copy.clock_snapshots[self._cc1], 123)
-        self.assertEqual(msg_copy.clock_snapshots[self._cc2], 19487)
-
-    def test_deepcopy(self):
-        msg = bt2.InactivityMessage(self._cc_prio_map)
-        msg.clock_snapshots.add(self._cc1(123))
-        msg.clock_snapshots.add(self._cc2(19487))
-        msg_copy = copy.deepcopy(msg)
-        self.assertEqual(msg, msg_copy)
-        self.assertNotEqual(msg.addr, msg_copy.addr)
-        self.assertNotEqual(msg.clock_class_priority_map.addr,
-                            msg_copy.clock_class_priority_map.addr)
-        self.assertEqual(msg.clock_class_priority_map,
-                         msg_copy.clock_class_priority_map)
-        self.assertNotEqual(list(msg.clock_class_priority_map)[0].addr,
-                            list(msg_copy.clock_class_priority_map)[0].addr)
-        self.assertIsNone(msg_copy.clock_snapshots[self._cc1])
-        self.assertIsNone(msg_copy.clock_snapshots[self._cc2])
-        self.assertEqual(msg_copy.clock_snapshots[list(msg_copy.clock_class_priority_map)[0]], 123)
-        self.assertEqual(msg_copy.clock_snapshots[list(msg_copy.clock_class_priority_map)[1]], 19487)
-
-
-@unittest.skip("this is broken")
-class DiscardedPacketsMessageTestCase(unittest.TestCase):
-    def setUp(self):
-        self._trace = bt2.Trace()
-        self._sc = bt2.StreamClass()
-        self._ec = bt2.EventClass('salut')
-        self._clock_class = bt2.ClockClass('yo', 1000)
-        self._uint64_int_fc = bt2.IntegerFieldClass(64, mapped_clock_class=self._clock_class)
-        self._my_int_fc = bt2.IntegerFieldClass(32)
-        self._ec.payload_field_class = bt2.StructureFieldClass()
-        self._ec.payload_field_class += collections.OrderedDict([
-            ('my_int', self._my_int_fc),
-        ])
-        self._sc.add_event_class(self._ec)
-        self._sc.packet_context_field_class = bt2.StructureFieldClass()
-        self._sc.packet_context_field_class += collections.OrderedDict([
-            ('packet_seq_num', self._my_int_fc),
-            ('timestamp_begin', self._uint64_int_fc),
-            ('timestamp_end', self._uint64_int_fc),
-        ])
-        self._trace.add_clock_class(self._clock_class)
-        self._trace.add_stream_class(self._sc)
-        self._stream = self._sc()
-
-    def tearDown(self):
-        del self._trace
-        del self._sc
-        del self._ec
-        del self._clock_class
-        del self._uint64_int_fc
-        del self._my_int_fc
-        del self._stream
-
-    def _create_event(self, packet):
-        event = self._ec()
-        event.payload_field['my_int'] = 23
-        event.packet = packet
-        return event
-
-    def _get_msg(self):
         class MyIter(bt2._UserMessageIterator):
-            def __init__(iter_self):
-                packet1 = self._stream.create_packet()
-                packet1.context_field['packet_seq_num'] = 0
-                packet1.context_field['timestamp_begin'] = 3
-                packet1.context_field['timestamp_end'] = 6
-                packet2 = self._stream.create_packet()
-                packet2.context_field['packet_seq_num'] = 5
-                packet2.context_field['timestamp_begin'] = 7
-                packet2.context_field['timestamp_end'] = 10
-                iter_self._ev1 = self._create_event(packet1)
-                iter_self._ev2 = self._create_event(packet2)
-                iter_self._at = 0
+            def __init__(self, config, self_port_output):
+                self._at = 0
+                self._with_stream_msgs_clock_snapshots = self_port_output.user_data.get(
+                    'with_stream_msgs_clock_snapshots', False
+                )
 
             def __next__(self):
-                if self._at == 0:
-                    msg = bt2.EventMessage(self._ev1)
-                elif self._at == 1:
-                    msg = bt2.EventMessage(self._ev2)
+                if test_obj._clock_class:
+                    if self._at == 0:
+                        if self._with_stream_msgs_clock_snapshots:
+                            msg = self._create_stream_beginning_message(
+                                test_obj._stream, default_clock_snapshot=self._at
+                            )
+                        else:
+                            msg = self._create_stream_beginning_message(
+                                test_obj._stream
+                            )
+                        test_obj.assertIs(type(msg), bt2._StreamBeginningMessage)
+                    elif self._at == 1:
+                        msg = self._create_packet_beginning_message(
+                            test_obj._packet, self._at
+                        )
+                        test_obj.assertIs(type(msg), bt2._PacketBeginningMessage)
+                    elif self._at == 2:
+                        msg = self._create_event_message(
+                            test_obj._event_class, test_obj._packet, self._at
+                        )
+                        test_obj.assertIs(type(msg), bt2._EventMessage)
+                    elif self._at == 3:
+                        msg = self._create_message_iterator_inactivity_message(
+                            test_obj._clock_class, self._at
+                        )
+                    elif self._at == 4:
+                        msg = self._create_discarded_events_message(
+                            test_obj._stream, 890, self._at, self._at
+                        )
+                        test_obj.assertIs(type(msg), bt2._DiscardedEventsMessage)
+                    elif self._at == 5:
+                        msg = self._create_packet_end_message(
+                            test_obj._packet, self._at
+                        )
+                        test_obj.assertIs(type(msg), bt2._PacketEndMessage)
+                    elif self._at == 6:
+                        msg = self._create_discarded_packets_message(
+                            test_obj._stream, 678, self._at, self._at
+                        )
+                        test_obj.assertIs(type(msg), bt2._DiscardedPacketsMessage)
+                    elif self._at == 7:
+                        if self._with_stream_msgs_clock_snapshots:
+                            msg = self._create_stream_end_message(
+                                test_obj._stream, default_clock_snapshot=self._at
+                            )
+                        else:
+                            msg = self._create_stream_end_message(test_obj._stream)
+                        test_obj.assertIs(type(msg), bt2._StreamEndMessage)
+                    elif self._at >= 8:
+                        raise bt2.Stop
                 else:
-                    raise bt2.Stop
+                    if self._at == 0:
+                        msg = self._create_stream_beginning_message(test_obj._stream)
+                    elif self._at == 1:
+                        msg = self._create_packet_beginning_message(test_obj._packet)
+                    elif self._at == 2:
+                        msg = self._create_event_message(
+                            test_obj._event_class, test_obj._packet
+                        )
+                    elif self._at == 3:
+                        msg = self._create_discarded_events_message(
+                            test_obj._stream, 890
+                        )
+                    elif self._at == 4:
+                        msg = self._create_packet_end_message(test_obj._packet)
+                    elif self._at == 5:
+                        msg = self._create_discarded_packets_message(
+                            test_obj._stream, 678
+                        )
+                    elif self._at == 6:
+                        msg = self._create_stream_end_message(test_obj._stream)
+                    elif self._at >= 7:
+                        raise bt2.Stop
 
                 self._at += 1
                 return msg
 
-        class MySource(bt2._UserSourceComponent,
-                       message_iterator_class=MyIter):
-            def __init__(self, params):
-                self._add_output_port('out')
-
-        class MySink(bt2._UserSinkComponent):
-            def __init__(self, params):
-                self._add_input_port('in')
-
-            def _consume(comp_self):
-                nonlocal the_msg
-                msg = next(comp_self._msg_iter)
-
-                if type(msg) is bt2._DiscardedPacketsMessage:
-                    the_msg = msg
-                    raise bt2.Stop
-
-            def _port_connected(self, port, other_port):
-                self._msg_iter = port.connection.create_message_iterator()
-
-        the_msg = None
-        graph = bt2.Graph()
-        src = graph.add_component(MySource, 'src')
-        sink = graph.add_component(MySink, 'sink')
-        conn = graph.connect_ports(src.output_ports['out'],
-                                   sink.input_ports['in'])
-        graph.run()
-        return the_msg
-
-    def test_create(self):
-        self.assertIsInstance(self._get_msg(), bt2._DiscardedPacketsMessage)
-
-    def test_count(self):
-        self.assertEqual(self._get_msg().count, 4)
-
-    def test_stream(self):
-        self.assertEqual(self._get_msg().stream.addr, self._stream.addr)
-
-    def test_beginning_clock_snapshot(self):
-        msg = self._get_msg()
-        beginning_clock_snapshot = msg.beginning_clock_snapshot
-        self.assertEqual(beginning_clock_snapshot.clock_class, self._clock_class)
-        self.assertEqual(beginning_clock_snapshot, 6)
-
-    def test_end_clock_snapshot(self):
-        msg = self._get_msg()
-        end_clock_snapshot = msg.end_clock_snapshot
-        self.assertEqual(end_clock_snapshot.clock_class, self._clock_class)
-        self.assertEqual(end_clock_snapshot, 7)
-
-    def test_eq(self):
-        msg1 = self._get_msg()
-        msg2 = self._get_msg()
-        self.assertEqual(msg1, msg2)
-
-    def test_eq_invalid(self):
-        msg1 = self._get_msg()
-        self.assertNotEqual(msg1, 23)
-
+        class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
+            def __init__(self, config, params, obj):
+                self._add_output_port('out', params)
 
-@unittest.skip("this is broken")
-class DiscardedEventsMessageTestCase(unittest.TestCase):
-    def setUp(self):
-        self._trace = bt2.Trace()
-        self._sc = bt2.StreamClass()
-        self._ec = bt2.EventClass('salut')
-        self._clock_class = bt2.ClockClass('yo', 1000)
-        self._uint64_int_fc = bt2.IntegerFieldClass(64, mapped_clock_class=self._clock_class)
-        self._my_int_fc = bt2.IntegerFieldClass(32)
-        self._ec.payload_field_class = bt2.StructureFieldClass()
-        self._ec.payload_field_class += collections.OrderedDict([
-            ('my_int', self._my_int_fc),
-        ])
-        self._sc.add_event_class(self._ec)
-        self._sc.packet_context_field_class = bt2.StructureFieldClass()
-        self._sc.packet_context_field_class += collections.OrderedDict([
-            ('events_discarded', self._my_int_fc),
-            ('timestamp_begin', self._uint64_int_fc),
-            ('timestamp_end', self._uint64_int_fc),
-        ])
-        self._trace.add_clock_class(self._clock_class)
-        self._trace.add_stream_class(self._sc)
-        self._stream = self._sc()
-
-    def tearDown(self):
-        del self._trace
-        del self._sc
-        del self._ec
-        del self._clock_class
-        del self._uint64_int_fc
-        del self._my_int_fc
-        del self._stream
-
-    def _create_event(self, packet):
-        event = self._ec()
-        event.payload_field['my_int'] = 23
-        event.packet = packet
-        return event
-
-    def _get_msg(self):
-        class MyIter(bt2._UserMessageIterator):
-            def __init__(iter_self):
-                packet1 = self._stream.create_packet()
-                packet1.context_field['events_discarded'] = 0
-                packet1.context_field['timestamp_begin'] = 3
-                packet1.context_field['timestamp_end'] = 6
-                packet2 = self._stream.create_packet()
-                packet2.context_field['events_discarded'] = 10
-                packet2.context_field['timestamp_begin'] = 7
-                packet2.context_field['timestamp_end'] = 10
-                iter_self._ev1 = self._create_event(packet1)
-                iter_self._ev2 = self._create_event(packet2)
-                iter_self._at = 0
-
-            def __next__(self):
-                if self._at == 0:
-                    msg = bt2.EventMessage(self._ev1)
-                elif self._at == 1:
-                    msg = bt2.EventMessage(self._ev2)
+                with_cc = bool(params['with_cc'])
+                tc = self._create_trace_class()
+                if with_cc:
+                    cc = self._create_clock_class()
                 else:
-                    raise bt2.Stop
-
-                self._at += 1
-                return msg
-
-        class MySource(bt2._UserSourceComponent,
-                       message_iterator_class=MyIter):
-            def __init__(self, params):
-                self._add_output_port('out')
-
-        class MySink(bt2._UserSinkComponent):
-            def __init__(self, params):
-                self._add_input_port('in')
-
-            def _consume(comp_self):
-                nonlocal the_msg
-                msg = next(comp_self._msg_iter)
-
-                if type(msg) is bt2._DiscardedEventsMessage:
-                    the_msg = msg
-                    raise bt2.Stop
-
-            def _port_connected(self, port, other_port):
-                self._msg_iter = port.connection.create_message_iterator()
-
-        the_msg = None
-        graph = bt2.Graph()
-        src = graph.add_component(MySource, 'src')
-        sink = graph.add_component(MySink, 'sink')
-        conn = graph.connect_ports(src.output_ports['out'],
-                                   sink.input_ports['in'])
-        graph.run()
-        return the_msg
-
-    def test_create(self):
-        self.assertIsInstance(self._get_msg(), bt2._DiscardedEventsMessage)
-
-    def test_count(self):
-        self.assertEqual(self._get_msg().count, 10)
-
-    def test_stream(self):
-        self.assertEqual(self._get_msg().stream.addr, self._stream.addr)
-
-    def test_beginning_clock_snapshot(self):
-        msg = self._get_msg()
-        beginning_clock_snapshot = msg.beginning_clock_snapshot
-        self.assertEqual(beginning_clock_snapshot.clock_class, self._clock_class)
-        self.assertEqual(beginning_clock_snapshot, 6)
-
-    def test_end_clock_snapshot(self):
-        msg = self._get_msg()
-        end_clock_snapshot = msg.end_clock_snapshot
-        self.assertEqual(end_clock_snapshot.clock_class, self._clock_class)
-        self.assertEqual(end_clock_snapshot, 10)
-
-    def test_eq(self):
-        msg1 = self._get_msg()
-        msg2 = self._get_msg()
-        self.assertEqual(msg1, msg2)
-
-    def test_eq_invalid(self):
-        msg1 = self._get_msg()
-        self.assertNotEqual(msg1, 23)
+                    cc = None
+
+                sc = tc.create_stream_class(
+                    default_clock_class=cc,
+                    supports_packets=True,
+                    packets_have_beginning_default_clock_snapshot=with_cc,
+                    packets_have_end_default_clock_snapshot=with_cc,
+                    supports_discarded_events=True,
+                    discarded_events_have_default_clock_snapshots=with_cc,
+                    supports_discarded_packets=True,
+                    discarded_packets_have_default_clock_snapshots=with_cc,
+                )
+
+                # Create payload field class
+                my_int_fc = tc.create_signed_integer_field_class(32)
+                payload_fc = tc.create_structure_field_class()
+                payload_fc += [('my_int', my_int_fc)]
+
+                # Create specific context field class
+                my_int_fc = tc.create_signed_integer_field_class(32)
+                specific_fc = tc.create_structure_field_class()
+                specific_fc += [('my_int', my_int_fc)]
+
+                ec = sc.create_event_class(
+                    name='salut',
+                    payload_field_class=payload_fc,
+                    specific_context_field_class=specific_fc,
+                )
+
+                trace = tc()
+                stream = trace.create_stream(sc)
+                packet = stream.create_packet()
+
+                test_obj._trace = trace
+                test_obj._stream = stream
+                test_obj._packet = packet
+                test_obj._event_class = ec
+                test_obj._clock_class = cc
+
+        test_obj = self
+        self._graph = bt2.Graph()
+        self._src = MySrc
+        self._iter = MyIter
+
+    def test_all_msg_with_cc(self):
+        params = {'with_cc': True}
+        self._src_comp = self._graph.add_component(self._src, 'my_source', params)
+        self._msg_iter = TestOutputPortMessageIterator(
+            self._graph, self._src_comp.output_ports['out']
+        )
+
+        for i, msg in enumerate(self._msg_iter):
+            if i == 0:
+                self.assertIs(type(msg), bt2._StreamBeginningMessageConst)
+                self.assertIs(type(msg.stream), bt2_stream._StreamConst)
+                self.assertEqual(msg.stream.addr, self._stream.addr)
+                self.assertIsInstance(
+                    msg.default_clock_snapshot, bt2._UnknownClockSnapshot
+                )
+            elif i == 1:
+                self.assertIs(type(msg), bt2._PacketBeginningMessageConst)
+                self.assertIs(type(msg.packet), bt2_packet._PacketConst)
+                self.assertIs(
+                    type(msg.default_clock_snapshot),
+                    bt2_clock_snapshot._ClockSnapshotConst,
+                )
+                self.assertEqual(msg.packet.addr, self._packet.addr)
+                self.assertEqual(msg.default_clock_snapshot.value, i)
+            elif i == 2:
+                self.assertIs(type(msg), bt2._EventMessageConst)
+                self.assertIs(type(msg.event), bt2_event._EventConst)
+                self.assertIs(
+                    type(msg.default_clock_snapshot),
+                    bt2_clock_snapshot._ClockSnapshotConst,
+                )
+                self.assertIs(
+                    type(msg.event.payload_field), bt2_field._StructureFieldConst
+                )
+                self.assertIs(
+                    type(msg.event.payload_field['my_int']),
+                    bt2_field._SignedIntegerFieldConst,
+                )
+
+                self.assertEqual(msg.event.cls.addr, self._event_class.addr)
+                self.assertEqual(msg.default_clock_snapshot.value, i)
+            elif i == 3:
+                self.assertIs(type(msg), bt2._MessageIteratorInactivityMessageConst)
+                self.assertIs(
+                    type(msg.clock_snapshot), bt2_clock_snapshot._ClockSnapshotConst
+                )
+                self.assertEqual(msg.clock_snapshot.value, i)
+            elif i == 4:
+                self.assertIs(type(msg), bt2._DiscardedEventsMessageConst)
+                self.assertIs(type(msg.stream), bt2_stream._StreamConst)
+                self.assertIs(type(msg.stream.cls), bt2_stream_class._StreamClassConst)
+                self.assertIs(
+                    type(msg.beginning_default_clock_snapshot),
+                    bt2_clock_snapshot._ClockSnapshotConst,
+                )
+                self.assertIs(
+                    type(msg.end_default_clock_snapshot),
+                    bt2_clock_snapshot._ClockSnapshotConst,
+                )
+
+                self.assertEqual(msg.stream.addr, self._stream.addr)
+                self.assertEqual(msg.count, 890)
+                self.assertEqual(
+                    msg.stream.cls.default_clock_class.addr, self._clock_class.addr
+                )
+                self.assertEqual(msg.beginning_default_clock_snapshot.value, i)
+                self.assertEqual(msg.end_default_clock_snapshot.value, i)
+            elif i == 5:
+                self.assertIs(type(msg), bt2._PacketEndMessageConst)
+                self.assertIs(type(msg.packet), bt2_packet._PacketConst)
+                self.assertIs(
+                    type(msg.default_clock_snapshot),
+                    bt2_clock_snapshot._ClockSnapshotConst,
+                )
+                self.assertEqual(msg.packet.addr, self._packet.addr)
+                self.assertEqual(msg.default_clock_snapshot.value, i)
+            elif i == 6:
+                self.assertIs(type(msg), bt2._DiscardedPacketsMessageConst)
+                self.assertIs(type(msg.stream), bt2_stream._StreamConst)
+                self.assertIs(type(msg.stream.trace), bt2_trace._TraceConst)
+                self.assertIs(
+                    type(msg.stream.trace.cls), bt2_trace_class._TraceClassConst
+                )
+                self.assertIs(
+                    type(msg.beginning_default_clock_snapshot),
+                    bt2_clock_snapshot._ClockSnapshotConst,
+                )
+                self.assertIs(
+                    type(msg.end_default_clock_snapshot),
+                    bt2_clock_snapshot._ClockSnapshotConst,
+                )
+                self.assertEqual(msg.stream.addr, self._stream.addr)
+                self.assertEqual(msg.count, 678)
+                self.assertEqual(
+                    msg.stream.cls.default_clock_class.addr, self._clock_class.addr
+                )
+                self.assertEqual(msg.beginning_default_clock_snapshot.value, i)
+                self.assertEqual(msg.end_default_clock_snapshot.value, i)
+            elif i == 7:
+                self.assertIs(type(msg), bt2._StreamEndMessageConst)
+                self.assertIs(type(msg.stream), bt2_stream._StreamConst)
+                self.assertEqual(msg.stream.addr, self._stream.addr)
+                self.assertIs(
+                    type(msg.default_clock_snapshot), bt2._UnknownClockSnapshot
+                )
+            else:
+                raise Exception
+
+    def test_all_msg_without_cc(self):
+        params = {'with_cc': False}
+        self._src_comp = self._graph.add_component(self._src, 'my_source', params)
+        self._msg_iter = TestOutputPortMessageIterator(
+            self._graph, self._src_comp.output_ports['out']
+        )
+
+        for i, msg in enumerate(self._msg_iter):
+            if i == 0:
+                self.assertIsInstance(msg, bt2._StreamBeginningMessageConst)
+                self.assertIs(type(msg.stream), bt2_stream._StreamConst)
+                self.assertEqual(msg.stream.addr, self._stream.addr)
+                with self.assertRaisesRegex(
+                    ValueError, 'stream class has no default clock class'
+                ):
+                    msg.default_clock_snapshot
+            elif i == 1:
+                self.assertIsInstance(msg, bt2._PacketBeginningMessageConst)
+                self.assertIs(type(msg.packet), bt2_packet._PacketConst)
+                self.assertEqual(msg.packet.addr, self._packet.addr)
+            elif i == 2:
+                self.assertIsInstance(msg, bt2._EventMessageConst)
+                self.assertIs(type(msg.event), bt2_event._EventConst)
+                self.assertIs(type(msg.event.cls), bt2_event_class._EventClassConst)
+                self.assertEqual(msg.event.cls.addr, self._event_class.addr)
+                with self.assertRaisesRegex(
+                    ValueError, 'stream class has no default clock class'
+                ):
+                    msg.default_clock_snapshot
+            elif i == 3:
+                self.assertIsInstance(msg, bt2._DiscardedEventsMessageConst)
+                self.assertIs(type(msg.stream), bt2_stream._StreamConst)
+                self.assertIs(type(msg.stream.cls), bt2_stream_class._StreamClassConst)
+                self.assertEqual(msg.stream.addr, self._stream.addr)
+                self.assertEqual(msg.count, 890)
+                self.assertIsNone(msg.stream.cls.default_clock_class)
+                with self.assertRaisesRegex(
+                    ValueError,
+                    'such a message has no clock snapshots for this stream class',
+                ):
+                    msg.beginning_default_clock_snapshot
+                with self.assertRaisesRegex(
+                    ValueError,
+                    'such a message has no clock snapshots for this stream class',
+                ):
+                    msg.end_default_clock_snapshot
+            elif i == 4:
+                self.assertIsInstance(msg, bt2._PacketEndMessageConst)
+                self.assertEqual(msg.packet.addr, self._packet.addr)
+                self.assertIs(type(msg.packet), bt2_packet._PacketConst)
+            elif i == 5:
+                self.assertIsInstance(msg, bt2._DiscardedPacketsMessageConst)
+                self.assertIs(type(msg.stream), bt2_stream._StreamConst)
+                self.assertIs(type(msg.stream.cls), bt2_stream_class._StreamClassConst)
+                self.assertIs(
+                    type(msg.stream.cls.trace_class), bt2_trace_class._TraceClassConst
+                )
+                self.assertEqual(msg.stream.addr, self._stream.addr)
+                self.assertEqual(msg.count, 678)
+                self.assertIsNone(msg.stream.cls.default_clock_class)
+                with self.assertRaisesRegex(
+                    ValueError,
+                    'such a message has no clock snapshots for this stream class',
+                ):
+                    msg.beginning_default_clock_snapshot
+                with self.assertRaisesRegex(
+                    ValueError,
+                    'such a message has no clock snapshots for this stream class',
+                ):
+                    msg.end_default_clock_snapshot
+            elif i == 6:
+                self.assertIsInstance(msg, bt2._StreamEndMessageConst)
+                self.assertIs(type(msg.stream), bt2_stream._StreamConst)
+                self.assertEqual(msg.stream.addr, self._stream.addr)
+                with self.assertRaisesRegex(
+                    ValueError, 'stream class has no default clock class'
+                ):
+                    msg.default_clock_snapshot
+            else:
+                raise Exception
+
+    def test_msg_stream_with_clock_snapshots(self):
+        params = {'with_cc': True, 'with_stream_msgs_clock_snapshots': True}
+
+        self._src_comp = self._graph.add_component(self._src, 'my_source', params)
+        self._msg_iter = TestOutputPortMessageIterator(
+            self._graph, self._src_comp.output_ports['out']
+        )
+        msgs = list(self._msg_iter)
+
+        msg_stream_beg = msgs[0]
+        self.assertIsInstance(msg_stream_beg, bt2._StreamBeginningMessageConst)
+        self.assertIs(
+            type(msg_stream_beg.default_clock_snapshot),
+            bt2_clock_snapshot._ClockSnapshotConst,
+        )
+        self.assertEqual(msg_stream_beg.default_clock_snapshot.value, 0)
+
+        msg_stream_end = msgs[7]
+        self.assertIsInstance(msg_stream_end, bt2._StreamEndMessageConst)
+        self.assertIs(
+            type(msg_stream_end.default_clock_snapshot),
+            bt2_clock_snapshot._ClockSnapshotConst,
+        )
+        self.assertEqual(msg_stream_end.default_clock_snapshot.value, 7)
+
+    def test_stream_beg_msg(self):
+        msg = utils.get_stream_beginning_message()
+        self.assertIs(type(msg.stream), bt2_stream._Stream)
+
+    def test_stream_end_msg(self):
+        msg = utils.get_stream_end_message()
+        self.assertIs(type(msg.stream), bt2_stream._Stream)
+
+    def test_packet_beg_msg(self):
+        msg = utils.get_packet_beginning_message()
+        self.assertIs(type(msg.packet), bt2_packet._Packet)
+
+    def test_packet_end_msg(self):
+        msg = utils.get_packet_end_message()
+        self.assertIs(type(msg.packet), bt2_packet._Packet)
+
+    def test_event_msg(self):
+        msg = utils.get_event_message()
+        self.assertIs(type(msg.event), bt2_event._Event)
+
+
+if __name__ == '__main__':
+    unittest.main()
This page took 0.031208 seconds and 4 git commands to generate.