+ cc = None
+
+ sc = tc.create_stream_class(
+ default_clock_class=cc,
+ supports_packets=True,
+ packets_have_beginning_default_clock_snapshot=with_cc,
+ packets_have_end_default_clock_snapshot=with_cc,
+ supports_discarded_events=True,
+ discarded_events_have_default_clock_snapshots=with_cc,
+ supports_discarded_packets=True,
+ discarded_packets_have_default_clock_snapshots=with_cc,
+ )
+
+ # Create payload field class
+ my_int_fc = tc.create_signed_integer_field_class(32)
+ payload_fc = tc.create_structure_field_class()
+ payload_fc += [('my_int', my_int_fc)]
+
+ # Create specific context field class
+ my_int_fc = tc.create_signed_integer_field_class(32)
+ specific_fc = tc.create_structure_field_class()
+ specific_fc += [('my_int', my_int_fc)]
+
+ ec = sc.create_event_class(
+ name='salut',
+ payload_field_class=payload_fc,
+ specific_context_field_class=specific_fc,
+ )
+
+ trace = tc()
+ stream = trace.create_stream(sc)
+ packet = stream.create_packet()
+
+ test_obj._trace = trace
+ test_obj._stream = stream
+ test_obj._packet = packet
+ test_obj._event_class = ec
+ test_obj._clock_class = cc
+
+ test_obj = self
+ self._graph = bt2.Graph()
+ self._src = MySrc
+ self._iter = MyIter
+
+ def test_all_msg_with_cc(self):
+ params = {'with_cc': True}
+ self._src_comp = self._graph.add_component(self._src, 'my_source', params)
+ self._msg_iter = TestOutputPortMessageIterator(
+ self._graph, self._src_comp.output_ports['out']
+ )
+
+ for i, msg in enumerate(self._msg_iter):
+ if i == 0:
+ self.assertIs(type(msg), bt2._StreamBeginningMessageConst)
+ self.assertIs(type(msg.stream), bt2_stream._StreamConst)
+ self.assertEqual(msg.stream.addr, self._stream.addr)
+ self.assertIsInstance(
+ msg.default_clock_snapshot, bt2._UnknownClockSnapshot
+ )
+ elif i == 1:
+ self.assertIs(type(msg), bt2._PacketBeginningMessageConst)
+ self.assertIs(type(msg.packet), bt2_packet._PacketConst)
+ self.assertIs(
+ type(msg.default_clock_snapshot),
+ bt2_clock_snapshot._ClockSnapshotConst,
+ )
+ self.assertEqual(msg.packet.addr, self._packet.addr)
+ self.assertEqual(msg.default_clock_snapshot.value, i)
+ elif i == 2:
+ self.assertIs(type(msg), bt2._EventMessageConst)
+ self.assertIs(type(msg.event), bt2_event._EventConst)
+ self.assertIs(
+ type(msg.default_clock_snapshot),
+ bt2_clock_snapshot._ClockSnapshotConst,
+ )
+ self.assertIs(
+ type(msg.event.payload_field), bt2_field._StructureFieldConst
+ )
+ self.assertIs(
+ type(msg.event.payload_field['my_int']),
+ bt2_field._SignedIntegerFieldConst,
+ )
+
+ self.assertEqual(msg.event.cls.addr, self._event_class.addr)
+ self.assertEqual(msg.default_clock_snapshot.value, i)
+ elif i == 3:
+ self.assertIs(type(msg), bt2._MessageIteratorInactivityMessageConst)
+ self.assertIs(
+ type(msg.clock_snapshot), bt2_clock_snapshot._ClockSnapshotConst
+ )
+ self.assertEqual(msg.clock_snapshot.value, i)
+ elif i == 4:
+ self.assertIs(type(msg), bt2._DiscardedEventsMessageConst)
+ self.assertIs(type(msg.stream), bt2_stream._StreamConst)
+ self.assertIs(type(msg.stream.cls), bt2_stream_class._StreamClassConst)
+ self.assertIs(
+ type(msg.beginning_default_clock_snapshot),
+ bt2_clock_snapshot._ClockSnapshotConst,
+ )
+ self.assertIs(
+ type(msg.end_default_clock_snapshot),
+ bt2_clock_snapshot._ClockSnapshotConst,
+ )
+
+ self.assertEqual(msg.stream.addr, self._stream.addr)
+ self.assertEqual(msg.count, 890)
+ self.assertEqual(
+ msg.stream.cls.default_clock_class.addr, self._clock_class.addr
+ )
+ self.assertEqual(msg.beginning_default_clock_snapshot.value, i)
+ self.assertEqual(msg.end_default_clock_snapshot.value, i)
+ elif i == 5:
+ self.assertIs(type(msg), bt2._PacketEndMessageConst)
+ self.assertIs(type(msg.packet), bt2_packet._PacketConst)
+ self.assertIs(
+ type(msg.default_clock_snapshot),
+ bt2_clock_snapshot._ClockSnapshotConst,
+ )
+ self.assertEqual(msg.packet.addr, self._packet.addr)
+ self.assertEqual(msg.default_clock_snapshot.value, i)
+ elif i == 6:
+ self.assertIs(type(msg), bt2._DiscardedPacketsMessageConst)
+ self.assertIs(type(msg.stream), bt2_stream._StreamConst)
+ self.assertIs(type(msg.stream.trace), bt2_trace._TraceConst)
+ self.assertIs(
+ type(msg.stream.trace.cls), bt2_trace_class._TraceClassConst
+ )
+ self.assertIs(
+ type(msg.beginning_default_clock_snapshot),
+ bt2_clock_snapshot._ClockSnapshotConst,
+ )
+ self.assertIs(
+ type(msg.end_default_clock_snapshot),
+ bt2_clock_snapshot._ClockSnapshotConst,
+ )
+ self.assertEqual(msg.stream.addr, self._stream.addr)
+ self.assertEqual(msg.count, 678)
+ self.assertEqual(
+ msg.stream.cls.default_clock_class.addr, self._clock_class.addr
+ )
+ self.assertEqual(msg.beginning_default_clock_snapshot.value, i)
+ self.assertEqual(msg.end_default_clock_snapshot.value, i)
+ elif i == 7:
+ self.assertIs(type(msg), bt2._StreamEndMessageConst)
+ self.assertIs(type(msg.stream), bt2_stream._StreamConst)
+ self.assertEqual(msg.stream.addr, self._stream.addr)
+ self.assertIs(
+ type(msg.default_clock_snapshot), bt2._UnknownClockSnapshot
+ )
+ else:
+ raise Exception
+
+ def test_all_msg_without_cc(self):
+ params = {'with_cc': False}
+ self._src_comp = self._graph.add_component(self._src, 'my_source', params)
+ self._msg_iter = TestOutputPortMessageIterator(
+ self._graph, self._src_comp.output_ports['out']
+ )
+
+ for i, msg in enumerate(self._msg_iter):
+ if i == 0:
+ self.assertIsInstance(msg, bt2._StreamBeginningMessageConst)
+ self.assertIs(type(msg.stream), bt2_stream._StreamConst)
+ self.assertEqual(msg.stream.addr, self._stream.addr)
+ with self.assertRaisesRegex(
+ ValueError, 'stream class has no default clock class'
+ ):
+ msg.default_clock_snapshot
+ elif i == 1:
+ self.assertIsInstance(msg, bt2._PacketBeginningMessageConst)
+ self.assertIs(type(msg.packet), bt2_packet._PacketConst)
+ self.assertEqual(msg.packet.addr, self._packet.addr)
+ elif i == 2:
+ self.assertIsInstance(msg, bt2._EventMessageConst)
+ self.assertIs(type(msg.event), bt2_event._EventConst)
+ self.assertIs(type(msg.event.cls), bt2_event_class._EventClassConst)
+ self.assertEqual(msg.event.cls.addr, self._event_class.addr)
+ with self.assertRaisesRegex(
+ ValueError, 'stream class has no default clock class'
+ ):
+ msg.default_clock_snapshot
+ elif i == 3:
+ self.assertIsInstance(msg, bt2._DiscardedEventsMessageConst)
+ self.assertIs(type(msg.stream), bt2_stream._StreamConst)
+ self.assertIs(type(msg.stream.cls), bt2_stream_class._StreamClassConst)
+ self.assertEqual(msg.stream.addr, self._stream.addr)
+ self.assertEqual(msg.count, 890)
+ self.assertIsNone(msg.stream.cls.default_clock_class)
+ with self.assertRaisesRegex(
+ ValueError,
+ 'such a message has no clock snapshots for this stream class',
+ ):
+ msg.beginning_default_clock_snapshot
+ with self.assertRaisesRegex(
+ ValueError,
+ 'such a message has no clock snapshots for this stream class',
+ ):
+ msg.end_default_clock_snapshot
+ elif i == 4:
+ self.assertIsInstance(msg, bt2._PacketEndMessageConst)
+ self.assertEqual(msg.packet.addr, self._packet.addr)
+ self.assertIs(type(msg.packet), bt2_packet._PacketConst)
+ elif i == 5:
+ self.assertIsInstance(msg, bt2._DiscardedPacketsMessageConst)
+ self.assertIs(type(msg.stream), bt2_stream._StreamConst)
+ self.assertIs(type(msg.stream.cls), bt2_stream_class._StreamClassConst)
+ self.assertIs(
+ type(msg.stream.cls.trace_class), bt2_trace_class._TraceClassConst
+ )
+ self.assertEqual(msg.stream.addr, self._stream.addr)
+ self.assertEqual(msg.count, 678)
+ self.assertIsNone(msg.stream.cls.default_clock_class)
+ with self.assertRaisesRegex(
+ ValueError,
+ 'such a message has no clock snapshots for this stream class',
+ ):
+ msg.beginning_default_clock_snapshot
+ with self.assertRaisesRegex(
+ ValueError,
+ 'such a message has no clock snapshots for this stream class',
+ ):
+ msg.end_default_clock_snapshot
+ elif i == 6:
+ self.assertIsInstance(msg, bt2._StreamEndMessageConst)
+ self.assertIs(type(msg.stream), bt2_stream._StreamConst)
+ self.assertEqual(msg.stream.addr, self._stream.addr)
+ with self.assertRaisesRegex(
+ ValueError, 'stream class has no default clock class'
+ ):
+ msg.default_clock_snapshot
+ else:
+ raise Exception
+
+ def test_msg_stream_with_clock_snapshots(self):
+ params = {'with_cc': True, 'with_stream_msgs_clock_snapshots': True}
+
+ self._src_comp = self._graph.add_component(self._src, 'my_source', params)
+ self._msg_iter = TestOutputPortMessageIterator(
+ self._graph, self._src_comp.output_ports['out']
+ )
+ msgs = list(self._msg_iter)
+
+ msg_stream_beg = msgs[0]
+ self.assertIsInstance(msg_stream_beg, bt2._StreamBeginningMessageConst)
+ self.assertIs(
+ type(msg_stream_beg.default_clock_snapshot),
+ bt2_clock_snapshot._ClockSnapshotConst,
+ )
+ self.assertEqual(msg_stream_beg.default_clock_snapshot.value, 0)
+
+ msg_stream_end = msgs[7]
+ self.assertIsInstance(msg_stream_end, bt2._StreamEndMessageConst)
+ self.assertIs(
+ type(msg_stream_end.default_clock_snapshot),
+ bt2_clock_snapshot._ClockSnapshotConst,
+ )
+ self.assertEqual(msg_stream_end.default_clock_snapshot.value, 7)
+
+ def test_stream_beg_msg(self):
+ msg = utils.get_stream_beginning_message()
+ self.assertIs(type(msg.stream), bt2_stream._Stream)
+
+ def test_stream_end_msg(self):
+ msg = utils.get_stream_end_message()
+ self.assertIs(type(msg.stream), bt2_stream._Stream)
+
+ def test_packet_beg_msg(self):
+ msg = utils.get_packet_beginning_message()
+ self.assertIs(type(msg.packet), bt2_packet._Packet)
+
+ def test_packet_end_msg(self):
+ msg = utils.get_packet_end_message()
+ self.assertIs(type(msg.packet), bt2_packet._Packet)
+
+ def test_event_msg(self):
+ msg = utils.get_event_message()
+ self.assertIs(type(msg.event), bt2_event._Event)
+
+
+class CreateDiscardedEventMessageTestCase(unittest.TestCase):
+ # Most basic case.
+ def test_create(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(supports_discarded_events=True)
+
+ def msg_iter_next(msg_iter, stream):
+ return msg_iter._create_discarded_events_message(stream)
+
+ msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertIs(type(msg), bt2._DiscardedEventsMessage)
+ self.assertIs(msg.count, None)
+
+ # With event count.
+ def test_create_with_count(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(supports_discarded_events=True)
+
+ def msg_iter_next(msg_iter, stream):
+ return msg_iter._create_discarded_events_message(stream, count=242)
+
+ msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertIs(type(msg), bt2._DiscardedEventsMessage)
+ self.assertEqual(msg.count, 242)
+
+ # With event count == 0.
+ def test_create_with_count_zero_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(supports_discarded_events=True)
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError,
+ 'discarded event count is 0',
+ ):
+ msg_iter._create_discarded_events_message(stream, count=0)
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+ # With clock snapshots.
+ def test_create_with_clock_snapshots(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ default_clock_class=cc,
+ supports_discarded_events=True,
+ discarded_events_have_default_clock_snapshots=True,
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ return msg_iter._create_discarded_events_message(
+ stream, beg_clock_snapshot=10, end_clock_snapshot=20
+ )
+
+ msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertIs(type(msg), bt2._DiscardedEventsMessage)
+ self.assertEqual(msg.beginning_default_clock_snapshot, 10)
+ self.assertEqual(msg.end_default_clock_snapshot, 20)
+
+ # Trying to create when the stream does not support discarded events.
+ def test_create_unsupported_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class()
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError, 'stream class does not support discarded events'
+ ):
+ msg_iter._create_discarded_events_message(stream)
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+ # Trying to create with clock snapshots when the stream does not support
+ # them.
+ def test_create_unsupported_clock_snapshots_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(supports_discarded_events=True)
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError,
+ 'discarded events have no default clock snapshots for this stream class',
+ ):
+ msg_iter._create_discarded_events_message(
+ stream, beg_clock_snapshot=10, end_clock_snapshot=20
+ )
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+ # Trying to create without clock snapshots when the stream requires them.
+ def test_create_missing_clock_snapshots_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ default_clock_class=cc,
+ supports_discarded_events=True,
+ discarded_events_have_default_clock_snapshots=True,
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError,
+ 'discarded events have default clock snapshots for this stream class',
+ ):
+ msg_iter._create_discarded_events_message(stream)
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+ # End clock snapshot greater than beginning clock snapshot.
+ def test_create_clock_snapshots_end_gt_begin_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ default_clock_class=cc,
+ supports_discarded_events=True,
+ discarded_events_have_default_clock_snapshots=True,
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError,
+ r'beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)',
+ ):
+ msg_iter._create_discarded_events_message(
+ stream, beg_clock_snapshot=20, end_clock_snapshot=10
+ )
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+
+class CreateDiscardedPacketMessageTestCase(unittest.TestCase):
+ # Most basic case.