1 # SPDX-License-Identifier: GPL-2.0-only
3 # Copyright (C) 2019 EfficiOS Inc.
9 from utils
import TestOutputPortMessageIterator
10 from bt2
import clock_snapshot
as bt2_clock_snapshot
11 from bt2
import event
as bt2_event
12 from bt2
import event_class
as bt2_event_class
13 from bt2
import field
as bt2_field
14 from bt2
import packet
as bt2_packet
15 from bt2
import stream
as bt2_stream
16 from bt2
import stream_class
as bt2_stream_class
17 from bt2
import trace
as bt2_trace
18 from bt2
import trace_class
as bt2_trace_class
21 class AllMessagesTestCase(unittest
.TestCase
):
23 class MyIter(bt2
._UserMessageIterator
):
24 def __init__(self
, config
, self_port_output
):
26 self
._with
_stream
_msgs
_clock
_snapshots
= self_port_output
.user_data
.get(
27 'with_stream_msgs_clock_snapshots', False
31 if test_obj
._clock
_class
:
33 if self
._with
_stream
_msgs
_clock
_snapshots
:
34 msg
= self
._create
_stream
_beginning
_message
(
35 test_obj
._stream
, default_clock_snapshot
=self
._at
38 msg
= self
._create
_stream
_beginning
_message
(
41 test_obj
.assertIs(type(msg
), bt2
._StreamBeginningMessage
)
43 msg
= self
._create
_packet
_beginning
_message
(
44 test_obj
._packet
, self
._at
46 test_obj
.assertIs(type(msg
), bt2
._PacketBeginningMessage
)
48 msg
= self
._create
_event
_message
(
49 test_obj
._event
_class
, test_obj
._packet
, self
._at
51 test_obj
.assertIs(type(msg
), bt2
._EventMessage
)
53 msg
= self
._create
_message
_iterator
_inactivity
_message
(
54 test_obj
._clock
_class
, self
._at
57 msg
= self
._create
_discarded
_events
_message
(
58 test_obj
._stream
, 890, self
._at
, self
._at
60 test_obj
.assertIs(type(msg
), bt2
._DiscardedEventsMessage
)
62 msg
= self
._create
_packet
_end
_message
(
63 test_obj
._packet
, self
._at
65 test_obj
.assertIs(type(msg
), bt2
._PacketEndMessage
)
67 msg
= self
._create
_discarded
_packets
_message
(
68 test_obj
._stream
, 678, self
._at
, self
._at
70 test_obj
.assertIs(type(msg
), bt2
._DiscardedPacketsMessage
)
72 if self
._with
_stream
_msgs
_clock
_snapshots
:
73 msg
= self
._create
_stream
_end
_message
(
74 test_obj
._stream
, default_clock_snapshot
=self
._at
77 msg
= self
._create
_stream
_end
_message
(test_obj
._stream
)
78 test_obj
.assertIs(type(msg
), bt2
._StreamEndMessage
)
83 msg
= self
._create
_stream
_beginning
_message
(test_obj
._stream
)
85 msg
= self
._create
_packet
_beginning
_message
(test_obj
._packet
)
87 msg
= self
._create
_event
_message
(
88 test_obj
._event
_class
, test_obj
._packet
91 msg
= self
._create
_discarded
_events
_message
(
95 msg
= self
._create
_packet
_end
_message
(test_obj
._packet
)
97 msg
= self
._create
_discarded
_packets
_message
(
101 msg
= self
._create
_stream
_end
_message
(test_obj
._stream
)
108 class MySrc(bt2
._UserSourceComponent
, message_iterator_class
=MyIter
):
109 def __init__(self
, config
, params
, obj
):
110 self
._add
_output
_port
('out', params
)
112 with_cc
= bool(params
['with_cc'])
113 tc
= self
._create
_trace
_class
()
115 cc
= self
._create
_clock
_class
()
119 sc
= tc
.create_stream_class(
120 default_clock_class
=cc
,
121 supports_packets
=True,
122 packets_have_beginning_default_clock_snapshot
=with_cc
,
123 packets_have_end_default_clock_snapshot
=with_cc
,
124 supports_discarded_events
=True,
125 discarded_events_have_default_clock_snapshots
=with_cc
,
126 supports_discarded_packets
=True,
127 discarded_packets_have_default_clock_snapshots
=with_cc
,
130 # Create payload field class
131 my_int_fc
= tc
.create_signed_integer_field_class(32)
132 payload_fc
= tc
.create_structure_field_class()
133 payload_fc
+= [('my_int', my_int_fc
)]
135 # Create specific context field class
136 my_int_fc
= tc
.create_signed_integer_field_class(32)
137 specific_fc
= tc
.create_structure_field_class()
138 specific_fc
+= [('my_int', my_int_fc
)]
140 ec
= sc
.create_event_class(
142 payload_field_class
=payload_fc
,
143 specific_context_field_class
=specific_fc
,
147 stream
= trace
.create_stream(sc
)
148 packet
= stream
.create_packet()
150 test_obj
._trace
= trace
151 test_obj
._stream
= stream
152 test_obj
._packet
= packet
153 test_obj
._event
_class
= ec
154 test_obj
._clock
_class
= cc
157 self
._graph
= bt2
.Graph()
161 def test_all_msg_with_cc(self
):
162 params
= {'with_cc': True}
163 self
._src
_comp
= self
._graph
.add_component(self
._src
, 'my_source', params
)
164 self
._msg
_iter
= TestOutputPortMessageIterator(
165 self
._graph
, self
._src
_comp
.output_ports
['out']
168 for i
, msg
in enumerate(self
._msg
_iter
):
170 self
.assertIs(type(msg
), bt2
._StreamBeginningMessageConst
)
171 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
172 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
173 self
.assertIsInstance(
174 msg
.default_clock_snapshot
, bt2
._UnknownClockSnapshot
177 self
.assertIs(type(msg
), bt2
._PacketBeginningMessageConst
)
178 self
.assertIs(type(msg
.packet
), bt2_packet
._PacketConst
)
180 type(msg
.default_clock_snapshot
),
181 bt2_clock_snapshot
._ClockSnapshotConst
,
183 self
.assertEqual(msg
.packet
.addr
, self
._packet
.addr
)
184 self
.assertEqual(msg
.default_clock_snapshot
.value
, i
)
186 self
.assertIs(type(msg
), bt2
._EventMessageConst
)
187 self
.assertIs(type(msg
.event
), bt2_event
._EventConst
)
189 type(msg
.default_clock_snapshot
),
190 bt2_clock_snapshot
._ClockSnapshotConst
,
193 type(msg
.event
.payload_field
), bt2_field
._StructureFieldConst
196 type(msg
.event
.payload_field
['my_int']),
197 bt2_field
._SignedIntegerFieldConst
,
200 self
.assertEqual(msg
.event
.cls
.addr
, self
._event
_class
.addr
)
201 self
.assertEqual(msg
.default_clock_snapshot
.value
, i
)
203 self
.assertIs(type(msg
), bt2
._MessageIteratorInactivityMessageConst
)
205 type(msg
.clock_snapshot
), bt2_clock_snapshot
._ClockSnapshotConst
207 self
.assertEqual(msg
.clock_snapshot
.value
, i
)
209 self
.assertIs(type(msg
), bt2
._DiscardedEventsMessageConst
)
210 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
211 self
.assertIs(type(msg
.stream
.cls
), bt2_stream_class
._StreamClassConst
)
213 type(msg
.beginning_default_clock_snapshot
),
214 bt2_clock_snapshot
._ClockSnapshotConst
,
217 type(msg
.end_default_clock_snapshot
),
218 bt2_clock_snapshot
._ClockSnapshotConst
,
221 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
222 self
.assertEqual(msg
.count
, 890)
224 msg
.stream
.cls
.default_clock_class
.addr
, self
._clock
_class
.addr
226 self
.assertEqual(msg
.beginning_default_clock_snapshot
.value
, i
)
227 self
.assertEqual(msg
.end_default_clock_snapshot
.value
, i
)
229 self
.assertIs(type(msg
), bt2
._PacketEndMessageConst
)
230 self
.assertIs(type(msg
.packet
), bt2_packet
._PacketConst
)
232 type(msg
.default_clock_snapshot
),
233 bt2_clock_snapshot
._ClockSnapshotConst
,
235 self
.assertEqual(msg
.packet
.addr
, self
._packet
.addr
)
236 self
.assertEqual(msg
.default_clock_snapshot
.value
, i
)
238 self
.assertIs(type(msg
), bt2
._DiscardedPacketsMessageConst
)
239 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
240 self
.assertIs(type(msg
.stream
.trace
), bt2_trace
._TraceConst
)
242 type(msg
.stream
.trace
.cls
), bt2_trace_class
._TraceClassConst
245 type(msg
.beginning_default_clock_snapshot
),
246 bt2_clock_snapshot
._ClockSnapshotConst
,
249 type(msg
.end_default_clock_snapshot
),
250 bt2_clock_snapshot
._ClockSnapshotConst
,
252 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
253 self
.assertEqual(msg
.count
, 678)
255 msg
.stream
.cls
.default_clock_class
.addr
, self
._clock
_class
.addr
257 self
.assertEqual(msg
.beginning_default_clock_snapshot
.value
, i
)
258 self
.assertEqual(msg
.end_default_clock_snapshot
.value
, i
)
260 self
.assertIs(type(msg
), bt2
._StreamEndMessageConst
)
261 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
262 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
264 type(msg
.default_clock_snapshot
), bt2
._UnknownClockSnapshot
269 def test_all_msg_without_cc(self
):
270 params
= {'with_cc': False}
271 self
._src
_comp
= self
._graph
.add_component(self
._src
, 'my_source', params
)
272 self
._msg
_iter
= TestOutputPortMessageIterator(
273 self
._graph
, self
._src
_comp
.output_ports
['out']
276 for i
, msg
in enumerate(self
._msg
_iter
):
278 self
.assertIsInstance(msg
, bt2
._StreamBeginningMessageConst
)
279 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
280 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
281 with self
.assertRaisesRegex(
282 ValueError, 'stream class has no default clock class'
284 msg
.default_clock_snapshot
286 self
.assertIsInstance(msg
, bt2
._PacketBeginningMessageConst
)
287 self
.assertIs(type(msg
.packet
), bt2_packet
._PacketConst
)
288 self
.assertEqual(msg
.packet
.addr
, self
._packet
.addr
)
290 self
.assertIsInstance(msg
, bt2
._EventMessageConst
)
291 self
.assertIs(type(msg
.event
), bt2_event
._EventConst
)
292 self
.assertIs(type(msg
.event
.cls
), bt2_event_class
._EventClassConst
)
293 self
.assertEqual(msg
.event
.cls
.addr
, self
._event
_class
.addr
)
294 with self
.assertRaisesRegex(
295 ValueError, 'stream class has no default clock class'
297 msg
.default_clock_snapshot
299 self
.assertIsInstance(msg
, bt2
._DiscardedEventsMessageConst
)
300 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
301 self
.assertIs(type(msg
.stream
.cls
), bt2_stream_class
._StreamClassConst
)
302 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
303 self
.assertEqual(msg
.count
, 890)
304 self
.assertIsNone(msg
.stream
.cls
.default_clock_class
)
305 with self
.assertRaisesRegex(
307 'such a message has no clock snapshots for this stream class',
309 msg
.beginning_default_clock_snapshot
310 with self
.assertRaisesRegex(
312 'such a message has no clock snapshots for this stream class',
314 msg
.end_default_clock_snapshot
316 self
.assertIsInstance(msg
, bt2
._PacketEndMessageConst
)
317 self
.assertEqual(msg
.packet
.addr
, self
._packet
.addr
)
318 self
.assertIs(type(msg
.packet
), bt2_packet
._PacketConst
)
320 self
.assertIsInstance(msg
, bt2
._DiscardedPacketsMessageConst
)
321 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
322 self
.assertIs(type(msg
.stream
.cls
), bt2_stream_class
._StreamClassConst
)
324 type(msg
.stream
.cls
.trace_class
), bt2_trace_class
._TraceClassConst
326 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
327 self
.assertEqual(msg
.count
, 678)
328 self
.assertIsNone(msg
.stream
.cls
.default_clock_class
)
329 with self
.assertRaisesRegex(
331 'such a message has no clock snapshots for this stream class',
333 msg
.beginning_default_clock_snapshot
334 with self
.assertRaisesRegex(
336 'such a message has no clock snapshots for this stream class',
338 msg
.end_default_clock_snapshot
340 self
.assertIsInstance(msg
, bt2
._StreamEndMessageConst
)
341 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
342 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
343 with self
.assertRaisesRegex(
344 ValueError, 'stream class has no default clock class'
346 msg
.default_clock_snapshot
350 def test_msg_stream_with_clock_snapshots(self
):
351 params
= {'with_cc': True, 'with_stream_msgs_clock_snapshots': True}
353 self
._src
_comp
= self
._graph
.add_component(self
._src
, 'my_source', params
)
354 self
._msg
_iter
= TestOutputPortMessageIterator(
355 self
._graph
, self
._src
_comp
.output_ports
['out']
357 msgs
= list(self
._msg
_iter
)
359 msg_stream_beg
= msgs
[0]
360 self
.assertIsInstance(msg_stream_beg
, bt2
._StreamBeginningMessageConst
)
362 type(msg_stream_beg
.default_clock_snapshot
),
363 bt2_clock_snapshot
._ClockSnapshotConst
,
365 self
.assertEqual(msg_stream_beg
.default_clock_snapshot
.value
, 0)
367 msg_stream_end
= msgs
[7]
368 self
.assertIsInstance(msg_stream_end
, bt2
._StreamEndMessageConst
)
370 type(msg_stream_end
.default_clock_snapshot
),
371 bt2_clock_snapshot
._ClockSnapshotConst
,
373 self
.assertEqual(msg_stream_end
.default_clock_snapshot
.value
, 7)
375 def test_stream_beg_msg(self
):
376 msg
= utils
.get_stream_beginning_message()
377 self
.assertIs(type(msg
.stream
), bt2_stream
._Stream
)
379 def test_stream_end_msg(self
):
380 msg
= utils
.get_stream_end_message()
381 self
.assertIs(type(msg
.stream
), bt2_stream
._Stream
)
383 def test_packet_beg_msg(self
):
384 msg
= utils
.get_packet_beginning_message()
385 self
.assertIs(type(msg
.packet
), bt2_packet
._Packet
)
387 def test_packet_end_msg(self
):
388 msg
= utils
.get_packet_end_message()
389 self
.assertIs(type(msg
.packet
), bt2_packet
._Packet
)
391 def test_event_msg(self
):
392 msg
= utils
.get_event_message()
393 self
.assertIs(type(msg
.event
), bt2_event
._Event
)
396 class CreateDiscardedEventMessageTestCase(unittest
.TestCase
):
398 def test_create(self
):
399 def create_stream_class(tc
, cc
):
400 return tc
.create_stream_class(supports_discarded_events
=True)
402 def msg_iter_next(msg_iter
, stream
):
403 return msg_iter
._create
_discarded
_events
_message
(stream
)
405 msg
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
406 self
.assertIs(type(msg
), bt2
._DiscardedEventsMessage
)
407 self
.assertIs(msg
.count
, None)
410 def test_create_with_count(self
):
411 def create_stream_class(tc
, cc
):
412 return tc
.create_stream_class(supports_discarded_events
=True)
414 def msg_iter_next(msg_iter
, stream
):
415 return msg_iter
._create
_discarded
_events
_message
(stream
, count
=242)
417 msg
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
418 self
.assertIs(type(msg
), bt2
._DiscardedEventsMessage
)
419 self
.assertEqual(msg
.count
, 242)
421 # With event count == 0.
422 def test_create_with_count_zero_raises(self
):
423 def create_stream_class(tc
, cc
):
424 return tc
.create_stream_class(supports_discarded_events
=True)
426 def msg_iter_next(msg_iter
, stream
):
427 with self
.assertRaisesRegex(
428 ValueError, 'discarded event count is 0',
430 msg_iter
._create
_discarded
_events
_message
(stream
, count
=0)
434 res
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
435 self
.assertEqual(res
, 123)
437 # With clock snapshots.
438 def test_create_with_clock_snapshots(self
):
439 def create_stream_class(tc
, cc
):
440 return tc
.create_stream_class(
441 default_clock_class
=cc
,
442 supports_discarded_events
=True,
443 discarded_events_have_default_clock_snapshots
=True,
446 def msg_iter_next(msg_iter
, stream
):
447 return msg_iter
._create
_discarded
_events
_message
(
448 stream
, beg_clock_snapshot
=10, end_clock_snapshot
=20
451 msg
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
452 self
.assertIs(type(msg
), bt2
._DiscardedEventsMessage
)
453 self
.assertEqual(msg
.beginning_default_clock_snapshot
, 10)
454 self
.assertEqual(msg
.end_default_clock_snapshot
, 20)
456 # Trying to create when the stream does not support discarded events.
457 def test_create_unsupported_raises(self
):
458 def create_stream_class(tc
, cc
):
459 return tc
.create_stream_class()
461 def msg_iter_next(msg_iter
, stream
):
462 with self
.assertRaisesRegex(
463 ValueError, 'stream class does not support discarded events'
465 msg_iter
._create
_discarded
_events
_message
(stream
)
469 res
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
470 self
.assertEqual(res
, 123)
472 # Trying to create with clock snapshots when the stream does not support
474 def test_create_unsupported_clock_snapshots_raises(self
):
475 def create_stream_class(tc
, cc
):
476 return tc
.create_stream_class(supports_discarded_events
=True)
478 def msg_iter_next(msg_iter
, stream
):
479 with self
.assertRaisesRegex(
481 'discarded events have no default clock snapshots for this stream class',
483 msg_iter
._create
_discarded
_events
_message
(
484 stream
, beg_clock_snapshot
=10, end_clock_snapshot
=20
489 res
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
490 self
.assertEqual(res
, 123)
492 # Trying to create without clock snapshots when the stream requires them.
493 def test_create_missing_clock_snapshots_raises(self
):
494 def create_stream_class(tc
, cc
):
495 return tc
.create_stream_class(
496 default_clock_class
=cc
,
497 supports_discarded_events
=True,
498 discarded_events_have_default_clock_snapshots
=True,
501 def msg_iter_next(msg_iter
, stream
):
502 with self
.assertRaisesRegex(
504 'discarded events have default clock snapshots for this stream class',
506 msg_iter
._create
_discarded
_events
_message
(stream
)
510 res
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
511 self
.assertEqual(res
, 123)
513 # End clock snapshot greater than beginning clock snapshot.
514 def test_create_clock_snapshots_end_gt_begin_raises(self
):
515 def create_stream_class(tc
, cc
):
516 return tc
.create_stream_class(
517 default_clock_class
=cc
,
518 supports_discarded_events
=True,
519 discarded_events_have_default_clock_snapshots
=True,
522 def msg_iter_next(msg_iter
, stream
):
523 with self
.assertRaisesRegex(
525 r
'beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)',
527 msg_iter
._create
_discarded
_events
_message
(
528 stream
, beg_clock_snapshot
=20, end_clock_snapshot
=10
533 res
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
534 self
.assertEqual(res
, 123)
537 class CreateDiscardedPacketMessageTestCase(unittest
.TestCase
):
539 def test_create(self
):
540 def create_stream_class(tc
, cc
):
541 return tc
.create_stream_class(
542 supports_packets
=True, supports_discarded_packets
=True
545 def msg_iter_next(msg_iter
, stream
):
546 return msg_iter
._create
_discarded
_packets
_message
(stream
)
548 msg
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
549 self
.assertIs(type(msg
), bt2
._DiscardedPacketsMessage
)
550 self
.assertIs(msg
.count
, None)
553 def test_create_with_count(self
):
554 def create_stream_class(tc
, cc
):
555 return tc
.create_stream_class(
556 supports_packets
=True, supports_discarded_packets
=True
559 def msg_iter_next(msg_iter
, stream
):
560 return msg_iter
._create
_discarded
_packets
_message
(stream
, count
=242)
562 msg
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
563 self
.assertIs(type(msg
), bt2
._DiscardedPacketsMessage
)
564 self
.assertEqual(msg
.count
, 242)
566 # With packet count == 0.
567 def test_create_with_count_zero_raises(self
):
568 def create_stream_class(tc
, cc
):
569 return tc
.create_stream_class(
570 supports_packets
=True, supports_discarded_packets
=True
573 def msg_iter_next(msg_iter
, stream
):
574 with self
.assertRaisesRegex(
575 ValueError, 'discarded packet count is 0',
577 msg_iter
._create
_discarded
_packets
_message
(stream
, count
=0)
581 res
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
582 self
.assertEqual(res
, 123)
584 # With clock snapshots.
585 def test_create_with_clock_snapshots(self
):
586 def create_stream_class(tc
, cc
):
587 return tc
.create_stream_class(
588 default_clock_class
=cc
,
589 supports_packets
=True,
590 supports_discarded_packets
=True,
591 discarded_packets_have_default_clock_snapshots
=True,
594 def msg_iter_next(msg_iter
, stream
):
595 return msg_iter
._create
_discarded
_packets
_message
(
596 stream
, beg_clock_snapshot
=10, end_clock_snapshot
=20
599 msg
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
600 self
.assertIs(type(msg
), bt2
._DiscardedPacketsMessage
)
601 self
.assertEqual(msg
.beginning_default_clock_snapshot
, 10)
602 self
.assertEqual(msg
.end_default_clock_snapshot
, 20)
604 # Trying to create when the stream does not support discarded packets.
605 def test_create_unsupported_raises(self
):
606 def create_stream_class(tc
, cc
):
607 return tc
.create_stream_class(supports_packets
=True,)
609 def msg_iter_next(msg_iter
, stream
):
610 with self
.assertRaisesRegex(
611 ValueError, 'stream class does not support discarded packets'
613 msg_iter
._create
_discarded
_packets
_message
(stream
)
617 res
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
618 self
.assertEqual(res
, 123)
620 # Trying to create with clock snapshots when the stream does not support
622 def test_create_unsupported_clock_snapshots_raises(self
):
623 def create_stream_class(tc
, cc
):
624 return tc
.create_stream_class(
625 supports_packets
=True, supports_discarded_packets
=True
628 def msg_iter_next(msg_iter
, stream
):
629 with self
.assertRaisesRegex(
631 'discarded packets have no default clock snapshots for this stream class',
633 msg_iter
._create
_discarded
_packets
_message
(
634 stream
, beg_clock_snapshot
=10, end_clock_snapshot
=20
639 res
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
640 self
.assertEqual(res
, 123)
642 # Trying to create without clock snapshots when the stream requires them.
643 def test_create_missing_clock_snapshots_raises(self
):
644 def create_stream_class(tc
, cc
):
645 return tc
.create_stream_class(
646 default_clock_class
=cc
,
647 supports_packets
=True,
648 supports_discarded_packets
=True,
649 discarded_packets_have_default_clock_snapshots
=True,
652 def msg_iter_next(msg_iter
, stream
):
653 with self
.assertRaisesRegex(
655 'discarded packets have default clock snapshots for this stream class',
657 msg_iter
._create
_discarded
_packets
_message
(stream
)
661 res
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
662 self
.assertEqual(res
, 123)
664 # End clock snapshot greater than beginning clock snapshot.
665 def test_create_clock_snapshots_end_gt_begin_raises(self
):
666 def create_stream_class(tc
, cc
):
667 return tc
.create_stream_class(
668 default_clock_class
=cc
,
669 supports_packets
=True,
670 supports_discarded_packets
=True,
671 discarded_packets_have_default_clock_snapshots
=True,
674 def msg_iter_next(msg_iter
, stream
):
675 with self
.assertRaisesRegex(
677 r
'beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)',
679 msg_iter
._create
_discarded
_packets
_message
(
680 stream
, beg_clock_snapshot
=20, end_clock_snapshot
=10
685 res
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
686 self
.assertEqual(res
, 123)
689 if __name__
== '__main__':