X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=tests%2Fbindings%2Fpython%2Fbt2%2Ftest_message.py;h=11d12a024b09526349698648d7d6db676e4fe42b;hb=f5567ea88d172767b34373bc6e402da8bfd85ef8;hp=c98edea77848edf513afc22627a74aed5c8d9284;hpb=188edac1113dbbb29030681dbde2de0ada742499;p=babeltrace.git diff --git a/tests/bindings/python/bt2/test_message.py b/tests/bindings/python/bt2/test_message.py index c98edea7..11d12a02 100644 --- a/tests/bindings/python/bt2/test_message.py +++ b/tests/bindings/python/bt2/test_message.py @@ -1,58 +1,81 @@ +# SPDX-License-Identifier: GPL-2.0-only # # Copyright (C) 2019 EfficiOS Inc. # -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; only version 2 -# of the License. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. -# -import collections import unittest import bt2 +import utils +from utils import TestOutputPortMessageIterator +from bt2 import clock_snapshot as bt2_clock_snapshot +from bt2 import event as bt2_event +from bt2 import event_class as bt2_event_class +from bt2 import field as bt2_field +from bt2 import packet as bt2_packet +from bt2 import stream as bt2_stream +from bt2 import stream_class as bt2_stream_class +from bt2 import trace as bt2_trace +from bt2 import trace_class as bt2_trace_class class AllMessagesTestCase(unittest.TestCase): def setUp(self): - class MyIter(bt2._UserMessageIterator): - def __init__(self, self_port_output): + def __init__(self, config, self_port_output): self._at = 0 - self._with_stream_msgs_clock_snapshots = self_port_output.user_data.get('with_stream_msgs_clock_snapshots', False) + self._with_stream_msgs_clock_snapshots = self_port_output.user_data.get( + "with_stream_msgs_clock_snapshots", False + ) def __next__(self): if test_obj._clock_class: if self._at == 0: if self._with_stream_msgs_clock_snapshots: - msg = self._create_stream_beginning_message(test_obj._stream, default_clock_snapshot=self._at) + msg = self._create_stream_beginning_message( + test_obj._stream, default_clock_snapshot=self._at + ) else: - msg = self._create_stream_beginning_message(test_obj._stream) + msg = self._create_stream_beginning_message( + test_obj._stream + ) + test_obj.assertIs(type(msg), bt2._StreamBeginningMessage) elif self._at == 1: - msg = self._create_packet_beginning_message(test_obj._packet, self._at) + msg = self._create_packet_beginning_message( + test_obj._packet, self._at + ) + test_obj.assertIs(type(msg), bt2._PacketBeginningMessage) elif self._at == 2: - msg = self._create_event_message(test_obj._event_class, test_obj._packet, self._at) + msg = self._create_event_message( + test_obj._event_class, test_obj._packet, self._at + ) + test_obj.assertIs(type(msg), bt2._EventMessage) elif self._at == 3: - msg = self._create_message_iterator_inactivity_message(test_obj._clock_class, self._at) + msg = self._create_message_iterator_inactivity_message( + test_obj._clock_class, self._at + ) elif self._at == 4: - msg = self._create_discarded_events_message(test_obj._stream, 890, self._at, self._at) + msg = self._create_discarded_events_message( + test_obj._stream, 890, self._at, self._at + ) + test_obj.assertIs(type(msg), bt2._DiscardedEventsMessage) elif self._at == 5: - msg = self._create_packet_end_message(test_obj._packet, self._at) + msg = self._create_packet_end_message( + test_obj._packet, self._at + ) + test_obj.assertIs(type(msg), bt2._PacketEndMessage) elif self._at == 6: - msg = self._create_discarded_packets_message(test_obj._stream, 678, self._at, self._at) + msg = self._create_discarded_packets_message( + test_obj._stream, 678, self._at, self._at + ) + test_obj.assertIs(type(msg), bt2._DiscardedPacketsMessage) elif self._at == 7: if self._with_stream_msgs_clock_snapshots: - msg = self._create_stream_end_message(test_obj._stream, default_clock_snapshot=self._at) + msg = self._create_stream_end_message( + test_obj._stream, default_clock_snapshot=self._at + ) else: msg = self._create_stream_end_message(test_obj._stream) + test_obj.assertIs(type(msg), bt2._StreamEndMessage) elif self._at >= 8: raise bt2.Stop else: @@ -61,13 +84,19 @@ class AllMessagesTestCase(unittest.TestCase): elif self._at == 1: msg = self._create_packet_beginning_message(test_obj._packet) elif self._at == 2: - msg = self._create_event_message(test_obj._event_class, test_obj._packet) + msg = self._create_event_message( + test_obj._event_class, test_obj._packet + ) elif self._at == 3: - msg = self._create_discarded_events_message(test_obj._stream, 890) + msg = self._create_discarded_events_message( + test_obj._stream, 890 + ) elif self._at == 4: msg = self._create_packet_end_message(test_obj._packet) elif self._at == 5: - msg = self._create_discarded_packets_message(test_obj._stream, 678) + msg = self._create_discarded_packets_message( + test_obj._stream, 678 + ) elif self._at == 6: msg = self._create_stream_end_message(test_obj._stream) elif self._at >= 7: @@ -77,32 +106,42 @@ class AllMessagesTestCase(unittest.TestCase): return msg class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter): - def __init__(self, params): - self._add_output_port('out', params) + def __init__(self, config, params, obj): + self._add_output_port("out", params) - with_cc = bool(params['with_cc']) + with_cc = bool(params["with_cc"]) tc = self._create_trace_class() if with_cc: cc = self._create_clock_class() else: cc = None - sc = tc.create_stream_class(default_clock_class=cc, - packets_have_beginning_default_clock_snapshot=with_cc, - packets_have_end_default_clock_snapshot=with_cc, - supports_discarded_events=True, - discarded_events_have_default_clock_snapshots=with_cc, - supports_discarded_packets=True, - discarded_packets_have_default_clock_snapshots=with_cc) + sc = tc.create_stream_class( + default_clock_class=cc, + supports_packets=True, + packets_have_beginning_default_clock_snapshot=with_cc, + packets_have_end_default_clock_snapshot=with_cc, + supports_discarded_events=True, + discarded_events_have_default_clock_snapshots=with_cc, + supports_discarded_packets=True, + discarded_packets_have_default_clock_snapshots=with_cc, + ) # Create payload field class my_int_fc = tc.create_signed_integer_field_class(32) payload_fc = tc.create_structure_field_class() - payload_fc += collections.OrderedDict([ - ('my_int', my_int_fc), - ]) + payload_fc += [("my_int", my_int_fc)] + + # Create specific context field class + my_int_fc = tc.create_signed_integer_field_class(32) + specific_fc = tc.create_structure_field_class() + specific_fc += [("my_int", my_int_fc)] - ec = sc.create_event_class(name='salut', payload_field_class=payload_fc) + ec = sc.create_event_class( + name="salut", + payload_field_class=payload_fc, + specific_context_field_class=specific_fc, + ) trace = tc() stream = trace.create_stream(sc) @@ -120,113 +159,536 @@ class AllMessagesTestCase(unittest.TestCase): self._iter = MyIter def test_all_msg_with_cc(self): - params = {'with_cc': True} - self._src_comp = self._graph.add_component(self._src, 'my_source', params) - self._msg_iter = self._graph.create_output_port_message_iterator(self._src_comp.output_ports['out']) + params = {"with_cc": True} + self._src_comp = self._graph.add_component(self._src, "my_source", params) + self._msg_iter = TestOutputPortMessageIterator( + self._graph, self._src_comp.output_ports["out"] + ) for i, msg in enumerate(self._msg_iter): if i == 0: - self.assertIsInstance(msg, bt2.message._StreamBeginningMessage) + self.assertIs(type(msg), bt2._StreamBeginningMessageConst) + self.assertIs(type(msg.stream), bt2_stream._StreamConst) self.assertEqual(msg.stream.addr, self._stream.addr) - self.assertIsInstance(msg.default_clock_snapshot, bt2.clock_snapshot._UnknownClockSnapshot) + self.assertIsInstance( + msg.default_clock_snapshot, bt2._UnknownClockSnapshot + ) elif i == 1: - self.assertIsInstance(msg, bt2.message._PacketBeginningMessage) + self.assertIs(type(msg), bt2._PacketBeginningMessageConst) + self.assertIs(type(msg.packet), bt2_packet._PacketConst) + self.assertIs( + type(msg.default_clock_snapshot), + bt2_clock_snapshot._ClockSnapshotConst, + ) self.assertEqual(msg.packet.addr, self._packet.addr) self.assertEqual(msg.default_clock_snapshot.value, i) elif i == 2: - self.assertIsInstance(msg, bt2.message._EventMessage) + self.assertIs(type(msg), bt2._EventMessageConst) + self.assertIs(type(msg.event), bt2_event._EventConst) + self.assertIs( + type(msg.default_clock_snapshot), + bt2_clock_snapshot._ClockSnapshotConst, + ) + self.assertIs( + type(msg.event.payload_field), bt2_field._StructureFieldConst + ) + self.assertIs( + type(msg.event.payload_field["my_int"]), + bt2_field._SignedIntegerFieldConst, + ) + self.assertEqual(msg.event.cls.addr, self._event_class.addr) self.assertEqual(msg.default_clock_snapshot.value, i) elif i == 3: - self.assertIsInstance(msg, bt2.message._MessageIteratorInactivityMessage) - self.assertEqual(msg.default_clock_snapshot.value, i) + self.assertIs(type(msg), bt2._MessageIteratorInactivityMessageConst) + self.assertIs( + type(msg.clock_snapshot), bt2_clock_snapshot._ClockSnapshotConst + ) + self.assertEqual(msg.clock_snapshot.value, i) elif i == 4: - self.assertIsInstance(msg, bt2.message._DiscardedEventsMessage) + self.assertIs(type(msg), bt2._DiscardedEventsMessageConst) + self.assertIs(type(msg.stream), bt2_stream._StreamConst) + self.assertIs(type(msg.stream.cls), bt2_stream_class._StreamClassConst) + self.assertIs( + type(msg.beginning_default_clock_snapshot), + bt2_clock_snapshot._ClockSnapshotConst, + ) + self.assertIs( + type(msg.end_default_clock_snapshot), + bt2_clock_snapshot._ClockSnapshotConst, + ) + self.assertEqual(msg.stream.addr, self._stream.addr) self.assertEqual(msg.count, 890) - self.assertEqual(msg.stream.cls.default_clock_class.addr, self._clock_class.addr) + self.assertEqual( + msg.stream.cls.default_clock_class.addr, self._clock_class.addr + ) self.assertEqual(msg.beginning_default_clock_snapshot.value, i) self.assertEqual(msg.end_default_clock_snapshot.value, i) elif i == 5: - self.assertIsInstance(msg, bt2.message._PacketEndMessage) + self.assertIs(type(msg), bt2._PacketEndMessageConst) + self.assertIs(type(msg.packet), bt2_packet._PacketConst) + self.assertIs( + type(msg.default_clock_snapshot), + bt2_clock_snapshot._ClockSnapshotConst, + ) self.assertEqual(msg.packet.addr, self._packet.addr) self.assertEqual(msg.default_clock_snapshot.value, i) elif i == 6: - self.assertIsInstance(msg, bt2.message._DiscardedPacketsMessage) + self.assertIs(type(msg), bt2._DiscardedPacketsMessageConst) + self.assertIs(type(msg.stream), bt2_stream._StreamConst) + self.assertIs(type(msg.stream.trace), bt2_trace._TraceConst) + self.assertIs( + type(msg.stream.trace.cls), bt2_trace_class._TraceClassConst + ) + self.assertIs( + type(msg.beginning_default_clock_snapshot), + bt2_clock_snapshot._ClockSnapshotConst, + ) + self.assertIs( + type(msg.end_default_clock_snapshot), + bt2_clock_snapshot._ClockSnapshotConst, + ) self.assertEqual(msg.stream.addr, self._stream.addr) self.assertEqual(msg.count, 678) - self.assertEqual(msg.stream.cls.default_clock_class.addr, self._clock_class.addr) + self.assertEqual( + msg.stream.cls.default_clock_class.addr, self._clock_class.addr + ) self.assertEqual(msg.beginning_default_clock_snapshot.value, i) self.assertEqual(msg.end_default_clock_snapshot.value, i) elif i == 7: - self.assertIsInstance(msg, bt2.message._StreamEndMessage) + self.assertIs(type(msg), bt2._StreamEndMessageConst) + self.assertIs(type(msg.stream), bt2_stream._StreamConst) self.assertEqual(msg.stream.addr, self._stream.addr) - self.assertIsInstance(msg.default_clock_snapshot, bt2.clock_snapshot._UnknownClockSnapshot) + self.assertIs( + type(msg.default_clock_snapshot), bt2._UnknownClockSnapshot + ) else: raise Exception def test_all_msg_without_cc(self): - params = {'with_cc': False} - self._src_comp = self._graph.add_component(self._src, 'my_source', params) - self._msg_iter = self._graph.create_output_port_message_iterator(self._src_comp.output_ports['out']) + params = {"with_cc": False} + self._src_comp = self._graph.add_component(self._src, "my_source", params) + self._msg_iter = TestOutputPortMessageIterator( + self._graph, self._src_comp.output_ports["out"] + ) for i, msg in enumerate(self._msg_iter): if i == 0: - self.assertIsInstance(msg, bt2.message._StreamBeginningMessage) + self.assertIsInstance(msg, bt2._StreamBeginningMessageConst) + self.assertIs(type(msg.stream), bt2_stream._StreamConst) self.assertEqual(msg.stream.addr, self._stream.addr) - with self.assertRaises(bt2.NonexistentClockSnapshot): + with self.assertRaisesRegex( + ValueError, "stream class has no default clock class" + ): msg.default_clock_snapshot elif i == 1: - self.assertIsInstance(msg, bt2.message._PacketBeginningMessage) + self.assertIsInstance(msg, bt2._PacketBeginningMessageConst) + self.assertIs(type(msg.packet), bt2_packet._PacketConst) self.assertEqual(msg.packet.addr, self._packet.addr) elif i == 2: - self.assertIsInstance(msg, bt2.message._EventMessage) + self.assertIsInstance(msg, bt2._EventMessageConst) + self.assertIs(type(msg.event), bt2_event._EventConst) + self.assertIs(type(msg.event.cls), bt2_event_class._EventClassConst) self.assertEqual(msg.event.cls.addr, self._event_class.addr) - with self.assertRaises(bt2.NonexistentClockSnapshot): + with self.assertRaisesRegex( + ValueError, "stream class has no default clock class" + ): msg.default_clock_snapshot elif i == 3: - self.assertIsInstance(msg, bt2.message._DiscardedEventsMessage) + self.assertIsInstance(msg, bt2._DiscardedEventsMessageConst) + self.assertIs(type(msg.stream), bt2_stream._StreamConst) + self.assertIs(type(msg.stream.cls), bt2_stream_class._StreamClassConst) self.assertEqual(msg.stream.addr, self._stream.addr) self.assertEqual(msg.count, 890) self.assertIsNone(msg.stream.cls.default_clock_class) - with self.assertRaises(bt2.NonexistentClockSnapshot): + with self.assertRaisesRegex( + ValueError, + "such a message has no clock snapshots for this stream class", + ): msg.beginning_default_clock_snapshot - with self.assertRaises(bt2.NonexistentClockSnapshot): + with self.assertRaisesRegex( + ValueError, + "such a message has no clock snapshots for this stream class", + ): msg.end_default_clock_snapshot elif i == 4: - self.assertIsInstance(msg, bt2.message._PacketEndMessage) + self.assertIsInstance(msg, bt2._PacketEndMessageConst) self.assertEqual(msg.packet.addr, self._packet.addr) + self.assertIs(type(msg.packet), bt2_packet._PacketConst) elif i == 5: - self.assertIsInstance(msg, bt2.message._DiscardedPacketsMessage) + self.assertIsInstance(msg, bt2._DiscardedPacketsMessageConst) + self.assertIs(type(msg.stream), bt2_stream._StreamConst) + self.assertIs(type(msg.stream.cls), bt2_stream_class._StreamClassConst) + self.assertIs( + type(msg.stream.cls.trace_class), bt2_trace_class._TraceClassConst + ) self.assertEqual(msg.stream.addr, self._stream.addr) self.assertEqual(msg.count, 678) self.assertIsNone(msg.stream.cls.default_clock_class) - with self.assertRaises(bt2.NonexistentClockSnapshot): + with self.assertRaisesRegex( + ValueError, + "such a message has no clock snapshots for this stream class", + ): msg.beginning_default_clock_snapshot - with self.assertRaises(bt2.NonexistentClockSnapshot): + with self.assertRaisesRegex( + ValueError, + "such a message has no clock snapshots for this stream class", + ): msg.end_default_clock_snapshot elif i == 6: - self.assertIsInstance(msg, bt2.message._StreamEndMessage) + self.assertIsInstance(msg, bt2._StreamEndMessageConst) + self.assertIs(type(msg.stream), bt2_stream._StreamConst) self.assertEqual(msg.stream.addr, self._stream.addr) - with self.assertRaises(bt2.NonexistentClockSnapshot): + with self.assertRaisesRegex( + ValueError, "stream class has no default clock class" + ): msg.default_clock_snapshot else: raise Exception def test_msg_stream_with_clock_snapshots(self): - params = { - 'with_cc': True, - 'with_stream_msgs_clock_snapshots': True, - } + params = {"with_cc": True, "with_stream_msgs_clock_snapshots": True} - self._src_comp = self._graph.add_component(self._src, 'my_source', params) - self._msg_iter = self._graph.create_output_port_message_iterator(self._src_comp.output_ports['out']) + self._src_comp = self._graph.add_component(self._src, "my_source", params) + self._msg_iter = TestOutputPortMessageIterator( + self._graph, self._src_comp.output_ports["out"] + ) msgs = list(self._msg_iter) msg_stream_beg = msgs[0] - self.assertIsInstance(msg_stream_beg, bt2.message._StreamBeginningMessage) + self.assertIsInstance(msg_stream_beg, bt2._StreamBeginningMessageConst) + self.assertIs( + type(msg_stream_beg.default_clock_snapshot), + bt2_clock_snapshot._ClockSnapshotConst, + ) self.assertEqual(msg_stream_beg.default_clock_snapshot.value, 0) msg_stream_end = msgs[7] - self.assertIsInstance(msg_stream_end, bt2.message._StreamEndMessage) + self.assertIsInstance(msg_stream_end, bt2._StreamEndMessageConst) + self.assertIs( + type(msg_stream_end.default_clock_snapshot), + bt2_clock_snapshot._ClockSnapshotConst, + ) self.assertEqual(msg_stream_end.default_clock_snapshot.value, 7) + + def test_stream_beg_msg(self): + msg = utils.get_stream_beginning_message() + self.assertIs(type(msg.stream), bt2_stream._Stream) + + def test_stream_end_msg(self): + msg = utils.get_stream_end_message() + self.assertIs(type(msg.stream), bt2_stream._Stream) + + def test_packet_beg_msg(self): + msg = utils.get_packet_beginning_message() + self.assertIs(type(msg.packet), bt2_packet._Packet) + + def test_packet_end_msg(self): + msg = utils.get_packet_end_message() + self.assertIs(type(msg.packet), bt2_packet._Packet) + + def test_event_msg(self): + msg = utils.get_event_message() + self.assertIs(type(msg.event), bt2_event._Event) + + +class CreateDiscardedEventMessageTestCase(unittest.TestCase): + # Most basic case. + def test_create(self): + def create_stream_class(tc, cc): + return tc.create_stream_class(supports_discarded_events=True) + + def msg_iter_next(msg_iter, stream): + return msg_iter._create_discarded_events_message(stream) + + msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertIs(type(msg), bt2._DiscardedEventsMessage) + self.assertIs(msg.count, None) + + # With event count. + def test_create_with_count(self): + def create_stream_class(tc, cc): + return tc.create_stream_class(supports_discarded_events=True) + + def msg_iter_next(msg_iter, stream): + return msg_iter._create_discarded_events_message(stream, count=242) + + msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertIs(type(msg), bt2._DiscardedEventsMessage) + self.assertEqual(msg.count, 242) + + # With event count == 0. + def test_create_with_count_zero_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class(supports_discarded_events=True) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + "discarded event count is 0", + ): + msg_iter._create_discarded_events_message(stream, count=0) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # With clock snapshots. + def test_create_with_clock_snapshots(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + default_clock_class=cc, + supports_discarded_events=True, + discarded_events_have_default_clock_snapshots=True, + ) + + def msg_iter_next(msg_iter, stream): + return msg_iter._create_discarded_events_message( + stream, beg_clock_snapshot=10, end_clock_snapshot=20 + ) + + msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertIs(type(msg), bt2._DiscardedEventsMessage) + self.assertEqual(msg.beginning_default_clock_snapshot, 10) + self.assertEqual(msg.end_default_clock_snapshot, 20) + + # Trying to create when the stream does not support discarded events. + def test_create_unsupported_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class() + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, "stream class does not support discarded events" + ): + msg_iter._create_discarded_events_message(stream) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # Trying to create with clock snapshots when the stream does not support + # them. + def test_create_unsupported_clock_snapshots_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class(supports_discarded_events=True) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + "discarded events have no default clock snapshots for this stream class", + ): + msg_iter._create_discarded_events_message( + stream, beg_clock_snapshot=10, end_clock_snapshot=20 + ) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # Trying to create without clock snapshots when the stream requires them. + def test_create_missing_clock_snapshots_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + default_clock_class=cc, + supports_discarded_events=True, + discarded_events_have_default_clock_snapshots=True, + ) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + "discarded events have default clock snapshots for this stream class", + ): + msg_iter._create_discarded_events_message(stream) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # End clock snapshot greater than beginning clock snapshot. + def test_create_clock_snapshots_end_gt_begin_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + default_clock_class=cc, + supports_discarded_events=True, + discarded_events_have_default_clock_snapshots=True, + ) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + r"beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)", + ): + msg_iter._create_discarded_events_message( + stream, beg_clock_snapshot=20, end_clock_snapshot=10 + ) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + +class CreateDiscardedPacketMessageTestCase(unittest.TestCase): + # Most basic case. + def test_create(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + supports_packets=True, supports_discarded_packets=True + ) + + def msg_iter_next(msg_iter, stream): + return msg_iter._create_discarded_packets_message(stream) + + msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertIs(type(msg), bt2._DiscardedPacketsMessage) + self.assertIs(msg.count, None) + + # With packet count. + def test_create_with_count(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + supports_packets=True, supports_discarded_packets=True + ) + + def msg_iter_next(msg_iter, stream): + return msg_iter._create_discarded_packets_message(stream, count=242) + + msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertIs(type(msg), bt2._DiscardedPacketsMessage) + self.assertEqual(msg.count, 242) + + # With packet count == 0. + def test_create_with_count_zero_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + supports_packets=True, supports_discarded_packets=True + ) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + "discarded packet count is 0", + ): + msg_iter._create_discarded_packets_message(stream, count=0) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # With clock snapshots. + def test_create_with_clock_snapshots(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + default_clock_class=cc, + supports_packets=True, + supports_discarded_packets=True, + discarded_packets_have_default_clock_snapshots=True, + ) + + def msg_iter_next(msg_iter, stream): + return msg_iter._create_discarded_packets_message( + stream, beg_clock_snapshot=10, end_clock_snapshot=20 + ) + + msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertIs(type(msg), bt2._DiscardedPacketsMessage) + self.assertEqual(msg.beginning_default_clock_snapshot, 10) + self.assertEqual(msg.end_default_clock_snapshot, 20) + + # Trying to create when the stream does not support discarded packets. + def test_create_unsupported_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + supports_packets=True, + ) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, "stream class does not support discarded packets" + ): + msg_iter._create_discarded_packets_message(stream) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # Trying to create with clock snapshots when the stream does not support + # them. + def test_create_unsupported_clock_snapshots_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + supports_packets=True, supports_discarded_packets=True + ) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + "discarded packets have no default clock snapshots for this stream class", + ): + msg_iter._create_discarded_packets_message( + stream, beg_clock_snapshot=10, end_clock_snapshot=20 + ) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # Trying to create without clock snapshots when the stream requires them. + def test_create_missing_clock_snapshots_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + default_clock_class=cc, + supports_packets=True, + supports_discarded_packets=True, + discarded_packets_have_default_clock_snapshots=True, + ) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + "discarded packets have default clock snapshots for this stream class", + ): + msg_iter._create_discarded_packets_message(stream) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + # End clock snapshot greater than beginning clock snapshot. + def test_create_clock_snapshots_end_gt_begin_raises(self): + def create_stream_class(tc, cc): + return tc.create_stream_class( + default_clock_class=cc, + supports_packets=True, + supports_discarded_packets=True, + discarded_packets_have_default_clock_snapshots=True, + ) + + def msg_iter_next(msg_iter, stream): + with self.assertRaisesRegex( + ValueError, + r"beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)", + ): + msg_iter._create_discarded_packets_message( + stream, beg_clock_snapshot=20, end_clock_snapshot=10 + ) + + return 123 + + res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next) + self.assertEqual(res, 123) + + +if __name__ == "__main__": + unittest.main()