-from bt2 import values
-import collections
+# SPDX-License-Identifier: GPL-2.0-only
+#
+# Copyright (C) 2019 EfficiOS Inc.
+#
+
import unittest
-import copy
import bt2
-
-
-class _MessageTestCase(unittest.TestCase):
- def setUp(self):
- self._trace = bt2.Trace()
- self._sc = bt2.StreamClass()
- self._ec = bt2.EventClass('salut')
- self._my_int_ft = bt2.IntegerFieldType(32)
- self._ec.payload_field_type = bt2.StructureFieldType()
- self._ec.payload_field_type += collections.OrderedDict([
- ('my_int', self._my_int_ft),
- ])
- self._sc.add_event_class(self._ec)
- self._clock_class = bt2.ClockClass('allo', 1000)
- self._trace.add_clock_class(self._clock_class)
- self._trace.packet_header_field_type = bt2.StructureFieldType()
- self._trace.packet_header_field_type += collections.OrderedDict([
- ('hello', self._my_int_ft),
- ])
- self._trace.add_stream_class(self._sc)
- self._cc_prio_map = bt2.ClockClassPriorityMap()
- self._cc_prio_map[self._clock_class] = 231
- self._stream = self._sc()
- self._packet = self._stream.create_packet()
- self._packet.header_field['hello'] = 19487
- self._event = self._ec()
- self._event.clock_values.add(self._clock_class(1772))
- self._event.payload_field['my_int'] = 23
- self._event.packet = self._packet
-
- def tearDown(self):
- del self._trace
- del self._sc
- del self._ec
- del self._my_int_ft
- del self._clock_class
- del self._cc_prio_map
- del self._stream
- del self._packet
- del self._event
-
-
-@unittest.skip("this is broken")
-class EventMessageTestCase(_MessageTestCase):
- def test_create_no_cc_prio_map(self):
- msg = bt2.EventMessage(self._event)
- self.assertEqual(msg.event.addr, self._event.addr)
- self.assertEqual(len(msg.clock_class_priority_map), 0)
-
- def test_create_with_cc_prio_map(self):
- msg = bt2.EventMessage(self._event, self._cc_prio_map)
- self.assertEqual(msg.event.addr, self._event.addr)
- self.assertEqual(len(msg.clock_class_priority_map), 1)
- self.assertEqual(msg.clock_class_priority_map.highest_priority_clock_class.addr,
- self._clock_class.addr)
- self.assertEqual(msg.clock_class_priority_map[self._clock_class], 231)
-
- def test_eq(self):
- msg = bt2.EventMessage(self._event, self._cc_prio_map)
- event_copy = copy.copy(self._event)
- event_copy.packet = self._packet
- cc_prio_map_copy = copy.copy(self._cc_prio_map)
- msg2 = bt2.EventMessage(event_copy, cc_prio_map_copy)
- self.assertEqual(msg, msg2)
-
- def test_ne_event(self):
- msg = bt2.EventMessage(self._event, self._cc_prio_map)
- event_copy = copy.copy(self._event)
- event_copy.payload_field['my_int'] = 17
- event_copy.packet = self._packet
- cc_prio_map_copy = copy.copy(self._cc_prio_map)
- msg2 = bt2.EventMessage(event_copy, cc_prio_map_copy)
- self.assertNotEqual(msg, msg2)
-
- def test_ne_cc_prio_map(self):
- msg = bt2.EventMessage(self._event)
- event_copy = copy.copy(self._event)
- event_copy.packet = self._packet
- cc_prio_map_copy = copy.copy(self._cc_prio_map)
- msg2 = bt2.EventMessage(event_copy, cc_prio_map_copy)
- self.assertNotEqual(msg, msg2)
-
- def test_eq_invalid(self):
- msg = bt2.EventMessage(self._event)
- self.assertNotEqual(msg, 23)
-
- def test_copy(self):
- msg = bt2.EventMessage(self._event, self._cc_prio_map)
- msg2 = copy.copy(msg)
- self.assertEqual(msg, msg2)
-
- def test_deepcopy(self):
- msg = bt2.EventMessage(self._event, self._cc_prio_map)
- msg2 = copy.deepcopy(msg)
- self.assertEqual(msg, msg2)
-
-
-@unittest.skip("this is broken")
-class PacketBeginningMessageTestCase(_MessageTestCase):
- def test_create(self):
- msg = bt2.PacketBeginningMessage(self._packet)
- self.assertEqual(msg.packet.addr, self._packet.addr)
-
- def test_eq(self):
- msg = bt2.PacketBeginningMessage(self._packet)
- packet_copy = copy.copy(self._packet)
- msg2 = bt2.PacketBeginningMessage(packet_copy)
- self.assertEqual(msg, msg2)
-
- def test_ne_packet(self):
- msg = bt2.PacketBeginningMessage(self._packet)
- packet_copy = copy.copy(self._packet)
- packet_copy.header_field['hello'] = 1847
- msg2 = bt2.PacketBeginningMessage(packet_copy)
- self.assertNotEqual(msg, msg2)
-
- def test_eq_invalid(self):
- msg = bt2.PacketBeginningMessage(self._packet)
- self.assertNotEqual(msg, 23)
-
- def test_copy(self):
- msg = bt2.PacketBeginningMessage(self._packet)
- msg2 = copy.copy(msg)
- self.assertEqual(msg, msg2)
-
- def test_deepcopy(self):
- msg = bt2.PacketBeginningMessage(self._packet)
- msg2 = copy.deepcopy(msg)
- self.assertEqual(msg, msg2)
-
-
-@unittest.skip("this is broken")
-class PacketEndMessageTestCase(_MessageTestCase):
- def test_create(self):
- msg = bt2.PacketEndMessage(self._packet)
- self.assertEqual(msg.packet.addr, self._packet.addr)
-
- def test_eq(self):
- msg = bt2.PacketEndMessage(self._packet)
- packet_copy = copy.copy(self._packet)
- msg2 = bt2.PacketEndMessage(packet_copy)
- self.assertEqual(msg, msg2)
-
- def test_ne_packet(self):
- msg = bt2.PacketEndMessage(self._packet)
- packet_copy = copy.copy(self._packet)
- packet_copy.header_field['hello'] = 1847
- msg2 = bt2.PacketEndMessage(packet_copy)
- self.assertNotEqual(msg, msg2)
-
- def test_eq_invalid(self):
- msg = bt2.PacketEndMessage(self._packet)
- self.assertNotEqual(msg, 23)
-
- def test_copy(self):
- msg = bt2.PacketEndMessage(self._packet)
- msg2 = copy.copy(msg)
- self.assertEqual(msg, msg2)
-
- def test_deepcopy(self):
- msg = bt2.PacketEndMessage(self._packet)
- msg2 = copy.deepcopy(msg)
- self.assertEqual(msg, msg2)
-
-
-@unittest.skip("this is broken")
-class StreamBeginningMessageTestCase(_MessageTestCase):
- def test_create(self):
- msg = bt2.StreamBeginningMessage(self._stream)
- self.assertEqual(msg.stream.addr, self._stream.addr)
-
- def test_eq(self):
- msg = bt2.StreamBeginningMessage(self._stream)
- stream_copy = copy.copy(self._stream)
- msg2 = bt2.StreamBeginningMessage(stream_copy)
- self.assertEqual(msg, msg2)
-
- def test_ne_stream(self):
- msg = bt2.StreamBeginningMessage(self._stream)
- stream_copy = self._sc(name='salut')
- msg2 = bt2.StreamBeginningMessage(stream_copy)
- self.assertNotEqual(msg, msg2)
-
- def test_eq_invalid(self):
- msg = bt2.StreamBeginningMessage(self._stream)
- self.assertNotEqual(msg, 23)
-
- def test_copy(self):
- msg = bt2.StreamBeginningMessage(self._stream)
- msg2 = copy.copy(msg)
- self.assertEqual(msg, msg2)
-
- def test_deepcopy(self):
- msg = bt2.StreamBeginningMessage(self._stream)
- msg2 = copy.deepcopy(msg)
- self.assertEqual(msg, msg2)
-
-
-@unittest.skip("this is broken")
-class StreamEndMessageTestCase(_MessageTestCase):
- def test_create(self):
- msg = bt2.StreamEndMessage(self._stream)
- self.assertEqual(msg.stream.addr, self._stream.addr)
-
- def test_eq(self):
- msg = bt2.StreamEndMessage(self._stream)
- stream_copy = copy.copy(self._stream)
- msg2 = bt2.StreamEndMessage(stream_copy)
- self.assertEqual(msg, msg2)
-
- def test_ne_stream(self):
- msg = bt2.StreamEndMessage(self._stream)
- stream_copy = self._sc(name='salut')
- msg2 = bt2.StreamEndMessage(stream_copy)
- self.assertNotEqual(msg, msg2)
-
- def test_eq_invalid(self):
- msg = bt2.StreamEndMessage(self._stream)
- self.assertNotEqual(msg, 23)
-
- def test_copy(self):
- msg = bt2.StreamEndMessage(self._stream)
- msg2 = copy.copy(msg)
- self.assertEqual(msg, msg2)
-
- def test_deepcopy(self):
- msg = bt2.StreamEndMessage(self._stream)
- msg2 = copy.deepcopy(msg)
- self.assertEqual(msg, msg2)
-
-
-@unittest.skip("this is broken")
-class InactivityMessageTestCase(unittest.TestCase):
+import utils
+from utils import TestOutputPortMessageIterator
+from bt2 import clock_snapshot as bt2_clock_snapshot
+from bt2 import event as bt2_event
+from bt2 import event_class as bt2_event_class
+from bt2 import field as bt2_field
+from bt2 import packet as bt2_packet
+from bt2 import stream as bt2_stream
+from bt2 import stream_class as bt2_stream_class
+from bt2 import trace as bt2_trace
+from bt2 import trace_class as bt2_trace_class
+
+
+class AllMessagesTestCase(unittest.TestCase):
def setUp(self):
- self._cc1 = bt2.ClockClass('cc1', 1000)
- self._cc2 = bt2.ClockClass('cc2', 2000)
- self._cc_prio_map = bt2.ClockClassPriorityMap()
- self._cc_prio_map[self._cc1] = 25
- self._cc_prio_map[self._cc2] = 50
-
- def tearDown(self):
- del self._cc1
- del self._cc2
- del self._cc_prio_map
-
- def test_create_no_cc_prio_map(self):
- msg = bt2.InactivityMessage()
- self.assertEqual(len(msg.clock_class_priority_map), 0)
-
- def test_create_with_cc_prio_map(self):
- msg = bt2.InactivityMessage(self._cc_prio_map)
- msg.clock_values.add(self._cc1(123))
- msg.clock_values.add(self._cc2(19487))
- self.assertEqual(len(msg.clock_class_priority_map), 2)
- self.assertEqual(msg.clock_class_priority_map, self._cc_prio_map)
- self.assertEqual(msg.clock_values[self._cc1], 123)
- self.assertEqual(msg.clock_values[self._cc2], 19487)
-
- def test_eq(self):
- msg = bt2.InactivityMessage(self._cc_prio_map)
- msg.clock_values.add(self._cc1(123))
- msg.clock_values.add(self._cc2(19487))
- cc_prio_map_copy = copy.copy(self._cc_prio_map)
- msg2 = bt2.InactivityMessage(cc_prio_map_copy)
- msg2.clock_values.add(self._cc1(123))
- msg2.clock_values.add(self._cc2(19487))
- self.assertEqual(msg, msg2)
-
- def test_ne_cc_prio_map(self):
- msg = bt2.InactivityMessage(self._cc_prio_map)
- msg.clock_values.add(self._cc1(123))
- msg.clock_values.add(self._cc2(19487))
- cc_prio_map_copy = copy.copy(self._cc_prio_map)
- cc_prio_map_copy[self._cc2] = 23
- msg2 = bt2.InactivityMessage(cc_prio_map_copy)
- self.assertNotEqual(msg, msg2)
-
- def test_ne_clock_value(self):
- msg = bt2.InactivityMessage(self._cc_prio_map)
- msg.clock_values.add(self._cc1(123))
- msg.clock_values.add(self._cc2(19487))
- msg2 = bt2.InactivityMessage(self._cc_prio_map)
- msg.clock_values.add(self._cc1(123))
- msg.clock_values.add(self._cc2(1847))
- self.assertNotEqual(msg, msg2)
-
- def test_eq_invalid(self):
- msg = bt2.InactivityMessage(self._cc_prio_map)
- self.assertNotEqual(msg, 23)
-
- def test_copy(self):
- msg = bt2.InactivityMessage(self._cc_prio_map)
- msg.clock_values.add(self._cc1(123))
- msg.clock_values.add(self._cc2(19487))
- msg_copy = copy.copy(msg)
- self.assertEqual(msg, msg_copy)
- self.assertNotEqual(msg.addr, msg_copy.addr)
- self.assertEqual(msg.clock_class_priority_map.addr,
- msg_copy.clock_class_priority_map.addr)
- self.assertEqual(msg_copy.clock_values[self._cc1], 123)
- self.assertEqual(msg_copy.clock_values[self._cc2], 19487)
-
- def test_deepcopy(self):
- msg = bt2.InactivityMessage(self._cc_prio_map)
- msg.clock_values.add(self._cc1(123))
- msg.clock_values.add(self._cc2(19487))
- msg_copy = copy.deepcopy(msg)
- self.assertEqual(msg, msg_copy)
- self.assertNotEqual(msg.addr, msg_copy.addr)
- self.assertNotEqual(msg.clock_class_priority_map.addr,
- msg_copy.clock_class_priority_map.addr)
- self.assertEqual(msg.clock_class_priority_map,
- msg_copy.clock_class_priority_map)
- self.assertNotEqual(list(msg.clock_class_priority_map)[0].addr,
- list(msg_copy.clock_class_priority_map)[0].addr)
- self.assertIsNone(msg_copy.clock_values[self._cc1])
- self.assertIsNone(msg_copy.clock_values[self._cc2])
- self.assertEqual(msg_copy.clock_values[list(msg_copy.clock_class_priority_map)[0]], 123)
- self.assertEqual(msg_copy.clock_values[list(msg_copy.clock_class_priority_map)[1]], 19487)
-
-
-@unittest.skip("this is broken")
-class DiscardedPacketsMessageTestCase(unittest.TestCase):
- def setUp(self):
- self._trace = bt2.Trace()
- self._sc = bt2.StreamClass()
- self._ec = bt2.EventClass('salut')
- self._clock_class = bt2.ClockClass('yo', 1000)
- self._uint64_int_ft = bt2.IntegerFieldType(64, mapped_clock_class=self._clock_class)
- self._my_int_ft = bt2.IntegerFieldType(32)
- self._ec.payload_field_type = bt2.StructureFieldType()
- self._ec.payload_field_type += collections.OrderedDict([
- ('my_int', self._my_int_ft),
- ])
- self._sc.add_event_class(self._ec)
- self._sc.packet_context_field_type = bt2.StructureFieldType()
- self._sc.packet_context_field_type += collections.OrderedDict([
- ('packet_seq_num', self._my_int_ft),
- ('timestamp_begin', self._uint64_int_ft),
- ('timestamp_end', self._uint64_int_ft),
- ])
- self._trace.add_clock_class(self._clock_class)
- self._trace.add_stream_class(self._sc)
- self._stream = self._sc()
-
- def tearDown(self):
- del self._trace
- del self._sc
- del self._ec
- del self._clock_class
- del self._uint64_int_ft
- del self._my_int_ft
- del self._stream
-
- def _create_event(self, packet):
- event = self._ec()
- event.payload_field['my_int'] = 23
- event.packet = packet
- return event
-
- def _get_msg(self):
class MyIter(bt2._UserMessageIterator):
- def __init__(iter_self):
- packet1 = self._stream.create_packet()
- packet1.context_field['packet_seq_num'] = 0
- packet1.context_field['timestamp_begin'] = 3
- packet1.context_field['timestamp_end'] = 6
- packet2 = self._stream.create_packet()
- packet2.context_field['packet_seq_num'] = 5
- packet2.context_field['timestamp_begin'] = 7
- packet2.context_field['timestamp_end'] = 10
- iter_self._ev1 = self._create_event(packet1)
- iter_self._ev2 = self._create_event(packet2)
- iter_self._at = 0
+ def __init__(self, config, self_port_output):
+ self._at = 0
+ self._with_stream_msgs_clock_snapshots = self_port_output.user_data.get(
+ 'with_stream_msgs_clock_snapshots', False
+ )
def __next__(self):
- if self._at == 0:
- msg = bt2.EventMessage(self._ev1)
- elif self._at == 1:
- msg = bt2.EventMessage(self._ev2)
+ if test_obj._clock_class:
+ if self._at == 0:
+ if self._with_stream_msgs_clock_snapshots:
+ msg = self._create_stream_beginning_message(
+ test_obj._stream, default_clock_snapshot=self._at
+ )
+ else:
+ msg = self._create_stream_beginning_message(
+ test_obj._stream
+ )
+ test_obj.assertIs(type(msg), bt2._StreamBeginningMessage)
+ elif self._at == 1:
+ msg = self._create_packet_beginning_message(
+ test_obj._packet, self._at
+ )
+ test_obj.assertIs(type(msg), bt2._PacketBeginningMessage)
+ elif self._at == 2:
+ msg = self._create_event_message(
+ test_obj._event_class, test_obj._packet, self._at
+ )
+ test_obj.assertIs(type(msg), bt2._EventMessage)
+ elif self._at == 3:
+ msg = self._create_message_iterator_inactivity_message(
+ test_obj._clock_class, self._at
+ )
+ elif self._at == 4:
+ msg = self._create_discarded_events_message(
+ test_obj._stream, 890, self._at, self._at
+ )
+ test_obj.assertIs(type(msg), bt2._DiscardedEventsMessage)
+ elif self._at == 5:
+ msg = self._create_packet_end_message(
+ test_obj._packet, self._at
+ )
+ test_obj.assertIs(type(msg), bt2._PacketEndMessage)
+ elif self._at == 6:
+ msg = self._create_discarded_packets_message(
+ test_obj._stream, 678, self._at, self._at
+ )
+ test_obj.assertIs(type(msg), bt2._DiscardedPacketsMessage)
+ elif self._at == 7:
+ if self._with_stream_msgs_clock_snapshots:
+ msg = self._create_stream_end_message(
+ test_obj._stream, default_clock_snapshot=self._at
+ )
+ else:
+ msg = self._create_stream_end_message(test_obj._stream)
+ test_obj.assertIs(type(msg), bt2._StreamEndMessage)
+ elif self._at >= 8:
+ raise bt2.Stop
else:
- raise bt2.Stop
+ if self._at == 0:
+ msg = self._create_stream_beginning_message(test_obj._stream)
+ elif self._at == 1:
+ msg = self._create_packet_beginning_message(test_obj._packet)
+ elif self._at == 2:
+ msg = self._create_event_message(
+ test_obj._event_class, test_obj._packet
+ )
+ elif self._at == 3:
+ msg = self._create_discarded_events_message(
+ test_obj._stream, 890
+ )
+ elif self._at == 4:
+ msg = self._create_packet_end_message(test_obj._packet)
+ elif self._at == 5:
+ msg = self._create_discarded_packets_message(
+ test_obj._stream, 678
+ )
+ elif self._at == 6:
+ msg = self._create_stream_end_message(test_obj._stream)
+ elif self._at >= 7:
+ raise bt2.Stop
self._at += 1
return msg
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
- self._add_output_port('out')
-
- class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
- self._add_input_port('in')
-
- def _consume(comp_self):
- nonlocal the_msg
- msg = next(comp_self._msg_iter)
-
- if type(msg) is bt2._DiscardedPacketsMessage:
- the_msg = msg
- raise bt2.Stop
-
- def _port_connected(self, port, other_port):
- self._msg_iter = port.connection.create_message_iterator()
-
- the_msg = None
- graph = bt2.Graph()
- src = graph.add_component(MySource, 'src')
- sink = graph.add_component(MySink, 'sink')
- conn = graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
- graph.run()
- return the_msg
-
- def test_create(self):
- self.assertIsInstance(self._get_msg(), bt2._DiscardedPacketsMessage)
-
- def test_count(self):
- self.assertEqual(self._get_msg().count, 4)
-
- def test_stream(self):
- self.assertEqual(self._get_msg().stream.addr, self._stream.addr)
-
- def test_beginning_clock_value(self):
- msg = self._get_msg()
- beginning_clock_value = msg.beginning_clock_value
- self.assertEqual(beginning_clock_value.clock_class, self._clock_class)
- self.assertEqual(beginning_clock_value, 6)
+ class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ def __init__(self, config, params, obj):
+ self._add_output_port('out', params)
- def test_end_clock_value(self):
- msg = self._get_msg()
- end_clock_value = msg.end_clock_value
- self.assertEqual(end_clock_value.clock_class, self._clock_class)
- self.assertEqual(end_clock_value, 7)
-
- def test_eq(self):
- msg1 = self._get_msg()
- msg2 = self._get_msg()
- self.assertEqual(msg1, msg2)
-
- def test_eq_invalid(self):
- msg1 = self._get_msg()
- self.assertNotEqual(msg1, 23)
-
-
-@unittest.skip("this is broken")
-class DiscardedEventsMessageTestCase(unittest.TestCase):
- def setUp(self):
- self._trace = bt2.Trace()
- self._sc = bt2.StreamClass()
- self._ec = bt2.EventClass('salut')
- self._clock_class = bt2.ClockClass('yo', 1000)
- self._uint64_int_ft = bt2.IntegerFieldType(64, mapped_clock_class=self._clock_class)
- self._my_int_ft = bt2.IntegerFieldType(32)
- self._ec.payload_field_type = bt2.StructureFieldType()
- self._ec.payload_field_type += collections.OrderedDict([
- ('my_int', self._my_int_ft),
- ])
- self._sc.add_event_class(self._ec)
- self._sc.packet_context_field_type = bt2.StructureFieldType()
- self._sc.packet_context_field_type += collections.OrderedDict([
- ('events_discarded', self._my_int_ft),
- ('timestamp_begin', self._uint64_int_ft),
- ('timestamp_end', self._uint64_int_ft),
- ])
- self._trace.add_clock_class(self._clock_class)
- self._trace.add_stream_class(self._sc)
- self._stream = self._sc()
-
- def tearDown(self):
- del self._trace
- del self._sc
- del self._ec
- del self._clock_class
- del self._uint64_int_ft
- del self._my_int_ft
- del self._stream
-
- def _create_event(self, packet):
- event = self._ec()
- event.payload_field['my_int'] = 23
- event.packet = packet
- return event
-
- def _get_msg(self):
- class MyIter(bt2._UserMessageIterator):
- def __init__(iter_self):
- packet1 = self._stream.create_packet()
- packet1.context_field['events_discarded'] = 0
- packet1.context_field['timestamp_begin'] = 3
- packet1.context_field['timestamp_end'] = 6
- packet2 = self._stream.create_packet()
- packet2.context_field['events_discarded'] = 10
- packet2.context_field['timestamp_begin'] = 7
- packet2.context_field['timestamp_end'] = 10
- iter_self._ev1 = self._create_event(packet1)
- iter_self._ev2 = self._create_event(packet2)
- iter_self._at = 0
-
- def __next__(self):
- if self._at == 0:
- msg = bt2.EventMessage(self._ev1)
- elif self._at == 1:
- msg = bt2.EventMessage(self._ev2)
+ with_cc = bool(params['with_cc'])
+ tc = self._create_trace_class()
+ if with_cc:
+ cc = self._create_clock_class()
else:
- raise bt2.Stop
-
- self._at += 1
- return msg
-
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
- self._add_output_port('out')
-
- class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
- self._add_input_port('in')
-
- def _consume(comp_self):
- nonlocal the_msg
- msg = next(comp_self._msg_iter)
-
- if type(msg) is bt2._DiscardedEventsMessage:
- the_msg = msg
- raise bt2.Stop
-
- def _port_connected(self, port, other_port):
- self._msg_iter = port.connection.create_message_iterator()
-
- the_msg = None
- graph = bt2.Graph()
- src = graph.add_component(MySource, 'src')
- sink = graph.add_component(MySink, 'sink')
- conn = graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
- graph.run()
- return the_msg
-
+ cc = None
+
+ sc = tc.create_stream_class(
+ default_clock_class=cc,
+ supports_packets=True,
+ packets_have_beginning_default_clock_snapshot=with_cc,
+ packets_have_end_default_clock_snapshot=with_cc,
+ supports_discarded_events=True,
+ discarded_events_have_default_clock_snapshots=with_cc,
+ supports_discarded_packets=True,
+ discarded_packets_have_default_clock_snapshots=with_cc,
+ )
+
+ # Create payload field class
+ my_int_fc = tc.create_signed_integer_field_class(32)
+ payload_fc = tc.create_structure_field_class()
+ payload_fc += [('my_int', my_int_fc)]
+
+ # Create specific context field class
+ my_int_fc = tc.create_signed_integer_field_class(32)
+ specific_fc = tc.create_structure_field_class()
+ specific_fc += [('my_int', my_int_fc)]
+
+ ec = sc.create_event_class(
+ name='salut',
+ payload_field_class=payload_fc,
+ specific_context_field_class=specific_fc,
+ )
+
+ trace = tc()
+ stream = trace.create_stream(sc)
+ packet = stream.create_packet()
+
+ test_obj._trace = trace
+ test_obj._stream = stream
+ test_obj._packet = packet
+ test_obj._event_class = ec
+ test_obj._clock_class = cc
+
+ test_obj = self
+ self._graph = bt2.Graph()
+ self._src = MySrc
+ self._iter = MyIter
+
+ def test_all_msg_with_cc(self):
+ params = {'with_cc': True}
+ self._src_comp = self._graph.add_component(self._src, 'my_source', params)
+ self._msg_iter = TestOutputPortMessageIterator(
+ self._graph, self._src_comp.output_ports['out']
+ )
+
+ for i, msg in enumerate(self._msg_iter):
+ if i == 0:
+ self.assertIs(type(msg), bt2._StreamBeginningMessageConst)
+ self.assertIs(type(msg.stream), bt2_stream._StreamConst)
+ self.assertEqual(msg.stream.addr, self._stream.addr)
+ self.assertIsInstance(
+ msg.default_clock_snapshot, bt2._UnknownClockSnapshot
+ )
+ elif i == 1:
+ self.assertIs(type(msg), bt2._PacketBeginningMessageConst)
+ self.assertIs(type(msg.packet), bt2_packet._PacketConst)
+ self.assertIs(
+ type(msg.default_clock_snapshot),
+ bt2_clock_snapshot._ClockSnapshotConst,
+ )
+ self.assertEqual(msg.packet.addr, self._packet.addr)
+ self.assertEqual(msg.default_clock_snapshot.value, i)
+ elif i == 2:
+ self.assertIs(type(msg), bt2._EventMessageConst)
+ self.assertIs(type(msg.event), bt2_event._EventConst)
+ self.assertIs(
+ type(msg.default_clock_snapshot),
+ bt2_clock_snapshot._ClockSnapshotConst,
+ )
+ self.assertIs(
+ type(msg.event.payload_field), bt2_field._StructureFieldConst
+ )
+ self.assertIs(
+ type(msg.event.payload_field['my_int']),
+ bt2_field._SignedIntegerFieldConst,
+ )
+
+ self.assertEqual(msg.event.cls.addr, self._event_class.addr)
+ self.assertEqual(msg.default_clock_snapshot.value, i)
+ elif i == 3:
+ self.assertIs(type(msg), bt2._MessageIteratorInactivityMessageConst)
+ self.assertIs(
+ type(msg.clock_snapshot), bt2_clock_snapshot._ClockSnapshotConst
+ )
+ self.assertEqual(msg.clock_snapshot.value, i)
+ elif i == 4:
+ self.assertIs(type(msg), bt2._DiscardedEventsMessageConst)
+ self.assertIs(type(msg.stream), bt2_stream._StreamConst)
+ self.assertIs(type(msg.stream.cls), bt2_stream_class._StreamClassConst)
+ self.assertIs(
+ type(msg.beginning_default_clock_snapshot),
+ bt2_clock_snapshot._ClockSnapshotConst,
+ )
+ self.assertIs(
+ type(msg.end_default_clock_snapshot),
+ bt2_clock_snapshot._ClockSnapshotConst,
+ )
+
+ self.assertEqual(msg.stream.addr, self._stream.addr)
+ self.assertEqual(msg.count, 890)
+ self.assertEqual(
+ msg.stream.cls.default_clock_class.addr, self._clock_class.addr
+ )
+ self.assertEqual(msg.beginning_default_clock_snapshot.value, i)
+ self.assertEqual(msg.end_default_clock_snapshot.value, i)
+ elif i == 5:
+ self.assertIs(type(msg), bt2._PacketEndMessageConst)
+ self.assertIs(type(msg.packet), bt2_packet._PacketConst)
+ self.assertIs(
+ type(msg.default_clock_snapshot),
+ bt2_clock_snapshot._ClockSnapshotConst,
+ )
+ self.assertEqual(msg.packet.addr, self._packet.addr)
+ self.assertEqual(msg.default_clock_snapshot.value, i)
+ elif i == 6:
+ self.assertIs(type(msg), bt2._DiscardedPacketsMessageConst)
+ self.assertIs(type(msg.stream), bt2_stream._StreamConst)
+ self.assertIs(type(msg.stream.trace), bt2_trace._TraceConst)
+ self.assertIs(
+ type(msg.stream.trace.cls), bt2_trace_class._TraceClassConst
+ )
+ self.assertIs(
+ type(msg.beginning_default_clock_snapshot),
+ bt2_clock_snapshot._ClockSnapshotConst,
+ )
+ self.assertIs(
+ type(msg.end_default_clock_snapshot),
+ bt2_clock_snapshot._ClockSnapshotConst,
+ )
+ self.assertEqual(msg.stream.addr, self._stream.addr)
+ self.assertEqual(msg.count, 678)
+ self.assertEqual(
+ msg.stream.cls.default_clock_class.addr, self._clock_class.addr
+ )
+ self.assertEqual(msg.beginning_default_clock_snapshot.value, i)
+ self.assertEqual(msg.end_default_clock_snapshot.value, i)
+ elif i == 7:
+ self.assertIs(type(msg), bt2._StreamEndMessageConst)
+ self.assertIs(type(msg.stream), bt2_stream._StreamConst)
+ self.assertEqual(msg.stream.addr, self._stream.addr)
+ self.assertIs(
+ type(msg.default_clock_snapshot), bt2._UnknownClockSnapshot
+ )
+ else:
+ raise Exception
+
+ def test_all_msg_without_cc(self):
+ params = {'with_cc': False}
+ self._src_comp = self._graph.add_component(self._src, 'my_source', params)
+ self._msg_iter = TestOutputPortMessageIterator(
+ self._graph, self._src_comp.output_ports['out']
+ )
+
+ for i, msg in enumerate(self._msg_iter):
+ if i == 0:
+ self.assertIsInstance(msg, bt2._StreamBeginningMessageConst)
+ self.assertIs(type(msg.stream), bt2_stream._StreamConst)
+ self.assertEqual(msg.stream.addr, self._stream.addr)
+ with self.assertRaisesRegex(
+ ValueError, 'stream class has no default clock class'
+ ):
+ msg.default_clock_snapshot
+ elif i == 1:
+ self.assertIsInstance(msg, bt2._PacketBeginningMessageConst)
+ self.assertIs(type(msg.packet), bt2_packet._PacketConst)
+ self.assertEqual(msg.packet.addr, self._packet.addr)
+ elif i == 2:
+ self.assertIsInstance(msg, bt2._EventMessageConst)
+ self.assertIs(type(msg.event), bt2_event._EventConst)
+ self.assertIs(type(msg.event.cls), bt2_event_class._EventClassConst)
+ self.assertEqual(msg.event.cls.addr, self._event_class.addr)
+ with self.assertRaisesRegex(
+ ValueError, 'stream class has no default clock class'
+ ):
+ msg.default_clock_snapshot
+ elif i == 3:
+ self.assertIsInstance(msg, bt2._DiscardedEventsMessageConst)
+ self.assertIs(type(msg.stream), bt2_stream._StreamConst)
+ self.assertIs(type(msg.stream.cls), bt2_stream_class._StreamClassConst)
+ self.assertEqual(msg.stream.addr, self._stream.addr)
+ self.assertEqual(msg.count, 890)
+ self.assertIsNone(msg.stream.cls.default_clock_class)
+ with self.assertRaisesRegex(
+ ValueError,
+ 'such a message has no clock snapshots for this stream class',
+ ):
+ msg.beginning_default_clock_snapshot
+ with self.assertRaisesRegex(
+ ValueError,
+ 'such a message has no clock snapshots for this stream class',
+ ):
+ msg.end_default_clock_snapshot
+ elif i == 4:
+ self.assertIsInstance(msg, bt2._PacketEndMessageConst)
+ self.assertEqual(msg.packet.addr, self._packet.addr)
+ self.assertIs(type(msg.packet), bt2_packet._PacketConst)
+ elif i == 5:
+ self.assertIsInstance(msg, bt2._DiscardedPacketsMessageConst)
+ self.assertIs(type(msg.stream), bt2_stream._StreamConst)
+ self.assertIs(type(msg.stream.cls), bt2_stream_class._StreamClassConst)
+ self.assertIs(
+ type(msg.stream.cls.trace_class), bt2_trace_class._TraceClassConst
+ )
+ self.assertEqual(msg.stream.addr, self._stream.addr)
+ self.assertEqual(msg.count, 678)
+ self.assertIsNone(msg.stream.cls.default_clock_class)
+ with self.assertRaisesRegex(
+ ValueError,
+ 'such a message has no clock snapshots for this stream class',
+ ):
+ msg.beginning_default_clock_snapshot
+ with self.assertRaisesRegex(
+ ValueError,
+ 'such a message has no clock snapshots for this stream class',
+ ):
+ msg.end_default_clock_snapshot
+ elif i == 6:
+ self.assertIsInstance(msg, bt2._StreamEndMessageConst)
+ self.assertIs(type(msg.stream), bt2_stream._StreamConst)
+ self.assertEqual(msg.stream.addr, self._stream.addr)
+ with self.assertRaisesRegex(
+ ValueError, 'stream class has no default clock class'
+ ):
+ msg.default_clock_snapshot
+ else:
+ raise Exception
+
+ def test_msg_stream_with_clock_snapshots(self):
+ params = {'with_cc': True, 'with_stream_msgs_clock_snapshots': True}
+
+ self._src_comp = self._graph.add_component(self._src, 'my_source', params)
+ self._msg_iter = TestOutputPortMessageIterator(
+ self._graph, self._src_comp.output_ports['out']
+ )
+ msgs = list(self._msg_iter)
+
+ msg_stream_beg = msgs[0]
+ self.assertIsInstance(msg_stream_beg, bt2._StreamBeginningMessageConst)
+ self.assertIs(
+ type(msg_stream_beg.default_clock_snapshot),
+ bt2_clock_snapshot._ClockSnapshotConst,
+ )
+ self.assertEqual(msg_stream_beg.default_clock_snapshot.value, 0)
+
+ msg_stream_end = msgs[7]
+ self.assertIsInstance(msg_stream_end, bt2._StreamEndMessageConst)
+ self.assertIs(
+ type(msg_stream_end.default_clock_snapshot),
+ bt2_clock_snapshot._ClockSnapshotConst,
+ )
+ self.assertEqual(msg_stream_end.default_clock_snapshot.value, 7)
+
+ def test_stream_beg_msg(self):
+ msg = utils.get_stream_beginning_message()
+ self.assertIs(type(msg.stream), bt2_stream._Stream)
+
+ def test_stream_end_msg(self):
+ msg = utils.get_stream_end_message()
+ self.assertIs(type(msg.stream), bt2_stream._Stream)
+
+ def test_packet_beg_msg(self):
+ msg = utils.get_packet_beginning_message()
+ self.assertIs(type(msg.packet), bt2_packet._Packet)
+
+ def test_packet_end_msg(self):
+ msg = utils.get_packet_end_message()
+ self.assertIs(type(msg.packet), bt2_packet._Packet)
+
+ def test_event_msg(self):
+ msg = utils.get_event_message()
+ self.assertIs(type(msg.event), bt2_event._Event)
+
+
+class CreateDiscardedEventMessageTestCase(unittest.TestCase):
+ # Most basic case.
+ def test_create(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(supports_discarded_events=True)
+
+ def msg_iter_next(msg_iter, stream):
+ return msg_iter._create_discarded_events_message(stream)
+
+ msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertIs(type(msg), bt2._DiscardedEventsMessage)
+ self.assertIs(msg.count, None)
+
+ # With event count.
+ def test_create_with_count(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(supports_discarded_events=True)
+
+ def msg_iter_next(msg_iter, stream):
+ return msg_iter._create_discarded_events_message(stream, count=242)
+
+ msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertIs(type(msg), bt2._DiscardedEventsMessage)
+ self.assertEqual(msg.count, 242)
+
+ # With event count == 0.
+ def test_create_with_count_zero_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(supports_discarded_events=True)
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError, 'discarded event count is 0',
+ ):
+ msg_iter._create_discarded_events_message(stream, count=0)
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+ # With clock snapshots.
+ def test_create_with_clock_snapshots(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ default_clock_class=cc,
+ supports_discarded_events=True,
+ discarded_events_have_default_clock_snapshots=True,
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ return msg_iter._create_discarded_events_message(
+ stream, beg_clock_snapshot=10, end_clock_snapshot=20
+ )
+
+ msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertIs(type(msg), bt2._DiscardedEventsMessage)
+ self.assertEqual(msg.beginning_default_clock_snapshot, 10)
+ self.assertEqual(msg.end_default_clock_snapshot, 20)
+
+ # Trying to create when the stream does not support discarded events.
+ def test_create_unsupported_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class()
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError, 'stream class does not support discarded events'
+ ):
+ msg_iter._create_discarded_events_message(stream)
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+ # Trying to create with clock snapshots when the stream does not support
+ # them.
+ def test_create_unsupported_clock_snapshots_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(supports_discarded_events=True)
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError,
+ 'discarded events have no default clock snapshots for this stream class',
+ ):
+ msg_iter._create_discarded_events_message(
+ stream, beg_clock_snapshot=10, end_clock_snapshot=20
+ )
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+ # Trying to create without clock snapshots when the stream requires them.
+ def test_create_missing_clock_snapshots_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ default_clock_class=cc,
+ supports_discarded_events=True,
+ discarded_events_have_default_clock_snapshots=True,
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError,
+ 'discarded events have default clock snapshots for this stream class',
+ ):
+ msg_iter._create_discarded_events_message(stream)
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+ # End clock snapshot greater than beginning clock snapshot.
+ def test_create_clock_snapshots_end_gt_begin_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ default_clock_class=cc,
+ supports_discarded_events=True,
+ discarded_events_have_default_clock_snapshots=True,
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError,
+ r'beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)',
+ ):
+ msg_iter._create_discarded_events_message(
+ stream, beg_clock_snapshot=20, end_clock_snapshot=10
+ )
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+
+class CreateDiscardedPacketMessageTestCase(unittest.TestCase):
+ # Most basic case.
def test_create(self):
- self.assertIsInstance(self._get_msg(), bt2._DiscardedEventsMessage)
-
- def test_count(self):
- self.assertEqual(self._get_msg().count, 10)
-
- def test_stream(self):
- self.assertEqual(self._get_msg().stream.addr, self._stream.addr)
-
- def test_beginning_clock_value(self):
- msg = self._get_msg()
- beginning_clock_value = msg.beginning_clock_value
- self.assertEqual(beginning_clock_value.clock_class, self._clock_class)
- self.assertEqual(beginning_clock_value, 6)
-
- def test_end_clock_value(self):
- msg = self._get_msg()
- end_clock_value = msg.end_clock_value
- self.assertEqual(end_clock_value.clock_class, self._clock_class)
- self.assertEqual(end_clock_value, 10)
-
- def test_eq(self):
- msg1 = self._get_msg()
- msg2 = self._get_msg()
- self.assertEqual(msg1, msg2)
-
- def test_eq_invalid(self):
- msg1 = self._get_msg()
- self.assertNotEqual(msg1, 23)
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ supports_packets=True, supports_discarded_packets=True
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ return msg_iter._create_discarded_packets_message(stream)
+
+ msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
+ self.assertIs(msg.count, None)
+
+ # With packet count.
+ def test_create_with_count(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ supports_packets=True, supports_discarded_packets=True
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ return msg_iter._create_discarded_packets_message(stream, count=242)
+
+ msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
+ self.assertEqual(msg.count, 242)
+
+ # With packet count == 0.
+ def test_create_with_count_zero_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ supports_packets=True, supports_discarded_packets=True
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError, 'discarded packet count is 0',
+ ):
+ msg_iter._create_discarded_packets_message(stream, count=0)
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+ # With clock snapshots.
+ def test_create_with_clock_snapshots(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ default_clock_class=cc,
+ supports_packets=True,
+ supports_discarded_packets=True,
+ discarded_packets_have_default_clock_snapshots=True,
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ return msg_iter._create_discarded_packets_message(
+ stream, beg_clock_snapshot=10, end_clock_snapshot=20
+ )
+
+ msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
+ self.assertEqual(msg.beginning_default_clock_snapshot, 10)
+ self.assertEqual(msg.end_default_clock_snapshot, 20)
+
+ # Trying to create when the stream does not support discarded packets.
+ def test_create_unsupported_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(supports_packets=True,)
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError, 'stream class does not support discarded packets'
+ ):
+ msg_iter._create_discarded_packets_message(stream)
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+ # Trying to create with clock snapshots when the stream does not support
+ # them.
+ def test_create_unsupported_clock_snapshots_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ supports_packets=True, supports_discarded_packets=True
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError,
+ 'discarded packets have no default clock snapshots for this stream class',
+ ):
+ msg_iter._create_discarded_packets_message(
+ stream, beg_clock_snapshot=10, end_clock_snapshot=20
+ )
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+ # Trying to create without clock snapshots when the stream requires them.
+ def test_create_missing_clock_snapshots_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ default_clock_class=cc,
+ supports_packets=True,
+ supports_discarded_packets=True,
+ discarded_packets_have_default_clock_snapshots=True,
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError,
+ 'discarded packets have default clock snapshots for this stream class',
+ ):
+ msg_iter._create_discarded_packets_message(stream)
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+ # End clock snapshot greater than beginning clock snapshot.
+ def test_create_clock_snapshots_end_gt_begin_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ default_clock_class=cc,
+ supports_packets=True,
+ supports_discarded_packets=True,
+ discarded_packets_have_default_clock_snapshots=True,
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError,
+ r'beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)',
+ ):
+ msg_iter._create_discarded_packets_message(
+ stream, beg_clock_snapshot=20, end_clock_snapshot=10
+ )
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+
+if __name__ == '__main__':
+ unittest.main()