1 # SPDX-License-Identifier: GPL-2.0-only
3 # Copyright (C) 2019 EfficiOS Inc.
10 from bt2
import event
as bt2_event
11 from bt2
import field
as bt2_field
12 from bt2
import trace
as bt2_trace
13 from bt2
import packet
as bt2_packet
14 from bt2
import stream
as bt2_stream
15 from bt2
import event_class
as bt2_event_class
16 from bt2
import trace_class
as bt2_trace_class
17 from bt2
import stream_class
as bt2_stream_class
18 from bt2
import clock_snapshot
as bt2_clock_snapshot
19 from utils
import TestOutputPortMessageIterator
22 class AllMessagesTestCase(unittest
.TestCase
):
24 class MyIter(bt2
._UserMessageIterator
):
25 def __init__(self
, config
, self_port_output
):
27 self
._with
_stream
_msgs
_clock
_snapshots
= self_port_output
.user_data
.get(
28 "with_stream_msgs_clock_snapshots", False
32 if test_obj
._clock
_class
:
34 if self
._with
_stream
_msgs
_clock
_snapshots
:
35 msg
= self
._create
_stream
_beginning
_message
(
36 test_obj
._stream
, default_clock_snapshot
=self
._at
39 msg
= self
._create
_stream
_beginning
_message
(
42 test_obj
.assertIs(type(msg
), bt2
._StreamBeginningMessage
)
44 msg
= self
._create
_packet
_beginning
_message
(
45 test_obj
._packet
, self
._at
47 test_obj
.assertIs(type(msg
), bt2
._PacketBeginningMessage
)
49 msg
= self
._create
_event
_message
(
50 test_obj
._event
_class
, test_obj
._packet
, self
._at
52 test_obj
.assertIs(type(msg
), bt2
._EventMessage
)
54 msg
= self
._create
_message
_iterator
_inactivity
_message
(
55 test_obj
._clock
_class
, self
._at
58 msg
= self
._create
_discarded
_events
_message
(
59 test_obj
._stream
, 890, self
._at
, self
._at
61 test_obj
.assertIs(type(msg
), bt2
._DiscardedEventsMessage
)
63 msg
= self
._create
_packet
_end
_message
(
64 test_obj
._packet
, self
._at
66 test_obj
.assertIs(type(msg
), bt2
._PacketEndMessage
)
68 msg
= self
._create
_discarded
_packets
_message
(
69 test_obj
._stream
, 678, self
._at
, self
._at
71 test_obj
.assertIs(type(msg
), bt2
._DiscardedPacketsMessage
)
73 if self
._with
_stream
_msgs
_clock
_snapshots
:
74 msg
= self
._create
_stream
_end
_message
(
75 test_obj
._stream
, default_clock_snapshot
=self
._at
78 msg
= self
._create
_stream
_end
_message
(test_obj
._stream
)
79 test_obj
.assertIs(type(msg
), bt2
._StreamEndMessage
)
84 msg
= self
._create
_stream
_beginning
_message
(test_obj
._stream
)
86 msg
= self
._create
_packet
_beginning
_message
(test_obj
._packet
)
88 msg
= self
._create
_event
_message
(
89 test_obj
._event
_class
, test_obj
._packet
92 msg
= self
._create
_discarded
_events
_message
(
96 msg
= self
._create
_packet
_end
_message
(test_obj
._packet
)
98 msg
= self
._create
_discarded
_packets
_message
(
102 msg
= self
._create
_stream
_end
_message
(test_obj
._stream
)
109 class MySrc(bt2
._UserSourceComponent
, message_iterator_class
=MyIter
):
110 def __init__(self
, config
, params
, obj
):
111 self
._add
_output
_port
("out", params
)
113 with_cc
= bool(params
["with_cc"])
114 tc
= self
._create
_trace
_class
()
116 cc
= self
._create
_clock
_class
()
120 sc
= tc
.create_stream_class(
121 default_clock_class
=cc
,
122 supports_packets
=True,
123 packets_have_beginning_default_clock_snapshot
=with_cc
,
124 packets_have_end_default_clock_snapshot
=with_cc
,
125 supports_discarded_events
=True,
126 discarded_events_have_default_clock_snapshots
=with_cc
,
127 supports_discarded_packets
=True,
128 discarded_packets_have_default_clock_snapshots
=with_cc
,
131 # Create payload field class
132 my_int_fc
= tc
.create_signed_integer_field_class(32)
133 payload_fc
= tc
.create_structure_field_class()
134 payload_fc
+= [("my_int", my_int_fc
)]
136 # Create specific context field class
137 my_int_fc
= tc
.create_signed_integer_field_class(32)
138 specific_fc
= tc
.create_structure_field_class()
139 specific_fc
+= [("my_int", my_int_fc
)]
141 ec
= sc
.create_event_class(
143 payload_field_class
=payload_fc
,
144 specific_context_field_class
=specific_fc
,
148 stream
= trace
.create_stream(sc
)
149 packet
= stream
.create_packet()
151 test_obj
._trace
= trace
152 test_obj
._stream
= stream
153 test_obj
._packet
= packet
154 test_obj
._event
_class
= ec
155 test_obj
._clock
_class
= cc
158 self
._graph
= bt2
.Graph()
162 def test_all_msg_with_cc(self
):
163 params
= {"with_cc": True}
164 self
._src
_comp
= self
._graph
.add_component(self
._src
, "my_source", params
)
165 self
._msg
_iter
= TestOutputPortMessageIterator(
166 self
._graph
, self
._src
_comp
.output_ports
["out"]
169 for i
, msg
in enumerate(self
._msg
_iter
):
171 self
.assertIs(type(msg
), bt2
._StreamBeginningMessageConst
)
172 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
173 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
174 self
.assertIsInstance(
175 msg
.default_clock_snapshot
, bt2
._UnknownClockSnapshot
178 self
.assertIs(type(msg
), bt2
._PacketBeginningMessageConst
)
179 self
.assertIs(type(msg
.packet
), bt2_packet
._PacketConst
)
181 type(msg
.default_clock_snapshot
),
182 bt2_clock_snapshot
._ClockSnapshotConst
,
184 self
.assertEqual(msg
.packet
.addr
, self
._packet
.addr
)
185 self
.assertEqual(msg
.default_clock_snapshot
.value
, i
)
187 self
.assertIs(type(msg
), bt2
._EventMessageConst
)
188 self
.assertIs(type(msg
.event
), bt2_event
._EventConst
)
190 type(msg
.default_clock_snapshot
),
191 bt2_clock_snapshot
._ClockSnapshotConst
,
194 type(msg
.event
.payload_field
), bt2_field
._StructureFieldConst
197 type(msg
.event
.payload_field
["my_int"]),
198 bt2_field
._SignedIntegerFieldConst
,
201 self
.assertEqual(msg
.event
.cls
.addr
, self
._event
_class
.addr
)
202 self
.assertEqual(msg
.default_clock_snapshot
.value
, i
)
204 self
.assertIs(type(msg
), bt2
._MessageIteratorInactivityMessageConst
)
206 type(msg
.clock_snapshot
), bt2_clock_snapshot
._ClockSnapshotConst
208 self
.assertEqual(msg
.clock_snapshot
.value
, i
)
210 self
.assertIs(type(msg
), bt2
._DiscardedEventsMessageConst
)
211 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
212 self
.assertIs(type(msg
.stream
.cls
), bt2_stream_class
._StreamClassConst
)
214 type(msg
.beginning_default_clock_snapshot
),
215 bt2_clock_snapshot
._ClockSnapshotConst
,
218 type(msg
.end_default_clock_snapshot
),
219 bt2_clock_snapshot
._ClockSnapshotConst
,
222 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
223 self
.assertEqual(msg
.count
, 890)
225 msg
.stream
.cls
.default_clock_class
.addr
, self
._clock
_class
.addr
227 self
.assertEqual(msg
.beginning_default_clock_snapshot
.value
, i
)
228 self
.assertEqual(msg
.end_default_clock_snapshot
.value
, i
)
230 self
.assertIs(type(msg
), bt2
._PacketEndMessageConst
)
231 self
.assertIs(type(msg
.packet
), bt2_packet
._PacketConst
)
233 type(msg
.default_clock_snapshot
),
234 bt2_clock_snapshot
._ClockSnapshotConst
,
236 self
.assertEqual(msg
.packet
.addr
, self
._packet
.addr
)
237 self
.assertEqual(msg
.default_clock_snapshot
.value
, i
)
239 self
.assertIs(type(msg
), bt2
._DiscardedPacketsMessageConst
)
240 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
241 self
.assertIs(type(msg
.stream
.trace
), bt2_trace
._TraceConst
)
243 type(msg
.stream
.trace
.cls
), bt2_trace_class
._TraceClassConst
246 type(msg
.beginning_default_clock_snapshot
),
247 bt2_clock_snapshot
._ClockSnapshotConst
,
250 type(msg
.end_default_clock_snapshot
),
251 bt2_clock_snapshot
._ClockSnapshotConst
,
253 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
254 self
.assertEqual(msg
.count
, 678)
256 msg
.stream
.cls
.default_clock_class
.addr
, self
._clock
_class
.addr
258 self
.assertEqual(msg
.beginning_default_clock_snapshot
.value
, i
)
259 self
.assertEqual(msg
.end_default_clock_snapshot
.value
, i
)
261 self
.assertIs(type(msg
), bt2
._StreamEndMessageConst
)
262 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
263 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
265 type(msg
.default_clock_snapshot
), bt2
._UnknownClockSnapshot
270 def test_all_msg_without_cc(self
):
271 params
= {"with_cc": False}
272 self
._src
_comp
= self
._graph
.add_component(self
._src
, "my_source", params
)
273 self
._msg
_iter
= TestOutputPortMessageIterator(
274 self
._graph
, self
._src
_comp
.output_ports
["out"]
277 for i
, msg
in enumerate(self
._msg
_iter
):
279 self
.assertIsInstance(msg
, bt2
._StreamBeginningMessageConst
)
280 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
281 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
282 with self
.assertRaisesRegex(
283 ValueError, "stream class has no default clock class"
285 msg
.default_clock_snapshot
287 self
.assertIsInstance(msg
, bt2
._PacketBeginningMessageConst
)
288 self
.assertIs(type(msg
.packet
), bt2_packet
._PacketConst
)
289 self
.assertEqual(msg
.packet
.addr
, self
._packet
.addr
)
291 self
.assertIsInstance(msg
, bt2
._EventMessageConst
)
292 self
.assertIs(type(msg
.event
), bt2_event
._EventConst
)
293 self
.assertIs(type(msg
.event
.cls
), bt2_event_class
._EventClassConst
)
294 self
.assertEqual(msg
.event
.cls
.addr
, self
._event
_class
.addr
)
295 with self
.assertRaisesRegex(
296 ValueError, "stream class has no default clock class"
298 msg
.default_clock_snapshot
300 self
.assertIsInstance(msg
, bt2
._DiscardedEventsMessageConst
)
301 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
302 self
.assertIs(type(msg
.stream
.cls
), bt2_stream_class
._StreamClassConst
)
303 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
304 self
.assertEqual(msg
.count
, 890)
305 self
.assertIsNone(msg
.stream
.cls
.default_clock_class
)
306 with self
.assertRaisesRegex(
308 "such a message has no clock snapshots for this stream class",
310 msg
.beginning_default_clock_snapshot
311 with self
.assertRaisesRegex(
313 "such a message has no clock snapshots for this stream class",
315 msg
.end_default_clock_snapshot
317 self
.assertIsInstance(msg
, bt2
._PacketEndMessageConst
)
318 self
.assertEqual(msg
.packet
.addr
, self
._packet
.addr
)
319 self
.assertIs(type(msg
.packet
), bt2_packet
._PacketConst
)
321 self
.assertIsInstance(msg
, bt2
._DiscardedPacketsMessageConst
)
322 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
323 self
.assertIs(type(msg
.stream
.cls
), bt2_stream_class
._StreamClassConst
)
325 type(msg
.stream
.cls
.trace_class
), bt2_trace_class
._TraceClassConst
327 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
328 self
.assertEqual(msg
.count
, 678)
329 self
.assertIsNone(msg
.stream
.cls
.default_clock_class
)
330 with self
.assertRaisesRegex(
332 "such a message has no clock snapshots for this stream class",
334 msg
.beginning_default_clock_snapshot
335 with self
.assertRaisesRegex(
337 "such a message has no clock snapshots for this stream class",
339 msg
.end_default_clock_snapshot
341 self
.assertIsInstance(msg
, bt2
._StreamEndMessageConst
)
342 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
343 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
344 with self
.assertRaisesRegex(
345 ValueError, "stream class has no default clock class"
347 msg
.default_clock_snapshot
351 def test_msg_stream_with_clock_snapshots(self
):
352 params
= {"with_cc": True, "with_stream_msgs_clock_snapshots": True}
354 self
._src
_comp
= self
._graph
.add_component(self
._src
, "my_source", params
)
355 self
._msg
_iter
= TestOutputPortMessageIterator(
356 self
._graph
, self
._src
_comp
.output_ports
["out"]
358 msgs
= list(self
._msg
_iter
)
360 msg_stream_beg
= msgs
[0]
361 self
.assertIsInstance(msg_stream_beg
, bt2
._StreamBeginningMessageConst
)
363 type(msg_stream_beg
.default_clock_snapshot
),
364 bt2_clock_snapshot
._ClockSnapshotConst
,
366 self
.assertEqual(msg_stream_beg
.default_clock_snapshot
.value
, 0)
368 msg_stream_end
= msgs
[7]
369 self
.assertIsInstance(msg_stream_end
, bt2
._StreamEndMessageConst
)
371 type(msg_stream_end
.default_clock_snapshot
),
372 bt2_clock_snapshot
._ClockSnapshotConst
,
374 self
.assertEqual(msg_stream_end
.default_clock_snapshot
.value
, 7)
376 def test_stream_beg_msg(self
):
377 msg
= utils
.get_stream_beginning_message()
378 self
.assertIs(type(msg
.stream
), bt2_stream
._Stream
)
380 def test_stream_end_msg(self
):
381 msg
= utils
.get_stream_end_message()
382 self
.assertIs(type(msg
.stream
), bt2_stream
._Stream
)
384 def test_packet_beg_msg(self
):
385 msg
= utils
.get_packet_beginning_message()
386 self
.assertIs(type(msg
.packet
), bt2_packet
._Packet
)
388 def test_packet_end_msg(self
):
389 msg
= utils
.get_packet_end_message()
390 self
.assertIs(type(msg
.packet
), bt2_packet
._Packet
)
392 def test_event_msg(self
):
393 msg
= utils
.get_event_message()
394 self
.assertIs(type(msg
.event
), bt2_event
._Event
)
397 class CreateDiscardedEventMessageTestCase(unittest
.TestCase
):
399 def test_create(self
):
400 def create_stream_class(tc
, cc
):
401 return tc
.create_stream_class(supports_discarded_events
=True)
403 def msg_iter_next(msg_iter
, stream
):
404 return msg_iter
._create
_discarded
_events
_message
(stream
)
406 msg
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
407 self
.assertIs(type(msg
), bt2
._DiscardedEventsMessage
)
408 self
.assertIs(msg
.count
, None)
411 def test_create_with_count(self
):
412 def create_stream_class(tc
, cc
):
413 return tc
.create_stream_class(supports_discarded_events
=True)
415 def msg_iter_next(msg_iter
, stream
):
416 return msg_iter
._create
_discarded
_events
_message
(stream
, count
=242)
418 msg
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
419 self
.assertIs(type(msg
), bt2
._DiscardedEventsMessage
)
420 self
.assertEqual(msg
.count
, 242)
422 # With event count == 0.
423 def test_create_with_count_zero_raises(self
):
424 def create_stream_class(tc
, cc
):
425 return tc
.create_stream_class(supports_discarded_events
=True)
427 def msg_iter_next(msg_iter
, stream
):
428 with self
.assertRaisesRegex(
430 "discarded event count is 0",
432 msg_iter
._create
_discarded
_events
_message
(stream
, count
=0)
436 res
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
437 self
.assertEqual(res
, 123)
439 # With clock snapshots.
440 def test_create_with_clock_snapshots(self
):
441 def create_stream_class(tc
, cc
):
442 return tc
.create_stream_class(
443 default_clock_class
=cc
,
444 supports_discarded_events
=True,
445 discarded_events_have_default_clock_snapshots
=True,
448 def msg_iter_next(msg_iter
, stream
):
449 return msg_iter
._create
_discarded
_events
_message
(
450 stream
, beg_clock_snapshot
=10, end_clock_snapshot
=20
453 msg
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
454 self
.assertIs(type(msg
), bt2
._DiscardedEventsMessage
)
455 self
.assertEqual(msg
.beginning_default_clock_snapshot
, 10)
456 self
.assertEqual(msg
.end_default_clock_snapshot
, 20)
458 # Trying to create when the stream does not support discarded events.
459 def test_create_unsupported_raises(self
):
460 def create_stream_class(tc
, cc
):
461 return tc
.create_stream_class()
463 def msg_iter_next(msg_iter
, stream
):
464 with self
.assertRaisesRegex(
465 ValueError, "stream class does not support discarded events"
467 msg_iter
._create
_discarded
_events
_message
(stream
)
471 res
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
472 self
.assertEqual(res
, 123)
474 # Trying to create with clock snapshots when the stream does not support
476 def test_create_unsupported_clock_snapshots_raises(self
):
477 def create_stream_class(tc
, cc
):
478 return tc
.create_stream_class(supports_discarded_events
=True)
480 def msg_iter_next(msg_iter
, stream
):
481 with self
.assertRaisesRegex(
483 "discarded events have no default clock snapshots for this stream class",
485 msg_iter
._create
_discarded
_events
_message
(
486 stream
, beg_clock_snapshot
=10, end_clock_snapshot
=20
491 res
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
492 self
.assertEqual(res
, 123)
494 # Trying to create without clock snapshots when the stream requires them.
495 def test_create_missing_clock_snapshots_raises(self
):
496 def create_stream_class(tc
, cc
):
497 return tc
.create_stream_class(
498 default_clock_class
=cc
,
499 supports_discarded_events
=True,
500 discarded_events_have_default_clock_snapshots
=True,
503 def msg_iter_next(msg_iter
, stream
):
504 with self
.assertRaisesRegex(
506 "discarded events have default clock snapshots for this stream class",
508 msg_iter
._create
_discarded
_events
_message
(stream
)
512 res
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
513 self
.assertEqual(res
, 123)
515 # End clock snapshot greater than beginning clock snapshot.
516 def test_create_clock_snapshots_end_gt_begin_raises(self
):
517 def create_stream_class(tc
, cc
):
518 return tc
.create_stream_class(
519 default_clock_class
=cc
,
520 supports_discarded_events
=True,
521 discarded_events_have_default_clock_snapshots
=True,
524 def msg_iter_next(msg_iter
, stream
):
525 with self
.assertRaisesRegex(
527 r
"beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)",
529 msg_iter
._create
_discarded
_events
_message
(
530 stream
, beg_clock_snapshot
=20, end_clock_snapshot
=10
535 res
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
536 self
.assertEqual(res
, 123)
539 class CreateDiscardedPacketMessageTestCase(unittest
.TestCase
):
541 def test_create(self
):
542 def create_stream_class(tc
, cc
):
543 return tc
.create_stream_class(
544 supports_packets
=True, supports_discarded_packets
=True
547 def msg_iter_next(msg_iter
, stream
):
548 return msg_iter
._create
_discarded
_packets
_message
(stream
)
550 msg
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
551 self
.assertIs(type(msg
), bt2
._DiscardedPacketsMessage
)
552 self
.assertIs(msg
.count
, None)
555 def test_create_with_count(self
):
556 def create_stream_class(tc
, cc
):
557 return tc
.create_stream_class(
558 supports_packets
=True, supports_discarded_packets
=True
561 def msg_iter_next(msg_iter
, stream
):
562 return msg_iter
._create
_discarded
_packets
_message
(stream
, count
=242)
564 msg
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
565 self
.assertIs(type(msg
), bt2
._DiscardedPacketsMessage
)
566 self
.assertEqual(msg
.count
, 242)
568 # With packet count == 0.
569 def test_create_with_count_zero_raises(self
):
570 def create_stream_class(tc
, cc
):
571 return tc
.create_stream_class(
572 supports_packets
=True, supports_discarded_packets
=True
575 def msg_iter_next(msg_iter
, stream
):
576 with self
.assertRaisesRegex(
578 "discarded packet count is 0",
580 msg_iter
._create
_discarded
_packets
_message
(stream
, count
=0)
584 res
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
585 self
.assertEqual(res
, 123)
587 # With clock snapshots.
588 def test_create_with_clock_snapshots(self
):
589 def create_stream_class(tc
, cc
):
590 return tc
.create_stream_class(
591 default_clock_class
=cc
,
592 supports_packets
=True,
593 supports_discarded_packets
=True,
594 discarded_packets_have_default_clock_snapshots
=True,
597 def msg_iter_next(msg_iter
, stream
):
598 return msg_iter
._create
_discarded
_packets
_message
(
599 stream
, beg_clock_snapshot
=10, end_clock_snapshot
=20
602 msg
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
603 self
.assertIs(type(msg
), bt2
._DiscardedPacketsMessage
)
604 self
.assertEqual(msg
.beginning_default_clock_snapshot
, 10)
605 self
.assertEqual(msg
.end_default_clock_snapshot
, 20)
607 # Trying to create when the stream does not support discarded packets.
608 def test_create_unsupported_raises(self
):
609 def create_stream_class(tc
, cc
):
610 return tc
.create_stream_class(
611 supports_packets
=True,
614 def msg_iter_next(msg_iter
, stream
):
615 with self
.assertRaisesRegex(
616 ValueError, "stream class does not support discarded packets"
618 msg_iter
._create
_discarded
_packets
_message
(stream
)
622 res
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
623 self
.assertEqual(res
, 123)
625 # Trying to create with clock snapshots when the stream does not support
627 def test_create_unsupported_clock_snapshots_raises(self
):
628 def create_stream_class(tc
, cc
):
629 return tc
.create_stream_class(
630 supports_packets
=True, supports_discarded_packets
=True
633 def msg_iter_next(msg_iter
, stream
):
634 with self
.assertRaisesRegex(
636 "discarded packets have no default clock snapshots for this stream class",
638 msg_iter
._create
_discarded
_packets
_message
(
639 stream
, beg_clock_snapshot
=10, end_clock_snapshot
=20
644 res
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
645 self
.assertEqual(res
, 123)
647 # Trying to create without clock snapshots when the stream requires them.
648 def test_create_missing_clock_snapshots_raises(self
):
649 def create_stream_class(tc
, cc
):
650 return tc
.create_stream_class(
651 default_clock_class
=cc
,
652 supports_packets
=True,
653 supports_discarded_packets
=True,
654 discarded_packets_have_default_clock_snapshots
=True,
657 def msg_iter_next(msg_iter
, stream
):
658 with self
.assertRaisesRegex(
660 "discarded packets have default clock snapshots for this stream class",
662 msg_iter
._create
_discarded
_packets
_message
(stream
)
666 res
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
667 self
.assertEqual(res
, 123)
669 # End clock snapshot greater than beginning clock snapshot.
670 def test_create_clock_snapshots_end_gt_begin_raises(self
):
671 def create_stream_class(tc
, cc
):
672 return tc
.create_stream_class(
673 default_clock_class
=cc
,
674 supports_packets
=True,
675 supports_discarded_packets
=True,
676 discarded_packets_have_default_clock_snapshots
=True,
679 def msg_iter_next(msg_iter
, stream
):
680 with self
.assertRaisesRegex(
682 r
"beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)",
684 msg_iter
._create
_discarded
_packets
_message
(
685 stream
, beg_clock_snapshot
=20, end_clock_snapshot
=10
690 res
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
691 self
.assertEqual(res
, 123)
694 if __name__
== "__main__":