cpp-common/bt2c/fmt.hpp: use `wise_enum::string_type` in `EnableIfIsWiseEnum` definition
[babeltrace.git] / tests / bindings / python / bt2 / test_message.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # Copyright (C) 2019 EfficiOS Inc.
4 #
5
6 import unittest
7
8 import bt2
9 import utils
10 from bt2 import event as bt2_event
11 from bt2 import field as bt2_field
12 from bt2 import trace as bt2_trace
13 from bt2 import packet as bt2_packet
14 from bt2 import stream as bt2_stream
15 from bt2 import event_class as bt2_event_class
16 from bt2 import trace_class as bt2_trace_class
17 from bt2 import stream_class as bt2_stream_class
18 from bt2 import clock_snapshot as bt2_clock_snapshot
19 from utils import TestOutputPortMessageIterator
20
21
class AllMessagesTestCase(unittest.TestCase):
    """Check every message type as seen from both sides of an output port.

    `setUp()` defines a source component class whose message iterator
    emits a fixed sequence of messages.  On the producing side, the
    iterator asserts the (non-const) type of most messages it creates;
    on the consuming side, the tests assert the const types and the
    properties of the messages they receive.
    """

    def setUp(self):
        """Define the source/iterator classes and create an empty graph.

        The nested classes reach the running test case through the
        `test_obj` closure variable, which is bound at the end of this
        method (before any of their methods can run).
        """

        class MyIter(bt2._UserMessageIterator):
            def __init__(self, config, self_port_output):
                # Index of the next message to emit.
                self._at = 0

                # The output port's user data is the component's
                # parameters (see `MySrc.__init__()`).
                self._with_stream_msgs_clock_snapshots = self_port_output.user_data.get(
                    "with_stream_msgs_clock_snapshots", False
                )

            def __next__(self):
                if test_obj._clock_class:
                    # With a default clock class: eight messages, each
                    # clock snapshot value being the message's index.
                    if self._at == 0:
                        if self._with_stream_msgs_clock_snapshots:
                            msg = self._create_stream_beginning_message(
                                test_obj._stream, default_clock_snapshot=self._at
                            )
                        else:
                            msg = self._create_stream_beginning_message(
                                test_obj._stream
                            )
                        test_obj.assertIs(type(msg), bt2._StreamBeginningMessage)
                    elif self._at == 1:
                        msg = self._create_packet_beginning_message(
                            test_obj._packet, self._at
                        )
                        test_obj.assertIs(type(msg), bt2._PacketBeginningMessage)
                    elif self._at == 2:
                        msg = self._create_event_message(
                            test_obj._event_class, test_obj._packet, self._at
                        )
                        test_obj.assertIs(type(msg), bt2._EventMessage)
                    elif self._at == 3:
                        # No producer-side type assertion is made for
                        # this message.
                        msg = self._create_message_iterator_inactivity_message(
                            test_obj._clock_class, self._at
                        )
                    elif self._at == 4:
                        msg = self._create_discarded_events_message(
                            test_obj._stream, 890, self._at, self._at
                        )
                        test_obj.assertIs(type(msg), bt2._DiscardedEventsMessage)
                    elif self._at == 5:
                        msg = self._create_packet_end_message(
                            test_obj._packet, self._at
                        )
                        test_obj.assertIs(type(msg), bt2._PacketEndMessage)
                    elif self._at == 6:
                        msg = self._create_discarded_packets_message(
                            test_obj._stream, 678, self._at, self._at
                        )
                        test_obj.assertIs(type(msg), bt2._DiscardedPacketsMessage)
                    elif self._at == 7:
                        if self._with_stream_msgs_clock_snapshots:
                            msg = self._create_stream_end_message(
                                test_obj._stream, default_clock_snapshot=self._at
                            )
                        else:
                            msg = self._create_stream_end_message(test_obj._stream)
                        test_obj.assertIs(type(msg), bt2._StreamEndMessage)
                    elif self._at >= 8:
                        raise bt2.Stop
                else:
                    # Without a default clock class: seven messages (no
                    # inactivity message) and no clock snapshots.
                    if self._at == 0:
                        msg = self._create_stream_beginning_message(test_obj._stream)
                    elif self._at == 1:
                        msg = self._create_packet_beginning_message(test_obj._packet)
                    elif self._at == 2:
                        msg = self._create_event_message(
                            test_obj._event_class, test_obj._packet
                        )
                    elif self._at == 3:
                        msg = self._create_discarded_events_message(
                            test_obj._stream, 890
                        )
                    elif self._at == 4:
                        msg = self._create_packet_end_message(test_obj._packet)
                    elif self._at == 5:
                        msg = self._create_discarded_packets_message(
                            test_obj._stream, 678
                        )
                    elif self._at == 6:
                        msg = self._create_stream_end_message(test_obj._stream)
                    elif self._at >= 7:
                        raise bt2.Stop

                self._at += 1
                return msg

        class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                # Hand the parameters to the message iterator through
                # the port's user data.
                self._add_output_port("out", params)

                with_cc = bool(params["with_cc"])
                tc = self._create_trace_class()
                if with_cc:
                    cc = self._create_clock_class()
                else:
                    cc = None

                # Every "has default clock snapshot" property follows
                # the presence of the default clock class.
                sc = tc.create_stream_class(
                    default_clock_class=cc,
                    supports_packets=True,
                    packets_have_beginning_default_clock_snapshot=with_cc,
                    packets_have_end_default_clock_snapshot=with_cc,
                    supports_discarded_events=True,
                    discarded_events_have_default_clock_snapshots=with_cc,
                    supports_discarded_packets=True,
                    discarded_packets_have_default_clock_snapshots=with_cc,
                )

                # Create payload field class
                my_int_fc = tc.create_signed_integer_field_class(32)
                payload_fc = tc.create_structure_field_class()
                payload_fc += [("my_int", my_int_fc)]

                # Create specific context field class
                my_int_fc = tc.create_signed_integer_field_class(32)
                specific_fc = tc.create_structure_field_class()
                specific_fc += [("my_int", my_int_fc)]

                ec = sc.create_event_class(
                    name="salut",
                    payload_field_class=payload_fc,
                    specific_context_field_class=specific_fc,
                )

                trace = tc()
                stream = trace.create_stream(sc)
                packet = stream.create_packet()

                # Publish the created objects on the test case so that
                # both the iterator and the test methods can use them.
                test_obj._trace = trace
                test_obj._stream = stream
                test_obj._packet = packet
                test_obj._event_class = ec
                test_obj._clock_class = cc

        test_obj = self
        self._graph = bt2.Graph()
        self._src = MySrc
        self._iter = MyIter

    def test_all_msg_with_cc(self):
        """Check all eight messages when the stream class has a clock class.

        Stream beginning/end messages are created without a default
        clock snapshot here, so theirs is expected to be unknown; every
        other clock snapshot value is expected to equal the message's
        index.
        """
        params = {"with_cc": True}
        self._src_comp = self._graph.add_component(self._src, "my_source", params)
        self._msg_iter = TestOutputPortMessageIterator(
            self._graph, self._src_comp.output_ports["out"]
        )

        # Counted so that an iterator ending early fails the test
        # instead of silently skipping the remaining checks.
        msg_count = 0

        for i, msg in enumerate(self._msg_iter):
            msg_count = i + 1

            if i == 0:
                self.assertIs(type(msg), bt2._StreamBeginningMessageConst)
                self.assertIs(type(msg.stream), bt2_stream._StreamConst)
                self.assertEqual(msg.stream.addr, self._stream.addr)
                self.assertIsInstance(
                    msg.default_clock_snapshot, bt2._UnknownClockSnapshot
                )
            elif i == 1:
                self.assertIs(type(msg), bt2._PacketBeginningMessageConst)
                self.assertIs(type(msg.packet), bt2_packet._PacketConst)
                self.assertIs(
                    type(msg.default_clock_snapshot),
                    bt2_clock_snapshot._ClockSnapshotConst,
                )
                self.assertEqual(msg.packet.addr, self._packet.addr)
                self.assertEqual(msg.default_clock_snapshot.value, i)
            elif i == 2:
                self.assertIs(type(msg), bt2._EventMessageConst)
                self.assertIs(type(msg.event), bt2_event._EventConst)
                self.assertIs(
                    type(msg.default_clock_snapshot),
                    bt2_clock_snapshot._ClockSnapshotConst,
                )
                self.assertIs(
                    type(msg.event.payload_field), bt2_field._StructureFieldConst
                )
                self.assertIs(
                    type(msg.event.payload_field["my_int"]),
                    bt2_field._SignedIntegerFieldConst,
                )

                self.assertEqual(msg.event.cls.addr, self._event_class.addr)
                self.assertEqual(msg.default_clock_snapshot.value, i)
            elif i == 3:
                self.assertIs(type(msg), bt2._MessageIteratorInactivityMessageConst)
                self.assertIs(
                    type(msg.clock_snapshot), bt2_clock_snapshot._ClockSnapshotConst
                )
                self.assertEqual(msg.clock_snapshot.value, i)
            elif i == 4:
                self.assertIs(type(msg), bt2._DiscardedEventsMessageConst)
                self.assertIs(type(msg.stream), bt2_stream._StreamConst)
                self.assertIs(type(msg.stream.cls), bt2_stream_class._StreamClassConst)
                self.assertIs(
                    type(msg.beginning_default_clock_snapshot),
                    bt2_clock_snapshot._ClockSnapshotConst,
                )
                self.assertIs(
                    type(msg.end_default_clock_snapshot),
                    bt2_clock_snapshot._ClockSnapshotConst,
                )

                self.assertEqual(msg.stream.addr, self._stream.addr)
                self.assertEqual(msg.count, 890)
                self.assertEqual(
                    msg.stream.cls.default_clock_class.addr, self._clock_class.addr
                )
                self.assertEqual(msg.beginning_default_clock_snapshot.value, i)
                self.assertEqual(msg.end_default_clock_snapshot.value, i)
            elif i == 5:
                self.assertIs(type(msg), bt2._PacketEndMessageConst)
                self.assertIs(type(msg.packet), bt2_packet._PacketConst)
                self.assertIs(
                    type(msg.default_clock_snapshot),
                    bt2_clock_snapshot._ClockSnapshotConst,
                )
                self.assertEqual(msg.packet.addr, self._packet.addr)
                self.assertEqual(msg.default_clock_snapshot.value, i)
            elif i == 6:
                self.assertIs(type(msg), bt2._DiscardedPacketsMessageConst)
                self.assertIs(type(msg.stream), bt2_stream._StreamConst)
                self.assertIs(type(msg.stream.trace), bt2_trace._TraceConst)
                self.assertIs(
                    type(msg.stream.trace.cls), bt2_trace_class._TraceClassConst
                )
                self.assertIs(
                    type(msg.beginning_default_clock_snapshot),
                    bt2_clock_snapshot._ClockSnapshotConst,
                )
                self.assertIs(
                    type(msg.end_default_clock_snapshot),
                    bt2_clock_snapshot._ClockSnapshotConst,
                )
                self.assertEqual(msg.stream.addr, self._stream.addr)
                self.assertEqual(msg.count, 678)
                self.assertEqual(
                    msg.stream.cls.default_clock_class.addr, self._clock_class.addr
                )
                self.assertEqual(msg.beginning_default_clock_snapshot.value, i)
                self.assertEqual(msg.end_default_clock_snapshot.value, i)
            elif i == 7:
                self.assertIs(type(msg), bt2._StreamEndMessageConst)
                self.assertIs(type(msg.stream), bt2_stream._StreamConst)
                self.assertEqual(msg.stream.addr, self._stream.addr)
                self.assertIs(
                    type(msg.default_clock_snapshot), bt2._UnknownClockSnapshot
                )
            else:
                self.fail("unexpected extra message at index {}".format(i))

        self.assertEqual(msg_count, 8)

    def test_all_msg_without_cc(self):
        """Check all seven messages when the stream class has no clock class.

        Accessing any default clock snapshot is expected to raise
        `ValueError`.
        """
        params = {"with_cc": False}
        self._src_comp = self._graph.add_component(self._src, "my_source", params)
        self._msg_iter = TestOutputPortMessageIterator(
            self._graph, self._src_comp.output_ports["out"]
        )

        # Counted so that an iterator ending early fails the test
        # instead of silently skipping the remaining checks.
        msg_count = 0

        for i, msg in enumerate(self._msg_iter):
            msg_count = i + 1

            if i == 0:
                self.assertIsInstance(msg, bt2._StreamBeginningMessageConst)
                self.assertIs(type(msg.stream), bt2_stream._StreamConst)
                self.assertEqual(msg.stream.addr, self._stream.addr)
                with self.assertRaisesRegex(
                    ValueError, "stream class has no default clock class"
                ):
                    msg.default_clock_snapshot
            elif i == 1:
                self.assertIsInstance(msg, bt2._PacketBeginningMessageConst)
                self.assertIs(type(msg.packet), bt2_packet._PacketConst)
                self.assertEqual(msg.packet.addr, self._packet.addr)
            elif i == 2:
                self.assertIsInstance(msg, bt2._EventMessageConst)
                self.assertIs(type(msg.event), bt2_event._EventConst)
                self.assertIs(type(msg.event.cls), bt2_event_class._EventClassConst)
                self.assertEqual(msg.event.cls.addr, self._event_class.addr)
                with self.assertRaisesRegex(
                    ValueError, "stream class has no default clock class"
                ):
                    msg.default_clock_snapshot
            elif i == 3:
                self.assertIsInstance(msg, bt2._DiscardedEventsMessageConst)
                self.assertIs(type(msg.stream), bt2_stream._StreamConst)
                self.assertIs(type(msg.stream.cls), bt2_stream_class._StreamClassConst)
                self.assertEqual(msg.stream.addr, self._stream.addr)
                self.assertEqual(msg.count, 890)
                self.assertIsNone(msg.stream.cls.default_clock_class)
                with self.assertRaisesRegex(
                    ValueError,
                    "such a message has no clock snapshots for this stream class",
                ):
                    msg.beginning_default_clock_snapshot
                with self.assertRaisesRegex(
                    ValueError,
                    "such a message has no clock snapshots for this stream class",
                ):
                    msg.end_default_clock_snapshot
            elif i == 4:
                self.assertIsInstance(msg, bt2._PacketEndMessageConst)
                self.assertEqual(msg.packet.addr, self._packet.addr)
                self.assertIs(type(msg.packet), bt2_packet._PacketConst)
            elif i == 5:
                self.assertIsInstance(msg, bt2._DiscardedPacketsMessageConst)
                self.assertIs(type(msg.stream), bt2_stream._StreamConst)
                self.assertIs(type(msg.stream.cls), bt2_stream_class._StreamClassConst)
                self.assertIs(
                    type(msg.stream.cls.trace_class), bt2_trace_class._TraceClassConst
                )
                self.assertEqual(msg.stream.addr, self._stream.addr)
                self.assertEqual(msg.count, 678)
                self.assertIsNone(msg.stream.cls.default_clock_class)
                with self.assertRaisesRegex(
                    ValueError,
                    "such a message has no clock snapshots for this stream class",
                ):
                    msg.beginning_default_clock_snapshot
                with self.assertRaisesRegex(
                    ValueError,
                    "such a message has no clock snapshots for this stream class",
                ):
                    msg.end_default_clock_snapshot
            elif i == 6:
                self.assertIsInstance(msg, bt2._StreamEndMessageConst)
                self.assertIs(type(msg.stream), bt2_stream._StreamConst)
                self.assertEqual(msg.stream.addr, self._stream.addr)
                with self.assertRaisesRegex(
                    ValueError, "stream class has no default clock class"
                ):
                    msg.default_clock_snapshot
            else:
                self.fail("unexpected extra message at index {}".format(i))

        self.assertEqual(msg_count, 7)

    def test_msg_stream_with_clock_snapshots(self):
        """Check stream beginning/end messages created with clock snapshots.

        The first and last of the eight messages are expected to carry
        default clock snapshots whose values are their indexes (0 and 7).
        """
        params = {"with_cc": True, "with_stream_msgs_clock_snapshots": True}

        self._src_comp = self._graph.add_component(self._src, "my_source", params)
        self._msg_iter = TestOutputPortMessageIterator(
            self._graph, self._src_comp.output_ports["out"]
        )
        msgs = list(self._msg_iter)

        msg_stream_beg = msgs[0]
        self.assertIsInstance(msg_stream_beg, bt2._StreamBeginningMessageConst)
        self.assertIs(
            type(msg_stream_beg.default_clock_snapshot),
            bt2_clock_snapshot._ClockSnapshotConst,
        )
        self.assertEqual(msg_stream_beg.default_clock_snapshot.value, 0)

        msg_stream_end = msgs[7]
        self.assertIsInstance(msg_stream_end, bt2._StreamEndMessageConst)
        self.assertIs(
            type(msg_stream_end.default_clock_snapshot),
            bt2_clock_snapshot._ClockSnapshotConst,
        )
        self.assertEqual(msg_stream_end.default_clock_snapshot.value, 7)

    # The following tests check that a message obtained through the
    # `utils` helpers exposes non-const objects.

    def test_stream_beg_msg(self):
        msg = utils.get_stream_beginning_message()
        self.assertIs(type(msg.stream), bt2_stream._Stream)

    def test_stream_end_msg(self):
        msg = utils.get_stream_end_message()
        self.assertIs(type(msg.stream), bt2_stream._Stream)

    def test_packet_beg_msg(self):
        msg = utils.get_packet_beginning_message()
        self.assertIs(type(msg.packet), bt2_packet._Packet)

    def test_packet_end_msg(self):
        msg = utils.get_packet_end_message()
        self.assertIs(type(msg.packet), bt2_packet._Packet)

    def test_event_msg(self):
        msg = utils.get_event_message()
        self.assertIs(type(msg.event), bt2_event._Event)
395
396
class CreateDiscardedEventMessageTestCase(unittest.TestCase):
    """Tests for `_create_discarded_events_message()` of a message iterator.

    Each test passes two callbacks to
    `utils.run_in_message_iterator_next()`: one which creates the stream
    class and one which creates (or fails to create) the message.  The
    tests rely on that function handing back whatever the second
    callback returned.
    """

    # Stream class factories, called as `create_stream_class(tc, cc)`.

    @staticmethod
    def _sc_basic(tc, cc):
        # Discarded events supported, without default clock snapshots.
        return tc.create_stream_class(supports_discarded_events=True)

    @staticmethod
    def _sc_with_clock_snapshots(tc, cc):
        # Discarded events supported, with default clock snapshots.
        return tc.create_stream_class(
            default_clock_class=cc,
            supports_discarded_events=True,
            discarded_events_have_default_clock_snapshots=True,
        )

    @staticmethod
    def _sc_unsupported(tc, cc):
        # Discarded events not supported.
        return tc.create_stream_class()

    def _create(self, create_stream_class, **kwargs):
        """Create a discarded events message with `kwargs` and return it."""

        def msg_iter_next(msg_iter, stream):
            return msg_iter._create_discarded_events_message(stream, **kwargs)

        return utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)

    def _assert_create_raises(self, create_stream_class, regex, **kwargs):
        """Assert that creating the message raises `ValueError` matching `regex`."""

        def msg_iter_next(msg_iter, stream):
            with self.assertRaisesRegex(ValueError, regex):
                msg_iter._create_discarded_events_message(stream, **kwargs)

            # Sentinel proving that this callback ran to completion.
            return 123

        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
        self.assertEqual(res, 123)

    # Most basic case.
    def test_create(self):
        msg = self._create(self._sc_basic)
        self.assertIs(type(msg), bt2._DiscardedEventsMessage)
        self.assertIsNone(msg.count)

    # With event count.
    def test_create_with_count(self):
        msg = self._create(self._sc_basic, count=242)
        self.assertIs(type(msg), bt2._DiscardedEventsMessage)
        self.assertEqual(msg.count, 242)

    # With event count == 0.
    def test_create_with_count_zero_raises(self):
        self._assert_create_raises(
            self._sc_basic, "discarded event count is 0", count=0
        )

    # With clock snapshots.
    def test_create_with_clock_snapshots(self):
        msg = self._create(
            self._sc_with_clock_snapshots,
            beg_clock_snapshot=10,
            end_clock_snapshot=20,
        )
        self.assertIs(type(msg), bt2._DiscardedEventsMessage)
        self.assertEqual(msg.beginning_default_clock_snapshot, 10)
        self.assertEqual(msg.end_default_clock_snapshot, 20)

    # Trying to create when the stream does not support discarded events.
    def test_create_unsupported_raises(self):
        self._assert_create_raises(
            self._sc_unsupported, "stream class does not support discarded events"
        )

    # Trying to create with clock snapshots when the stream does not support
    # them.
    def test_create_unsupported_clock_snapshots_raises(self):
        self._assert_create_raises(
            self._sc_basic,
            "discarded events have no default clock snapshots for this stream class",
            beg_clock_snapshot=10,
            end_clock_snapshot=20,
        )

    # Trying to create without clock snapshots when the stream requires them.
    def test_create_missing_clock_snapshots_raises(self):
        self._assert_create_raises(
            self._sc_with_clock_snapshots,
            "discarded events have default clock snapshots for this stream class",
        )

    # Beginning clock snapshot greater than end clock snapshot.
    #
    # Despite the method's name, what is exercised is a beginning value
    # (20) greater than the end value (10).
    def test_create_clock_snapshots_end_gt_begin_raises(self):
        self._assert_create_raises(
            self._sc_with_clock_snapshots,
            r"beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)",
            beg_clock_snapshot=20,
            end_clock_snapshot=10,
        )
537
538
class CreateDiscardedPacketMessageTestCase(unittest.TestCase):
    """Tests for `_create_discarded_packets_message()` of a message iterator.

    Each test passes two callbacks to
    `utils.run_in_message_iterator_next()`: one which creates the stream
    class and one which creates (or fails to create) the message.  The
    tests rely on that function handing back whatever the second
    callback returned.
    """

    # Stream class factories, called as `create_stream_class(tc, cc)`.

    @staticmethod
    def _sc_basic(tc, cc):
        # Discarded packets supported, without default clock snapshots.
        return tc.create_stream_class(
            supports_packets=True, supports_discarded_packets=True
        )

    @staticmethod
    def _sc_with_clock_snapshots(tc, cc):
        # Discarded packets supported, with default clock snapshots.
        return tc.create_stream_class(
            default_clock_class=cc,
            supports_packets=True,
            supports_discarded_packets=True,
            discarded_packets_have_default_clock_snapshots=True,
        )

    @staticmethod
    def _sc_unsupported(tc, cc):
        # Packets supported, but not discarded packets.
        return tc.create_stream_class(
            supports_packets=True,
        )

    def _create(self, create_stream_class, **kwargs):
        """Create a discarded packets message with `kwargs` and return it."""

        def msg_iter_next(msg_iter, stream):
            return msg_iter._create_discarded_packets_message(stream, **kwargs)

        return utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)

    def _assert_create_raises(self, create_stream_class, regex, **kwargs):
        """Assert that creating the message raises `ValueError` matching `regex`."""

        def msg_iter_next(msg_iter, stream):
            with self.assertRaisesRegex(ValueError, regex):
                msg_iter._create_discarded_packets_message(stream, **kwargs)

            # Sentinel proving that this callback ran to completion.
            return 123

        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
        self.assertEqual(res, 123)

    # Most basic case.
    def test_create(self):
        msg = self._create(self._sc_basic)
        self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
        self.assertIsNone(msg.count)

    # With packet count.
    def test_create_with_count(self):
        msg = self._create(self._sc_basic, count=242)
        self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
        self.assertEqual(msg.count, 242)

    # With packet count == 0.
    def test_create_with_count_zero_raises(self):
        self._assert_create_raises(
            self._sc_basic, "discarded packet count is 0", count=0
        )

    # With clock snapshots.
    def test_create_with_clock_snapshots(self):
        msg = self._create(
            self._sc_with_clock_snapshots,
            beg_clock_snapshot=10,
            end_clock_snapshot=20,
        )
        self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
        self.assertEqual(msg.beginning_default_clock_snapshot, 10)
        self.assertEqual(msg.end_default_clock_snapshot, 20)

    # Trying to create when the stream does not support discarded packets.
    def test_create_unsupported_raises(self):
        self._assert_create_raises(
            self._sc_unsupported, "stream class does not support discarded packets"
        )

    # Trying to create with clock snapshots when the stream does not support
    # them.
    def test_create_unsupported_clock_snapshots_raises(self):
        self._assert_create_raises(
            self._sc_basic,
            "discarded packets have no default clock snapshots for this stream class",
            beg_clock_snapshot=10,
            end_clock_snapshot=20,
        )

    # Trying to create without clock snapshots when the stream requires them.
    def test_create_missing_clock_snapshots_raises(self):
        self._assert_create_raises(
            self._sc_with_clock_snapshots,
            "discarded packets have default clock snapshots for this stream class",
        )

    # Beginning clock snapshot greater than end clock snapshot.
    #
    # Despite the method's name, what is exercised is a beginning value
    # (20) greater than the end value (10).
    def test_create_clock_snapshots_end_gt_begin_raises(self):
        self._assert_create_raises(
            self._sc_with_clock_snapshots,
            r"beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)",
            beg_clock_snapshot=20,
            end_clock_snapshot=10,
        )
692
693
# Run the test cases of this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()
This page took 0.052863 seconds and 4 git commands to generate.