Reformat Python files with Black v20.8b1
[babeltrace.git] / tests / bindings / python / bt2 / test_message.py
... / ...
CommitLineData
1# SPDX-License-Identifier: GPL-2.0-only
2#
3# Copyright (C) 2019 EfficiOS Inc.
4#
5
6import unittest
7import bt2
8import utils
9from utils import TestOutputPortMessageIterator
10from bt2 import clock_snapshot as bt2_clock_snapshot
11from bt2 import event as bt2_event
12from bt2 import event_class as bt2_event_class
13from bt2 import field as bt2_field
14from bt2 import packet as bt2_packet
15from bt2 import stream as bt2_stream
16from bt2 import stream_class as bt2_stream_class
17from bt2 import trace as bt2_trace
18from bt2 import trace_class as bt2_trace_class
19
20
21class AllMessagesTestCase(unittest.TestCase):
22 def setUp(self):
23 class MyIter(bt2._UserMessageIterator):
24 def __init__(self, config, self_port_output):
25 self._at = 0
26 self._with_stream_msgs_clock_snapshots = self_port_output.user_data.get(
27 'with_stream_msgs_clock_snapshots', False
28 )
29
30 def __next__(self):
31 if test_obj._clock_class:
32 if self._at == 0:
33 if self._with_stream_msgs_clock_snapshots:
34 msg = self._create_stream_beginning_message(
35 test_obj._stream, default_clock_snapshot=self._at
36 )
37 else:
38 msg = self._create_stream_beginning_message(
39 test_obj._stream
40 )
41 test_obj.assertIs(type(msg), bt2._StreamBeginningMessage)
42 elif self._at == 1:
43 msg = self._create_packet_beginning_message(
44 test_obj._packet, self._at
45 )
46 test_obj.assertIs(type(msg), bt2._PacketBeginningMessage)
47 elif self._at == 2:
48 msg = self._create_event_message(
49 test_obj._event_class, test_obj._packet, self._at
50 )
51 test_obj.assertIs(type(msg), bt2._EventMessage)
52 elif self._at == 3:
53 msg = self._create_message_iterator_inactivity_message(
54 test_obj._clock_class, self._at
55 )
56 elif self._at == 4:
57 msg = self._create_discarded_events_message(
58 test_obj._stream, 890, self._at, self._at
59 )
60 test_obj.assertIs(type(msg), bt2._DiscardedEventsMessage)
61 elif self._at == 5:
62 msg = self._create_packet_end_message(
63 test_obj._packet, self._at
64 )
65 test_obj.assertIs(type(msg), bt2._PacketEndMessage)
66 elif self._at == 6:
67 msg = self._create_discarded_packets_message(
68 test_obj._stream, 678, self._at, self._at
69 )
70 test_obj.assertIs(type(msg), bt2._DiscardedPacketsMessage)
71 elif self._at == 7:
72 if self._with_stream_msgs_clock_snapshots:
73 msg = self._create_stream_end_message(
74 test_obj._stream, default_clock_snapshot=self._at
75 )
76 else:
77 msg = self._create_stream_end_message(test_obj._stream)
78 test_obj.assertIs(type(msg), bt2._StreamEndMessage)
79 elif self._at >= 8:
80 raise bt2.Stop
81 else:
82 if self._at == 0:
83 msg = self._create_stream_beginning_message(test_obj._stream)
84 elif self._at == 1:
85 msg = self._create_packet_beginning_message(test_obj._packet)
86 elif self._at == 2:
87 msg = self._create_event_message(
88 test_obj._event_class, test_obj._packet
89 )
90 elif self._at == 3:
91 msg = self._create_discarded_events_message(
92 test_obj._stream, 890
93 )
94 elif self._at == 4:
95 msg = self._create_packet_end_message(test_obj._packet)
96 elif self._at == 5:
97 msg = self._create_discarded_packets_message(
98 test_obj._stream, 678
99 )
100 elif self._at == 6:
101 msg = self._create_stream_end_message(test_obj._stream)
102 elif self._at >= 7:
103 raise bt2.Stop
104
105 self._at += 1
106 return msg
107
108 class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
109 def __init__(self, config, params, obj):
110 self._add_output_port('out', params)
111
112 with_cc = bool(params['with_cc'])
113 tc = self._create_trace_class()
114 if with_cc:
115 cc = self._create_clock_class()
116 else:
117 cc = None
118
119 sc = tc.create_stream_class(
120 default_clock_class=cc,
121 supports_packets=True,
122 packets_have_beginning_default_clock_snapshot=with_cc,
123 packets_have_end_default_clock_snapshot=with_cc,
124 supports_discarded_events=True,
125 discarded_events_have_default_clock_snapshots=with_cc,
126 supports_discarded_packets=True,
127 discarded_packets_have_default_clock_snapshots=with_cc,
128 )
129
130 # Create payload field class
131 my_int_fc = tc.create_signed_integer_field_class(32)
132 payload_fc = tc.create_structure_field_class()
133 payload_fc += [('my_int', my_int_fc)]
134
135 # Create specific context field class
136 my_int_fc = tc.create_signed_integer_field_class(32)
137 specific_fc = tc.create_structure_field_class()
138 specific_fc += [('my_int', my_int_fc)]
139
140 ec = sc.create_event_class(
141 name='salut',
142 payload_field_class=payload_fc,
143 specific_context_field_class=specific_fc,
144 )
145
146 trace = tc()
147 stream = trace.create_stream(sc)
148 packet = stream.create_packet()
149
150 test_obj._trace = trace
151 test_obj._stream = stream
152 test_obj._packet = packet
153 test_obj._event_class = ec
154 test_obj._clock_class = cc
155
156 test_obj = self
157 self._graph = bt2.Graph()
158 self._src = MySrc
159 self._iter = MyIter
160
161 def test_all_msg_with_cc(self):
162 params = {'with_cc': True}
163 self._src_comp = self._graph.add_component(self._src, 'my_source', params)
164 self._msg_iter = TestOutputPortMessageIterator(
165 self._graph, self._src_comp.output_ports['out']
166 )
167
168 for i, msg in enumerate(self._msg_iter):
169 if i == 0:
170 self.assertIs(type(msg), bt2._StreamBeginningMessageConst)
171 self.assertIs(type(msg.stream), bt2_stream._StreamConst)
172 self.assertEqual(msg.stream.addr, self._stream.addr)
173 self.assertIsInstance(
174 msg.default_clock_snapshot, bt2._UnknownClockSnapshot
175 )
176 elif i == 1:
177 self.assertIs(type(msg), bt2._PacketBeginningMessageConst)
178 self.assertIs(type(msg.packet), bt2_packet._PacketConst)
179 self.assertIs(
180 type(msg.default_clock_snapshot),
181 bt2_clock_snapshot._ClockSnapshotConst,
182 )
183 self.assertEqual(msg.packet.addr, self._packet.addr)
184 self.assertEqual(msg.default_clock_snapshot.value, i)
185 elif i == 2:
186 self.assertIs(type(msg), bt2._EventMessageConst)
187 self.assertIs(type(msg.event), bt2_event._EventConst)
188 self.assertIs(
189 type(msg.default_clock_snapshot),
190 bt2_clock_snapshot._ClockSnapshotConst,
191 )
192 self.assertIs(
193 type(msg.event.payload_field), bt2_field._StructureFieldConst
194 )
195 self.assertIs(
196 type(msg.event.payload_field['my_int']),
197 bt2_field._SignedIntegerFieldConst,
198 )
199
200 self.assertEqual(msg.event.cls.addr, self._event_class.addr)
201 self.assertEqual(msg.default_clock_snapshot.value, i)
202 elif i == 3:
203 self.assertIs(type(msg), bt2._MessageIteratorInactivityMessageConst)
204 self.assertIs(
205 type(msg.clock_snapshot), bt2_clock_snapshot._ClockSnapshotConst
206 )
207 self.assertEqual(msg.clock_snapshot.value, i)
208 elif i == 4:
209 self.assertIs(type(msg), bt2._DiscardedEventsMessageConst)
210 self.assertIs(type(msg.stream), bt2_stream._StreamConst)
211 self.assertIs(type(msg.stream.cls), bt2_stream_class._StreamClassConst)
212 self.assertIs(
213 type(msg.beginning_default_clock_snapshot),
214 bt2_clock_snapshot._ClockSnapshotConst,
215 )
216 self.assertIs(
217 type(msg.end_default_clock_snapshot),
218 bt2_clock_snapshot._ClockSnapshotConst,
219 )
220
221 self.assertEqual(msg.stream.addr, self._stream.addr)
222 self.assertEqual(msg.count, 890)
223 self.assertEqual(
224 msg.stream.cls.default_clock_class.addr, self._clock_class.addr
225 )
226 self.assertEqual(msg.beginning_default_clock_snapshot.value, i)
227 self.assertEqual(msg.end_default_clock_snapshot.value, i)
228 elif i == 5:
229 self.assertIs(type(msg), bt2._PacketEndMessageConst)
230 self.assertIs(type(msg.packet), bt2_packet._PacketConst)
231 self.assertIs(
232 type(msg.default_clock_snapshot),
233 bt2_clock_snapshot._ClockSnapshotConst,
234 )
235 self.assertEqual(msg.packet.addr, self._packet.addr)
236 self.assertEqual(msg.default_clock_snapshot.value, i)
237 elif i == 6:
238 self.assertIs(type(msg), bt2._DiscardedPacketsMessageConst)
239 self.assertIs(type(msg.stream), bt2_stream._StreamConst)
240 self.assertIs(type(msg.stream.trace), bt2_trace._TraceConst)
241 self.assertIs(
242 type(msg.stream.trace.cls), bt2_trace_class._TraceClassConst
243 )
244 self.assertIs(
245 type(msg.beginning_default_clock_snapshot),
246 bt2_clock_snapshot._ClockSnapshotConst,
247 )
248 self.assertIs(
249 type(msg.end_default_clock_snapshot),
250 bt2_clock_snapshot._ClockSnapshotConst,
251 )
252 self.assertEqual(msg.stream.addr, self._stream.addr)
253 self.assertEqual(msg.count, 678)
254 self.assertEqual(
255 msg.stream.cls.default_clock_class.addr, self._clock_class.addr
256 )
257 self.assertEqual(msg.beginning_default_clock_snapshot.value, i)
258 self.assertEqual(msg.end_default_clock_snapshot.value, i)
259 elif i == 7:
260 self.assertIs(type(msg), bt2._StreamEndMessageConst)
261 self.assertIs(type(msg.stream), bt2_stream._StreamConst)
262 self.assertEqual(msg.stream.addr, self._stream.addr)
263 self.assertIs(
264 type(msg.default_clock_snapshot), bt2._UnknownClockSnapshot
265 )
266 else:
267 raise Exception
268
269 def test_all_msg_without_cc(self):
270 params = {'with_cc': False}
271 self._src_comp = self._graph.add_component(self._src, 'my_source', params)
272 self._msg_iter = TestOutputPortMessageIterator(
273 self._graph, self._src_comp.output_ports['out']
274 )
275
276 for i, msg in enumerate(self._msg_iter):
277 if i == 0:
278 self.assertIsInstance(msg, bt2._StreamBeginningMessageConst)
279 self.assertIs(type(msg.stream), bt2_stream._StreamConst)
280 self.assertEqual(msg.stream.addr, self._stream.addr)
281 with self.assertRaisesRegex(
282 ValueError, 'stream class has no default clock class'
283 ):
284 msg.default_clock_snapshot
285 elif i == 1:
286 self.assertIsInstance(msg, bt2._PacketBeginningMessageConst)
287 self.assertIs(type(msg.packet), bt2_packet._PacketConst)
288 self.assertEqual(msg.packet.addr, self._packet.addr)
289 elif i == 2:
290 self.assertIsInstance(msg, bt2._EventMessageConst)
291 self.assertIs(type(msg.event), bt2_event._EventConst)
292 self.assertIs(type(msg.event.cls), bt2_event_class._EventClassConst)
293 self.assertEqual(msg.event.cls.addr, self._event_class.addr)
294 with self.assertRaisesRegex(
295 ValueError, 'stream class has no default clock class'
296 ):
297 msg.default_clock_snapshot
298 elif i == 3:
299 self.assertIsInstance(msg, bt2._DiscardedEventsMessageConst)
300 self.assertIs(type(msg.stream), bt2_stream._StreamConst)
301 self.assertIs(type(msg.stream.cls), bt2_stream_class._StreamClassConst)
302 self.assertEqual(msg.stream.addr, self._stream.addr)
303 self.assertEqual(msg.count, 890)
304 self.assertIsNone(msg.stream.cls.default_clock_class)
305 with self.assertRaisesRegex(
306 ValueError,
307 'such a message has no clock snapshots for this stream class',
308 ):
309 msg.beginning_default_clock_snapshot
310 with self.assertRaisesRegex(
311 ValueError,
312 'such a message has no clock snapshots for this stream class',
313 ):
314 msg.end_default_clock_snapshot
315 elif i == 4:
316 self.assertIsInstance(msg, bt2._PacketEndMessageConst)
317 self.assertEqual(msg.packet.addr, self._packet.addr)
318 self.assertIs(type(msg.packet), bt2_packet._PacketConst)
319 elif i == 5:
320 self.assertIsInstance(msg, bt2._DiscardedPacketsMessageConst)
321 self.assertIs(type(msg.stream), bt2_stream._StreamConst)
322 self.assertIs(type(msg.stream.cls), bt2_stream_class._StreamClassConst)
323 self.assertIs(
324 type(msg.stream.cls.trace_class), bt2_trace_class._TraceClassConst
325 )
326 self.assertEqual(msg.stream.addr, self._stream.addr)
327 self.assertEqual(msg.count, 678)
328 self.assertIsNone(msg.stream.cls.default_clock_class)
329 with self.assertRaisesRegex(
330 ValueError,
331 'such a message has no clock snapshots for this stream class',
332 ):
333 msg.beginning_default_clock_snapshot
334 with self.assertRaisesRegex(
335 ValueError,
336 'such a message has no clock snapshots for this stream class',
337 ):
338 msg.end_default_clock_snapshot
339 elif i == 6:
340 self.assertIsInstance(msg, bt2._StreamEndMessageConst)
341 self.assertIs(type(msg.stream), bt2_stream._StreamConst)
342 self.assertEqual(msg.stream.addr, self._stream.addr)
343 with self.assertRaisesRegex(
344 ValueError, 'stream class has no default clock class'
345 ):
346 msg.default_clock_snapshot
347 else:
348 raise Exception
349
350 def test_msg_stream_with_clock_snapshots(self):
351 params = {'with_cc': True, 'with_stream_msgs_clock_snapshots': True}
352
353 self._src_comp = self._graph.add_component(self._src, 'my_source', params)
354 self._msg_iter = TestOutputPortMessageIterator(
355 self._graph, self._src_comp.output_ports['out']
356 )
357 msgs = list(self._msg_iter)
358
359 msg_stream_beg = msgs[0]
360 self.assertIsInstance(msg_stream_beg, bt2._StreamBeginningMessageConst)
361 self.assertIs(
362 type(msg_stream_beg.default_clock_snapshot),
363 bt2_clock_snapshot._ClockSnapshotConst,
364 )
365 self.assertEqual(msg_stream_beg.default_clock_snapshot.value, 0)
366
367 msg_stream_end = msgs[7]
368 self.assertIsInstance(msg_stream_end, bt2._StreamEndMessageConst)
369 self.assertIs(
370 type(msg_stream_end.default_clock_snapshot),
371 bt2_clock_snapshot._ClockSnapshotConst,
372 )
373 self.assertEqual(msg_stream_end.default_clock_snapshot.value, 7)
374
375 def test_stream_beg_msg(self):
376 msg = utils.get_stream_beginning_message()
377 self.assertIs(type(msg.stream), bt2_stream._Stream)
378
379 def test_stream_end_msg(self):
380 msg = utils.get_stream_end_message()
381 self.assertIs(type(msg.stream), bt2_stream._Stream)
382
383 def test_packet_beg_msg(self):
384 msg = utils.get_packet_beginning_message()
385 self.assertIs(type(msg.packet), bt2_packet._Packet)
386
387 def test_packet_end_msg(self):
388 msg = utils.get_packet_end_message()
389 self.assertIs(type(msg.packet), bt2_packet._Packet)
390
391 def test_event_msg(self):
392 msg = utils.get_event_message()
393 self.assertIs(type(msg.event), bt2_event._Event)
394
395
396class CreateDiscardedEventMessageTestCase(unittest.TestCase):
397 # Most basic case.
398 def test_create(self):
399 def create_stream_class(tc, cc):
400 return tc.create_stream_class(supports_discarded_events=True)
401
402 def msg_iter_next(msg_iter, stream):
403 return msg_iter._create_discarded_events_message(stream)
404
405 msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
406 self.assertIs(type(msg), bt2._DiscardedEventsMessage)
407 self.assertIs(msg.count, None)
408
409 # With event count.
410 def test_create_with_count(self):
411 def create_stream_class(tc, cc):
412 return tc.create_stream_class(supports_discarded_events=True)
413
414 def msg_iter_next(msg_iter, stream):
415 return msg_iter._create_discarded_events_message(stream, count=242)
416
417 msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
418 self.assertIs(type(msg), bt2._DiscardedEventsMessage)
419 self.assertEqual(msg.count, 242)
420
421 # With event count == 0.
422 def test_create_with_count_zero_raises(self):
423 def create_stream_class(tc, cc):
424 return tc.create_stream_class(supports_discarded_events=True)
425
426 def msg_iter_next(msg_iter, stream):
427 with self.assertRaisesRegex(
428 ValueError,
429 'discarded event count is 0',
430 ):
431 msg_iter._create_discarded_events_message(stream, count=0)
432
433 return 123
434
435 res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
436 self.assertEqual(res, 123)
437
438 # With clock snapshots.
439 def test_create_with_clock_snapshots(self):
440 def create_stream_class(tc, cc):
441 return tc.create_stream_class(
442 default_clock_class=cc,
443 supports_discarded_events=True,
444 discarded_events_have_default_clock_snapshots=True,
445 )
446
447 def msg_iter_next(msg_iter, stream):
448 return msg_iter._create_discarded_events_message(
449 stream, beg_clock_snapshot=10, end_clock_snapshot=20
450 )
451
452 msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
453 self.assertIs(type(msg), bt2._DiscardedEventsMessage)
454 self.assertEqual(msg.beginning_default_clock_snapshot, 10)
455 self.assertEqual(msg.end_default_clock_snapshot, 20)
456
457 # Trying to create when the stream does not support discarded events.
458 def test_create_unsupported_raises(self):
459 def create_stream_class(tc, cc):
460 return tc.create_stream_class()
461
462 def msg_iter_next(msg_iter, stream):
463 with self.assertRaisesRegex(
464 ValueError, 'stream class does not support discarded events'
465 ):
466 msg_iter._create_discarded_events_message(stream)
467
468 return 123
469
470 res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
471 self.assertEqual(res, 123)
472
473 # Trying to create with clock snapshots when the stream does not support
474 # them.
475 def test_create_unsupported_clock_snapshots_raises(self):
476 def create_stream_class(tc, cc):
477 return tc.create_stream_class(supports_discarded_events=True)
478
479 def msg_iter_next(msg_iter, stream):
480 with self.assertRaisesRegex(
481 ValueError,
482 'discarded events have no default clock snapshots for this stream class',
483 ):
484 msg_iter._create_discarded_events_message(
485 stream, beg_clock_snapshot=10, end_clock_snapshot=20
486 )
487
488 return 123
489
490 res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
491 self.assertEqual(res, 123)
492
493 # Trying to create without clock snapshots when the stream requires them.
494 def test_create_missing_clock_snapshots_raises(self):
495 def create_stream_class(tc, cc):
496 return tc.create_stream_class(
497 default_clock_class=cc,
498 supports_discarded_events=True,
499 discarded_events_have_default_clock_snapshots=True,
500 )
501
502 def msg_iter_next(msg_iter, stream):
503 with self.assertRaisesRegex(
504 ValueError,
505 'discarded events have default clock snapshots for this stream class',
506 ):
507 msg_iter._create_discarded_events_message(stream)
508
509 return 123
510
511 res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
512 self.assertEqual(res, 123)
513
514 # End clock snapshot greater than beginning clock snapshot.
515 def test_create_clock_snapshots_end_gt_begin_raises(self):
516 def create_stream_class(tc, cc):
517 return tc.create_stream_class(
518 default_clock_class=cc,
519 supports_discarded_events=True,
520 discarded_events_have_default_clock_snapshots=True,
521 )
522
523 def msg_iter_next(msg_iter, stream):
524 with self.assertRaisesRegex(
525 ValueError,
526 r'beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)',
527 ):
528 msg_iter._create_discarded_events_message(
529 stream, beg_clock_snapshot=20, end_clock_snapshot=10
530 )
531
532 return 123
533
534 res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
535 self.assertEqual(res, 123)
536
537
538class CreateDiscardedPacketMessageTestCase(unittest.TestCase):
539 # Most basic case.
540 def test_create(self):
541 def create_stream_class(tc, cc):
542 return tc.create_stream_class(
543 supports_packets=True, supports_discarded_packets=True
544 )
545
546 def msg_iter_next(msg_iter, stream):
547 return msg_iter._create_discarded_packets_message(stream)
548
549 msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
550 self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
551 self.assertIs(msg.count, None)
552
553 # With packet count.
554 def test_create_with_count(self):
555 def create_stream_class(tc, cc):
556 return tc.create_stream_class(
557 supports_packets=True, supports_discarded_packets=True
558 )
559
560 def msg_iter_next(msg_iter, stream):
561 return msg_iter._create_discarded_packets_message(stream, count=242)
562
563 msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
564 self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
565 self.assertEqual(msg.count, 242)
566
567 # With packet count == 0.
568 def test_create_with_count_zero_raises(self):
569 def create_stream_class(tc, cc):
570 return tc.create_stream_class(
571 supports_packets=True, supports_discarded_packets=True
572 )
573
574 def msg_iter_next(msg_iter, stream):
575 with self.assertRaisesRegex(
576 ValueError,
577 'discarded packet count is 0',
578 ):
579 msg_iter._create_discarded_packets_message(stream, count=0)
580
581 return 123
582
583 res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
584 self.assertEqual(res, 123)
585
586 # With clock snapshots.
587 def test_create_with_clock_snapshots(self):
588 def create_stream_class(tc, cc):
589 return tc.create_stream_class(
590 default_clock_class=cc,
591 supports_packets=True,
592 supports_discarded_packets=True,
593 discarded_packets_have_default_clock_snapshots=True,
594 )
595
596 def msg_iter_next(msg_iter, stream):
597 return msg_iter._create_discarded_packets_message(
598 stream, beg_clock_snapshot=10, end_clock_snapshot=20
599 )
600
601 msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
602 self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
603 self.assertEqual(msg.beginning_default_clock_snapshot, 10)
604 self.assertEqual(msg.end_default_clock_snapshot, 20)
605
606 # Trying to create when the stream does not support discarded packets.
607 def test_create_unsupported_raises(self):
608 def create_stream_class(tc, cc):
609 return tc.create_stream_class(
610 supports_packets=True,
611 )
612
613 def msg_iter_next(msg_iter, stream):
614 with self.assertRaisesRegex(
615 ValueError, 'stream class does not support discarded packets'
616 ):
617 msg_iter._create_discarded_packets_message(stream)
618
619 return 123
620
621 res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
622 self.assertEqual(res, 123)
623
624 # Trying to create with clock snapshots when the stream does not support
625 # them.
626 def test_create_unsupported_clock_snapshots_raises(self):
627 def create_stream_class(tc, cc):
628 return tc.create_stream_class(
629 supports_packets=True, supports_discarded_packets=True
630 )
631
632 def msg_iter_next(msg_iter, stream):
633 with self.assertRaisesRegex(
634 ValueError,
635 'discarded packets have no default clock snapshots for this stream class',
636 ):
637 msg_iter._create_discarded_packets_message(
638 stream, beg_clock_snapshot=10, end_clock_snapshot=20
639 )
640
641 return 123
642
643 res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
644 self.assertEqual(res, 123)
645
646 # Trying to create without clock snapshots when the stream requires them.
647 def test_create_missing_clock_snapshots_raises(self):
648 def create_stream_class(tc, cc):
649 return tc.create_stream_class(
650 default_clock_class=cc,
651 supports_packets=True,
652 supports_discarded_packets=True,
653 discarded_packets_have_default_clock_snapshots=True,
654 )
655
656 def msg_iter_next(msg_iter, stream):
657 with self.assertRaisesRegex(
658 ValueError,
659 'discarded packets have default clock snapshots for this stream class',
660 ):
661 msg_iter._create_discarded_packets_message(stream)
662
663 return 123
664
665 res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
666 self.assertEqual(res, 123)
667
668 # End clock snapshot greater than beginning clock snapshot.
669 def test_create_clock_snapshots_end_gt_begin_raises(self):
670 def create_stream_class(tc, cc):
671 return tc.create_stream_class(
672 default_clock_class=cc,
673 supports_packets=True,
674 supports_discarded_packets=True,
675 discarded_packets_have_default_clock_snapshots=True,
676 )
677
678 def msg_iter_next(msg_iter, stream):
679 with self.assertRaisesRegex(
680 ValueError,
681 r'beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)',
682 ):
683 msg_iter._create_discarded_packets_message(
684 stream, beg_clock_snapshot=20, end_clock_snapshot=10
685 )
686
687 return 123
688
689 res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
690 self.assertEqual(res, 123)
691
692
# Run this module's test cases when it is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
This page took 0.024637 seconds and 4 git commands to generate.