-class StreamActivityMessagesTestCase(unittest.TestCase):
- def _test_create_msg(self, with_cc, test_create_beginning_func, test_create_end_func):
- class MyIter(bt2._UserMessageIterator):
- def __init__(self, self_port_output):
- self._at = 0
-
- def __next__(self):
- if self._at == 0:
- msg = self._create_stream_beginning_message(self._component._stream)
- elif self._at == 1:
- msg = test_create_beginning_func(self, self._component._stream)
- elif self._at == 2:
- msg = test_create_end_func(self, self._component._stream)
- elif self._at == 3:
- msg = self._create_stream_end_message(self._component._stream)
- elif self._at >= 4:
- raise bt2.Stop
-
- self._at += 1
- return msg
-
- class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params):
- self._add_output_port('out')
- tc = self._create_trace_class()
-
- if with_cc:
- cc = self._create_clock_class()
- sc = tc.create_stream_class(default_clock_class=cc)
- else:
- sc = tc.create_stream_class()
-
- # Create payload field class
- trace = tc()
- self._stream = trace.create_stream(sc)
-
- graph = bt2.Graph()
- src_comp = graph.add_component(MySrc, 'src')
- msg_iter = graph.create_output_port_message_iterator(src_comp.output_ports['out'])
-
- for msg in msg_iter:
- pass
-
- def test_create_beginning_with_cc_with_known_default_cs(self):
- def create_beginning(msg_iter, stream):
- msg = msg_iter._create_stream_activity_beginning_message(stream, 172)
- self.assertEqual(msg.default_clock_snapshot.value, 172)
- return msg
-
- def create_end(msg_iter, stream):
- return msg_iter._create_stream_activity_end_message(stream, 199)
-
- self._test_create_msg(True, create_beginning, create_end)
-
- def test_create_end_with_cc_with_known_default_cs(self):
- def create_beginning(msg_iter, stream):
- return msg_iter._create_stream_activity_beginning_message(stream, 172)
-
- def create_end(msg_iter, stream):
- msg = msg_iter._create_stream_activity_end_message(stream, 199)
- self.assertEqual(msg.default_clock_snapshot.value, 199)
- return msg
-
- self._test_create_msg(True, create_beginning, create_end)
-
- def test_create_beginning_with_cc_with_unknown_default_cs(self):
- def create_beginning(msg_iter, stream):
- msg = msg_iter._create_stream_activity_beginning_message(stream,
- msg_iter._unknown_clock_snapshot)
- self.assertIsInstance(msg.default_clock_snapshot,
- bt2._UnknownClockSnapshot)
- return msg
-
- def create_end(msg_iter, stream):
- return msg_iter._create_stream_activity_end_message(stream, 199)
-
- self._test_create_msg(True, create_beginning, create_end)
-
- def test_create_end_with_cc_with_unknown_default_cs(self):
- def create_beginning(msg_iter, stream):
- return msg_iter._create_stream_activity_beginning_message(stream, 172)
-
- def create_end(msg_iter, stream):
- msg = msg_iter._create_stream_activity_end_message(stream,
- msg_iter._unknown_clock_snapshot)
- self.assertIsInstance(msg.default_clock_snapshot,
- bt2._UnknownClockSnapshot)
- return msg
-
- self._test_create_msg(True, create_beginning, create_end)
-
- def test_create_beginning_with_cc_with_infinite_default_cs(self):
- def create_beginning(msg_iter, stream):
- msg = msg_iter._create_stream_activity_beginning_message(stream,
- msg_iter._infinite_clock_snapshot)
- self.assertIsInstance(msg.default_clock_snapshot,
- bt2._InfiniteClockSnapshot)
- return msg
-
- def create_end(msg_iter, stream):
- return msg_iter._create_stream_activity_end_message(stream, 199)
-
- self._test_create_msg(True, create_beginning, create_end)
-
- def test_create_end_with_cc_with_infinite_default_cs(self):
- def create_beginning(msg_iter, stream):
- return msg_iter._create_stream_activity_beginning_message(stream, 172)
-
- def create_end(msg_iter, stream):
- msg = msg_iter._create_stream_activity_end_message(stream,
- msg_iter._infinite_clock_snapshot)
- self.assertIsInstance(msg.default_clock_snapshot,
- bt2._InfiniteClockSnapshot)
- return msg
-
- self._test_create_msg(True, create_beginning, create_end)
-
- def test_create_beginning_without_cc_with_known_default_cs(self):
- def create_beginning(msg_iter, stream):
- with self.assertRaises(ValueError):
- msg_iter._create_stream_activity_beginning_message(stream, 172)
-
- return msg_iter._create_stream_activity_beginning_message(stream)
-
- def create_end(msg_iter, stream):
- return msg_iter._create_stream_activity_end_message(stream)
-
- self._test_create_msg(False, create_beginning, create_end)
-
- def test_create_end_without_cc_with_known_default_cs(self):
- def create_beginning(msg_iter, stream):
- return msg_iter._create_stream_activity_beginning_message(stream)
-
- def create_end(msg_iter, stream):
- with self.assertRaises(ValueError):
- msg_iter._create_stream_activity_end_message(stream, 199)
-
- return msg_iter._create_stream_activity_end_message(stream)
-
- self._test_create_msg(False, create_beginning, create_end)
-
- def test_create_beginning_without_cc_with_unknown_default_cs(self):
- def create_beginning(msg_iter, stream):
- msg = msg_iter._create_stream_activity_beginning_message(stream,
- msg_iter._unknown_clock_snapshot)
- self.assertIsInstance(msg.default_clock_snapshot,
- bt2._UnknownClockSnapshot)
- return msg
-
- def create_end(msg_iter, stream):
- return msg_iter._create_stream_activity_end_message(stream)
-
- self._test_create_msg(False, create_beginning, create_end)
-
- def test_create_end_without_cc_with_unknown_default_cs(self):
- def create_beginning(msg_iter, stream):
- return msg_iter._create_stream_activity_beginning_message(stream)
-
- def create_end(msg_iter, stream):
- msg = msg_iter._create_stream_activity_end_message(stream,
- msg_iter._unknown_clock_snapshot)
- self.assertIsInstance(msg.default_clock_snapshot,
- bt2._UnknownClockSnapshot)
- return msg
-
- self._test_create_msg(False, create_beginning, create_end)
-
- def test_create_beginning_without_cc_with_infinite_default_cs(self):
- def create_beginning(msg_iter, stream):
- msg = msg_iter._create_stream_activity_beginning_message(stream,
- msg_iter._infinite_clock_snapshot)
- self.assertIsInstance(msg.default_clock_snapshot,
- bt2._InfiniteClockSnapshot)
- return msg
-
- def create_end(msg_iter, stream):
- return msg_iter._create_stream_activity_end_message(stream)
-
- self._test_create_msg(False, create_beginning, create_end)
-
- def test_create_end_without_cc_with_infinite_default_cs(self):
- def create_beginning(msg_iter, stream):
- return msg_iter._create_stream_activity_beginning_message(stream)
-
- def create_end(msg_iter, stream):
- msg = msg_iter._create_stream_activity_end_message(stream,
- msg_iter._infinite_clock_snapshot)
- self.assertIsInstance(msg.default_clock_snapshot,
- bt2._InfiniteClockSnapshot)
- return msg
-
- self._test_create_msg(False, create_beginning, create_end)
-
- def test_create_beginning_default_cs_wrong_type(self):
- def create_beginning(msg_iter, stream):
- with self.assertRaises(TypeError):
- msg_iter._create_stream_activity_beginning_message(stream, 'infinite')
-
- return msg_iter._create_stream_activity_beginning_message(stream)
-
- def create_end(msg_iter, stream):
- return msg_iter._create_stream_activity_end_message(stream)
-
- self._test_create_msg(False, create_beginning, create_end)
-
- def test_create_end_without_default_cs_wrong_type(self):
- def create_beginning(msg_iter, stream):
- return msg_iter._create_stream_activity_beginning_message(stream)
-
- def create_end(msg_iter, stream):
- with self.assertRaises(TypeError):
- msg_iter._create_stream_activity_end_message(stream, 'unknown')
-
- return msg_iter._create_stream_activity_end_message(stream)
-
- self._test_create_msg(False, create_beginning, create_end)
+ self._src_comp = self._graph.add_component(self._src, 'my_source', params)
+ self._msg_iter = TestOutputPortMessageIterator(
+ self._graph, self._src_comp.output_ports['out']
+ )
+ msgs = list(self._msg_iter)
+
+ msg_stream_beg = msgs[0]
+ self.assertIsInstance(msg_stream_beg, bt2._StreamBeginningMessageConst)
+ self.assertIs(
+ type(msg_stream_beg.default_clock_snapshot),
+ bt2_clock_snapshot._ClockSnapshotConst,
+ )
+ self.assertEqual(msg_stream_beg.default_clock_snapshot.value, 0)
+
+ msg_stream_end = msgs[7]
+ self.assertIsInstance(msg_stream_end, bt2._StreamEndMessageConst)
+ self.assertIs(
+ type(msg_stream_end.default_clock_snapshot),
+ bt2_clock_snapshot._ClockSnapshotConst,
+ )
+ self.assertEqual(msg_stream_end.default_clock_snapshot.value, 7)
+
+ def test_stream_beg_msg(self):
+ msg = utils.get_stream_beginning_message()
+ self.assertIs(type(msg.stream), bt2_stream._Stream)
+
+ def test_stream_end_msg(self):
+ msg = utils.get_stream_end_message()
+ self.assertIs(type(msg.stream), bt2_stream._Stream)
+
+ def test_packet_beg_msg(self):
+ msg = utils.get_packet_beginning_message()
+ self.assertIs(type(msg.packet), bt2_packet._Packet)
+
+ def test_packet_end_msg(self):
+ msg = utils.get_packet_end_message()
+ self.assertIs(type(msg.packet), bt2_packet._Packet)
+
+ def test_event_msg(self):
+ msg = utils.get_event_message()
+ self.assertIs(type(msg.event), bt2_event._Event)
+
+
+class CreateDiscardedEventMessageTestCase(unittest.TestCase):
+ # Most basic case.
+ def test_create(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(supports_discarded_events=True)
+
+ def msg_iter_next(msg_iter, stream):
+ return msg_iter._create_discarded_events_message(stream)
+
+ msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertIs(type(msg), bt2._DiscardedEventsMessage)
+ self.assertIs(msg.count, None)
+
+ # With event count.
+ def test_create_with_count(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(supports_discarded_events=True)
+
+ def msg_iter_next(msg_iter, stream):
+ return msg_iter._create_discarded_events_message(stream, count=242)
+
+ msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertIs(type(msg), bt2._DiscardedEventsMessage)
+ self.assertEqual(msg.count, 242)
+
+ # With event count == 0.
+ def test_create_with_count_zero_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(supports_discarded_events=True)
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError,
+ 'discarded event count is 0',
+ ):
+ msg_iter._create_discarded_events_message(stream, count=0)
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+ # With clock snapshots.
+ def test_create_with_clock_snapshots(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ default_clock_class=cc,
+ supports_discarded_events=True,
+ discarded_events_have_default_clock_snapshots=True,
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ return msg_iter._create_discarded_events_message(
+ stream, beg_clock_snapshot=10, end_clock_snapshot=20
+ )
+
+ msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertIs(type(msg), bt2._DiscardedEventsMessage)
+ self.assertEqual(msg.beginning_default_clock_snapshot, 10)
+ self.assertEqual(msg.end_default_clock_snapshot, 20)
+
+ # Trying to create when the stream does not support discarded events.
+ def test_create_unsupported_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class()
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError, 'stream class does not support discarded events'
+ ):
+ msg_iter._create_discarded_events_message(stream)
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+ # Trying to create with clock snapshots when the stream does not support
+ # them.
+ def test_create_unsupported_clock_snapshots_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(supports_discarded_events=True)
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError,
+ 'discarded events have no default clock snapshots for this stream class',
+ ):
+ msg_iter._create_discarded_events_message(
+ stream, beg_clock_snapshot=10, end_clock_snapshot=20
+ )
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+ # Trying to create without clock snapshots when the stream requires them.
+ def test_create_missing_clock_snapshots_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ default_clock_class=cc,
+ supports_discarded_events=True,
+ discarded_events_have_default_clock_snapshots=True,
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError,
+ 'discarded events have default clock snapshots for this stream class',
+ ):
+ msg_iter._create_discarded_events_message(stream)
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+ # End clock snapshot greater than beginning clock snapshot.
+ def test_create_clock_snapshots_end_gt_begin_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ default_clock_class=cc,
+ supports_discarded_events=True,
+ discarded_events_have_default_clock_snapshots=True,
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError,
+ r'beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)',
+ ):
+ msg_iter._create_discarded_events_message(
+ stream, beg_clock_snapshot=20, end_clock_snapshot=10
+ )
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+
+class CreateDiscardedPacketMessageTestCase(unittest.TestCase):
+ # Most basic case.
+ def test_create(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ supports_packets=True, supports_discarded_packets=True
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ return msg_iter._create_discarded_packets_message(stream)
+
+ msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
+ self.assertIs(msg.count, None)
+
+ # With packet count.
+ def test_create_with_count(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ supports_packets=True, supports_discarded_packets=True
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ return msg_iter._create_discarded_packets_message(stream, count=242)
+
+ msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
+ self.assertEqual(msg.count, 242)
+
+ # With packet count == 0.
+ def test_create_with_count_zero_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ supports_packets=True, supports_discarded_packets=True
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError,
+ 'discarded packet count is 0',
+ ):
+ msg_iter._create_discarded_packets_message(stream, count=0)
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+ # With clock snapshots.
+ def test_create_with_clock_snapshots(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ default_clock_class=cc,
+ supports_packets=True,
+ supports_discarded_packets=True,
+ discarded_packets_have_default_clock_snapshots=True,
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ return msg_iter._create_discarded_packets_message(
+ stream, beg_clock_snapshot=10, end_clock_snapshot=20
+ )
+
+ msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
+ self.assertEqual(msg.beginning_default_clock_snapshot, 10)
+ self.assertEqual(msg.end_default_clock_snapshot, 20)
+
+ # Trying to create when the stream does not support discarded packets.
+ def test_create_unsupported_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ supports_packets=True,
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError, 'stream class does not support discarded packets'
+ ):
+ msg_iter._create_discarded_packets_message(stream)
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+ # Trying to create with clock snapshots when the stream does not support
+ # them.
+ def test_create_unsupported_clock_snapshots_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ supports_packets=True, supports_discarded_packets=True
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError,
+ 'discarded packets have no default clock snapshots for this stream class',
+ ):
+ msg_iter._create_discarded_packets_message(
+ stream, beg_clock_snapshot=10, end_clock_snapshot=20
+ )
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+ # Trying to create without clock snapshots when the stream requires them.
+ def test_create_missing_clock_snapshots_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ default_clock_class=cc,
+ supports_packets=True,
+ supports_discarded_packets=True,
+ discarded_packets_have_default_clock_snapshots=True,
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError,
+ 'discarded packets have default clock snapshots for this stream class',
+ ):
+ msg_iter._create_discarded_packets_message(stream)
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+ # End clock snapshot greater than beginning clock snapshot.
+ def test_create_clock_snapshots_end_gt_begin_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ default_clock_class=cc,
+ supports_packets=True,
+ supports_discarded_packets=True,
+ discarded_packets_have_default_clock_snapshots=True,
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError,
+ r'beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)',
+ ):
+ msg_iter._create_discarded_packets_message(
+ stream, beg_clock_snapshot=20, end_clock_snapshot=10
+ )
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+
+if __name__ == '__main__':
+ unittest.main()