fix: test_message_iterator.py hangs on Python 3.12
[babeltrace.git] / tests / bindings / python / bt2 / test_message_iterator.py
1 #
2 # Copyright (C) 2019 EfficiOS Inc.
3 #
4 # This program is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU General Public License
6 # as published by the Free Software Foundation; only version 2
7 # of the License.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 #
18
19 import unittest
20 import bt2
21 import sys
22 from utils import TestOutputPortMessageIterator
23 from bt2 import port as bt2_port
24 from bt2 import message_iterator as bt2_message_iterator
25
26
class SimpleSink(bt2._UserSinkComponent):
    # Minimal sink used by most tests below: it exposes a single input
    # port named `in`, creates a message iterator on that port once the
    # graph is configured, and pulls one message from that iterator each
    # time it is asked to consume.

    def __init__(self, config, params, obj):
        self._add_input_port('in')

    def _user_graph_is_configured(self):
        # The iterator can only be created once the graph is configured,
        # so it is not created in `__init__`.
        in_port = self._input_ports['in']
        self._msg_iter = self._create_message_iterator(in_port)

    def _user_consume(self):
        # The message itself is not needed: advancing the upstream
        # iterator is the whole point.
        next(self._msg_iter)
39
40
def _create_graph(src_comp_cls, sink_comp_cls, flt_comp_cls=None):
    # Build and return a graph made of a source named `src` and a sink
    # named `sink`, with an optional filter named `flt` in between.
    #
    # Components are connected through their `out` output ports and `in`
    # input ports, so the given classes must create ports with these
    # names.
    graph = bt2.Graph()
    source = graph.add_component(src_comp_cls, 'src')
    sink = graph.add_component(sink_comp_cls, 'sink')

    if flt_comp_cls is None:
        # No filter: plug the source directly into the sink.
        graph.connect_ports(source.output_ports['out'], sink.input_ports['in'])
        return graph

    # Filter requested: source -> filter -> sink.
    filter_comp = graph.add_component(flt_comp_cls, 'flt')
    graph.connect_ports(source.output_ports['out'], filter_comp.input_ports['in'])
    graph.connect_ports(filter_comp.output_ports['out'], sink.input_ports['in'])
    return graph
55
56
class UserMessageIteratorTestCase(unittest.TestCase):
    # Tests for user-defined message iterators (`bt2._UserMessageIterator`):
    # initialization, creation of upstream iterators, error reporting,
    # finalization, configuration and a few properties.

    def test_init(self):
        # The iterator's `__init__` must be called, and it must receive
        # the output port (same address, same user data) that the source
        # component created.
        the_output_port_from_source = None
        the_output_port_from_iter = None

        class MyIter(bt2._UserMessageIterator):
            def __init__(self, config, self_port_output):
                nonlocal initialized
                nonlocal the_output_port_from_iter
                initialized = True
                the_output_port_from_iter = self_port_output

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                nonlocal the_output_port_from_source
                the_output_port_from_source = self._add_output_port('out', 'user data')

        initialized = False
        graph = _create_graph(MySource, SimpleSink)
        graph.run()
        self.assertTrue(initialized)
        self.assertEqual(
            the_output_port_from_source.addr, the_output_port_from_iter.addr
        )
        self.assertEqual(the_output_port_from_iter.user_data, 'user data')

    def test_create_from_message_iterator(self):
        # A filter message iterator creates its own upstream iterator:
        # both the source and filter iterators must get initialized.
        class MySourceIter(bt2._UserMessageIterator):
            def __init__(self, config, self_port_output):
                nonlocal src_iter_initialized
                src_iter_initialized = True

        class MySource(bt2._UserSourceComponent, message_iterator_class=MySourceIter):
            def __init__(self, config, params, obj):
                self._add_output_port('out')

        class MyFilterIter(bt2._UserMessageIterator):
            def __init__(self, config, self_port_output):
                nonlocal flt_iter_initialized
                flt_iter_initialized = True
                self._up_iter = self._create_message_iterator(
                    self._component._input_ports['in']
                )

            def __next__(self):
                return next(self._up_iter)

        class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter):
            def __init__(self, config, params, obj):
                self._add_input_port('in')
                self._add_output_port('out')

        src_iter_initialized = False
        flt_iter_initialized = False
        graph = _create_graph(MySource, SimpleSink, MyFilter)
        graph.run()
        self.assertTrue(src_iter_initialized)
        self.assertTrue(flt_iter_initialized)

    def test_create_user_error(self):
        # This tests both error handling by
        # _UserSinkComponent._create_message_iterator
        # and _UserMessageIterator._create_message_iterator, as they
        # are both used in the graph.
        class MySourceIter(bt2._UserMessageIterator):
            def __init__(self, config, self_port_output):
                raise ValueError('Very bad error')

        class MySource(bt2._UserSourceComponent, message_iterator_class=MySourceIter):
            def __init__(self, config, params, obj):
                self._add_output_port('out')

        class MyFilterIter(bt2._UserMessageIterator):
            def __init__(self, config, self_port_output):
                # This is expected to raise because of the error in
                # MySourceIter.__init__.
                self._create_message_iterator(self._component._input_ports['in'])

        class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter):
            def __init__(self, config, params, obj):
                self._add_input_port('in')
                self._add_output_port('out')

        graph = _create_graph(MySource, SimpleSink, MyFilter)

        with self.assertRaises(bt2._Error) as ctx:
            graph.run()

        exc = ctx.exception
        cause = exc[0]

        # The first cause must point at the source's iterator, where the
        # `ValueError` was raised.
        self.assertIsInstance(cause, bt2._MessageIteratorErrorCause)
        self.assertEqual(cause.component_name, 'src')
        self.assertEqual(cause.component_output_port_name, 'out')
        self.assertIn('ValueError: Very bad error', cause.message)

    def test_finalize(self):
        # `_user_finalize` must have been called once the graph is
        # deleted.
        class MyIter(bt2._UserMessageIterator):
            def _user_finalize(self):
                nonlocal finalized
                finalized = True

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port('out')

        finalized = False
        graph = _create_graph(MySource, SimpleSink)
        graph.run()
        del graph
        self.assertTrue(finalized)

    def test_config_parameter(self):
        # The `config` argument received by the iterator's `__init__`
        # must be a `_MessageIteratorConfiguration`.
        class MyIter(bt2._UserMessageIterator):
            def __init__(self, config, port):
                nonlocal config_type
                config_type = type(config)

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port('out')

        config_type = None
        graph = _create_graph(MySource, SimpleSink)
        graph.run()
        self.assertIs(config_type, bt2_message_iterator._MessageIteratorConfiguration)

    def _test_config_can_seek_forward(self, set_can_seek_forward):
        # Helper: optionally set `config.can_seek_forward` to `True` in
        # the source iterator, then check that the sink's view of the
        # iterator's `can_seek_forward` property is exactly
        # `set_can_seek_forward`.
        class MyIter(bt2._UserMessageIterator):
            def __init__(self, config, port):
                if set_can_seek_forward:
                    config.can_seek_forward = True

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port('in')

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_message_iterator(self._input_ports['in'])

            def _user_consume(self):
                nonlocal can_seek_forward
                can_seek_forward = self._msg_iter.can_seek_forward

        can_seek_forward = None
        graph = _create_graph(MySource, MySink)
        graph.run_once()
        self.assertIs(can_seek_forward, set_can_seek_forward)

    def test_config_can_seek_forward_default(self):
        self._test_config_can_seek_forward(False)

    def test_config_can_seek_forward(self):
        self._test_config_can_seek_forward(True)

    def test_config_can_seek_forward_wrong_type(self):
        # Assigning a non-`bool` value to `config.can_seek_forward` must
        # be reported as a `TypeError` in the root cause.
        class MyIter(bt2._UserMessageIterator):
            def __init__(self, config, port):
                config.can_seek_forward = 1

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port('out')

        graph = _create_graph(MySource, SimpleSink)
        with self.assertRaises(bt2._Error) as ctx:
            graph.run()

        root_cause = ctx.exception[0]
        self.assertIn("TypeError: 'int' is not a 'bool' object", root_cause.message)

    def test_component(self):
        # `self._component` within the iterator must give access to the
        # attributes set by the source component's `__init__`.
        class MyIter(bt2._UserMessageIterator):
            def __init__(self, config, self_port_output):
                nonlocal salut
                salut = self._component._salut

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port('out')
                self._salut = 23

        salut = None
        graph = _create_graph(MySource, SimpleSink)
        graph.run()
        self.assertEqual(salut, 23)

    def test_port(self):
        # `self._port` and the port passed to `__init__` must both be
        # `_UserComponentOutputPort` objects with the same address.
        class MyIter(bt2._UserMessageIterator):
            # The first parameter is named `self_iter` so that `self`
            # keeps referring to the enclosing test case.
            def __init__(self_iter, config, self_port_output):
                nonlocal called
                called = True
                port = self_iter._port
                self.assertIs(type(self_port_output), bt2_port._UserComponentOutputPort)
                self.assertIs(type(port), bt2_port._UserComponentOutputPort)
                self.assertEqual(self_port_output.addr, port.addr)

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port('out')

        called = False
        graph = _create_graph(MySource, SimpleSink)
        graph.run()
        self.assertTrue(called)

    def test_addr(self):
        # The iterator's `addr` property must be set and non-zero.
        class MyIter(bt2._UserMessageIterator):
            def __init__(self, config, self_port_output):
                nonlocal addr
                addr = self.addr

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port('out')

        addr = None
        graph = _create_graph(MySource, SimpleSink)
        graph.run()
        self.assertIsNotNone(addr)
        self.assertNotEqual(addr, 0)

    # Test that messages returned by _UserMessageIterator.__next__ remain valid
    # and can be re-used.
    def test_reuse_message(self):
        class MyIter(bt2._UserMessageIterator):
            def __init__(self, config, port):
                tc, sc, ec = port.user_data
                trace = tc()
                stream = trace.create_stream(sc)
                packet = stream.create_packet()

                # This message will be returned twice by __next__.
                event_message = self._create_event_message(ec, packet)

                self._msgs = [
                    self._create_stream_beginning_message(stream),
                    self._create_packet_beginning_message(packet),
                    event_message,
                    event_message,
                ]

            def __next__(self):
                return self._msgs.pop(0)

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                tc = self._create_trace_class()
                sc = tc.create_stream_class(supports_packets=True)
                ec = sc.create_event_class()
                self._add_output_port('out', (tc, sc, ec))

        graph = bt2.Graph()
        src = graph.add_component(MySource, 'src')
        it = TestOutputPortMessageIterator(graph, src.output_ports['out'])

        # Skip beginning messages.
        msg = next(it)
        self.assertIs(type(msg), bt2._StreamBeginningMessageConst)
        msg = next(it)
        self.assertIs(type(msg), bt2._PacketBeginningMessageConst)

        msg_ev1 = next(it)
        msg_ev2 = next(it)

        self.assertIs(type(msg_ev1), bt2._EventMessageConst)
        self.assertIs(type(msg_ev2), bt2._EventMessageConst)
        self.assertEqual(msg_ev1.addr, msg_ev2.addr)

    # Try consuming many times from an iterator that always returns TryAgain.
    # This verifies that we are not missing an incref of Py_None, making the
    # refcount of Py_None reach 0.
    def test_try_again_many_times(self):
        # Starting with Python 3.12, `None` is immortal: its reference
        # count operations are no-op. Skip this test in that case.
        before = sys.getrefcount(None)
        dummy = None  # noqa: F841

        if before == sys.getrefcount(None):
            raise unittest.SkipTest("`None` is immortal")

        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.TryAgain

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port('out')

        graph = bt2.Graph()
        src = graph.add_component(MySource, 'src')
        it = TestOutputPortMessageIterator(graph, src.output_ports['out'])

        # Three times the initial ref count of `None` iterations should
        # be enough to catch the bug even if there are small differences
        # between configurations.
        none_ref_count = sys.getrefcount(None) * 3

        for _ in range(none_ref_count):
            with self.assertRaises(bt2.TryAgain):
                next(it)

    def test_error_in_iterator_with_cycle_after_having_created_upstream_iterator(self):
        # Test a failure that triggered an abort in libbabeltrace2, in this situation:
        #
        #  - The filter iterator creates an upstream iterator.
        #  - The filter iterator creates a reference cycle, including itself.
        #  - An exception is raised, causing the filter iterator's
        #    initialization method to fail.
        class MySourceIter(bt2._UserMessageIterator):
            pass

        class MySource(bt2._UserSourceComponent, message_iterator_class=MySourceIter):
            def __init__(self, config, params, obj):
                self._add_output_port('out')

        class MyFilterIter(bt2._UserMessageIterator):
            def __init__(self, config, port):
                # First, create an upstream iterator.
                self._upstream_iter = self._create_message_iterator(
                    self._component._input_ports['in']
                )

                # Then, voluntarily make a reference cycle that will keep this
                # Python object alive, which will keep the upstream iterator
                # Babeltrace object alive.
                self._self = self

                # Finally, raise an exception to make __init__ fail.
                raise ValueError('woops')

        class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter):
            def __init__(self, config, params, obj):
                self._in = self._add_input_port('in')
                self._out = self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._input_port = self._add_input_port('in')

            def _user_graph_is_configured(self):
                self._upstream_iter = self._create_message_iterator(self._input_port)

            def _user_consume(self):
                # We should not reach this. Raise explicitly rather than
                # using an `assert` statement, which is stripped when
                # Python runs with `-O`.
                raise AssertionError('sink should never be asked to consume')

        g = bt2.Graph()
        src = g.add_component(MySource, 'src')
        flt = g.add_component(MyFilter, 'flt')
        snk = g.add_component(MySink, 'snk')
        g.connect_ports(src.output_ports['out'], flt.input_ports['in'])
        g.connect_ports(flt.output_ports['out'], snk.input_ports['in'])

        with self.assertRaisesRegex(bt2._Error, 'ValueError: woops'):
            g.run()
436
437
def _setup_seek_test(
    sink_cls,
    user_seek_beginning=None,
    user_can_seek_beginning=None,
    user_seek_ns_from_origin=None,
    user_can_seek_ns_from_origin=None,
    can_seek_forward=False,
):
    # Build and return a `source -> filter -> sink` graph for the seeking
    # tests, where the sink is an instance of `sink_cls`.
    #
    # Each `user_*` argument which is not `None` is installed on the
    # source's message iterator class as the `_user_*` method of the same
    # name. `can_seek_forward` is the value which the source's message
    # iterator assigns to `config.can_seek_forward`.
    #
    # The filter's message iterator forwards every message and every
    # seeking request/query to its upstream (source) iterator.
    class MySourceIter(bt2._UserMessageIterator):
        def __init__(self, config, port):
            trace_cls, stream_cls, event_cls = port.user_data
            trace = trace_cls()
            stream = trace.create_stream(stream_cls)
            packet = stream.create_packet()

            # Fixed message sequence; `self._at` is the index of the next
            # message to return (seeking callbacks provided by the tests
            # may reset it).
            self._msgs = [
                self._create_stream_beginning_message(stream),
                self._create_packet_beginning_message(packet),
                self._create_event_message(event_cls, packet),
                self._create_event_message(event_cls, packet),
                self._create_packet_end_message(packet),
                self._create_stream_end_message(stream),
            ]
            self._at = 0
            config.can_seek_forward = can_seek_forward

        def __next__(self):
            if self._at >= len(self._msgs):
                raise StopIteration

            next_msg = self._msgs[self._at]
            self._at += 1
            return next_msg

    # Install the optional user methods before `MySource` is created
    # with `MySourceIter` as its message iterator class.
    optional_methods = {
        '_user_seek_beginning': user_seek_beginning,
        '_user_can_seek_beginning': user_can_seek_beginning,
        '_user_seek_ns_from_origin': user_seek_ns_from_origin,
        '_user_can_seek_ns_from_origin': user_can_seek_ns_from_origin,
    }

    for method_name, method in optional_methods.items():
        if method is not None:
            setattr(MySourceIter, method_name, method)

    class MySource(bt2._UserSourceComponent, message_iterator_class=MySourceIter):
        def __init__(self, config, params, obj):
            trace_cls = self._create_trace_class()
            stream_cls = trace_cls.create_stream_class(supports_packets=True)
            event_cls = stream_cls.create_event_class()

            # The iterator gets what it needs to create its messages
            # through the port's user data.
            self._add_output_port('out', (trace_cls, stream_cls, event_cls))

    class MyFilterIter(bt2._UserMessageIterator):
        def __init__(self, config, port):
            self._upstream_iter = self._create_message_iterator(
                self._component._input_ports['in']
            )
            config.can_seek_forward = self._upstream_iter.can_seek_forward

        def __next__(self):
            return next(self._upstream_iter)

        def _user_seek_beginning(self):
            self._upstream_iter.seek_beginning()

        def _user_can_seek_beginning(self):
            return self._upstream_iter.can_seek_beginning()

        def _user_seek_ns_from_origin(self, ns_from_origin):
            self._upstream_iter.seek_ns_from_origin(ns_from_origin)

        def _user_can_seek_ns_from_origin(self, ns_from_origin):
            return self._upstream_iter.can_seek_ns_from_origin(ns_from_origin)

    class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter):
        def __init__(self, config, params, obj):
            self._add_input_port('in')
            self._add_output_port('out')

    return _create_graph(MySource, sink_cls, flt_comp_cls=MyFilter)
520
521
class UserMessageIteratorSeekBeginningTestCase(unittest.TestCase):
    # Tests for the "seek beginning" and "can seek beginning" operations
    # of message iterators, as seen from a sink through a filter (see
    # `_setup_seek_test`).
    #
    # Placeholder callbacks given to `_setup_seek_test` take `self`, as
    # they get installed as methods of the source's message iterator
    # class.

    def test_can_seek_beginning_without_seek_beginning(self):
        with self.assertRaisesRegex(
            bt2._IncompleteUserClass,
            "cannot create component class 'MySource': message iterator class implements _user_can_seek_beginning but not _user_seek_beginning",
        ):
            _setup_seek_test(SimpleSink, user_can_seek_beginning=lambda self: None)

    def test_can_seek_beginning(self):
        # The value returned by the user's `_user_can_seek_beginning`
        # must be what the sink gets from `can_seek_beginning()`.
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port('in')

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_message_iterator(self._input_ports['in'])

            def _user_consume(self):
                nonlocal can_seek_beginning
                can_seek_beginning = self._msg_iter.can_seek_beginning()

        def _user_can_seek_beginning(self):
            nonlocal input_port_iter_can_seek_beginning
            return input_port_iter_can_seek_beginning

        graph = _setup_seek_test(
            MySink,
            user_can_seek_beginning=_user_can_seek_beginning,
            user_seek_beginning=lambda self: None,
        )

        input_port_iter_can_seek_beginning = True
        can_seek_beginning = None
        graph.run_once()
        self.assertIs(can_seek_beginning, True)

        input_port_iter_can_seek_beginning = False
        can_seek_beginning = None
        graph.run_once()
        self.assertIs(can_seek_beginning, False)

    def test_no_can_seek_beginning_with_seek_beginning(self):
        # Test an iterator without a _user_can_seek_beginning method, but with
        # a _user_seek_beginning method.
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port('in')

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_message_iterator(self._input_ports['in'])

            def _user_consume(self):
                nonlocal can_seek_beginning
                can_seek_beginning = self._msg_iter.can_seek_beginning()

        def _user_seek_beginning(self):
            pass

        graph = _setup_seek_test(MySink, user_seek_beginning=_user_seek_beginning)
        can_seek_beginning = None
        graph.run_once()
        self.assertIs(can_seek_beginning, True)

    def test_no_can_seek_beginning(self):
        # Test an iterator without a _user_can_seek_beginning method, without
        # a _user_seek_beginning method.
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port('in')

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_message_iterator(self._input_ports['in'])

            def _user_consume(self):
                nonlocal can_seek_beginning
                can_seek_beginning = self._msg_iter.can_seek_beginning()

        graph = _setup_seek_test(MySink)
        can_seek_beginning = None
        graph.run_once()
        self.assertIs(can_seek_beginning, False)

    def test_can_seek_beginning_user_error(self):
        # An exception raised by the user's `_user_can_seek_beginning`
        # must end up as the root cause of the resulting `bt2._Error`.
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port('in')

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_message_iterator(self._input_ports['in'])

            def _user_consume(self):
                # This is expected to raise.
                self._msg_iter.can_seek_beginning()

        def _user_can_seek_beginning(self):
            raise ValueError('moustiquaire')

        graph = _setup_seek_test(
            MySink,
            user_can_seek_beginning=_user_can_seek_beginning,
            user_seek_beginning=lambda self: None,
        )

        with self.assertRaises(bt2._Error) as ctx:
            graph.run_once()

        cause = ctx.exception[0]
        self.assertIn('ValueError: moustiquaire', cause.message)

    def test_can_seek_beginning_wrong_return_value(self):
        # A non-`bool` value returned by the user's
        # `_user_can_seek_beginning` must be reported as a `TypeError`.
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port('in')

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_message_iterator(self._input_ports['in'])

            def _user_consume(self):
                # This is expected to raise.
                self._msg_iter.can_seek_beginning()

        def _user_can_seek_beginning(self):
            return 'Amqui'

        graph = _setup_seek_test(
            MySink,
            user_can_seek_beginning=_user_can_seek_beginning,
            user_seek_beginning=lambda self: None,
        )

        with self.assertRaises(bt2._Error) as ctx:
            graph.run_once()

        cause = ctx.exception[0]
        self.assertIn("TypeError: 'str' is not a 'bool' object", cause.message)

    def test_seek_beginning(self):
        # Consume two messages, seek to the beginning, and check that
        # the next consumed message is the first one again.
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port('in')

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_message_iterator(self._input_ports['in'])

            def _user_consume(self):
                nonlocal do_seek_beginning
                nonlocal msg

                if do_seek_beginning:
                    self._msg_iter.seek_beginning()
                    return

                msg = next(self._msg_iter)

        def _user_seek_beginning(self):
            # `_at` is the source iterator's index of the next message
            # to return (see `_setup_seek_test`).
            self._at = 0

        msg = None
        graph = _setup_seek_test(MySink, user_seek_beginning=_user_seek_beginning)

        # Consume message.
        do_seek_beginning = False
        graph.run_once()
        self.assertIs(type(msg), bt2._StreamBeginningMessageConst)

        # Consume message.
        graph.run_once()
        self.assertIs(type(msg), bt2._PacketBeginningMessageConst)

        # Seek beginning.
        do_seek_beginning = True
        graph.run_once()

        # Consume message.
        do_seek_beginning = False
        graph.run_once()
        self.assertIs(type(msg), bt2._StreamBeginningMessageConst)

    def test_seek_beginning_user_error(self):
        # An exception raised by the user's `_user_seek_beginning` must
        # end up as the root cause of the resulting `bt2._Error`.
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port('in')

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_message_iterator(self._input_ports['in'])

            def _user_consume(self):
                # This is expected to raise.
                self._msg_iter.seek_beginning()

        def _user_seek_beginning(self):
            raise ValueError('ouch')

        graph = _setup_seek_test(MySink, user_seek_beginning=_user_seek_beginning)

        with self.assertRaises(bt2._Error) as ctx:
            graph.run_once()

        # Make sure the error is the user's one, not an unrelated
        # failure.
        cause = ctx.exception[0]
        self.assertIn('ValueError: ouch', cause.message)
717
718
class UserMessageIteratorSeekNsFromOriginTestCase(unittest.TestCase):
    # Tests for the "seek ns from origin" and "can seek ns from origin"
    # operations of message iterators, as seen from a sink through a
    # filter (see `_setup_seek_test`).
    #
    # Most tests go through `_can_seek_ns_from_origin_test()`, which
    # checks the outcome of `can_seek_ns_from_origin()` for a given
    # combination of user methods and iterator capabilities.

    def test_can_seek_ns_from_origin_without_seek_ns_from_origin(self):
        # Test the case where:
        #
        #   - can_seek_ns_from_origin: Returns False or True (don't really
        #     care, as long as it's provided)
        #   - seek_ns_from_origin provided: No
        #   - can the iterator seek beginning: Don't care
        #   - can the iterator seek forward: Don't care
        for can_seek_ns_from_origin in (False, True):
            for iter_can_seek_beginning in (False, True):
                for iter_can_seek_forward in (False, True):
                    with self.assertRaisesRegex(
                        bt2._IncompleteUserClass,
                        "cannot create component class 'MySource': message iterator class implements _user_can_seek_ns_from_origin but not _user_seek_ns_from_origin",
                    ):
                        # No outcome is expected: creating the source
                        # component class is what must fail.
                        self._can_seek_ns_from_origin_test(
                            expected_outcome=None,
                            user_can_seek_ns_from_origin_ret_val=can_seek_ns_from_origin,
                            user_seek_ns_from_origin_provided=False,
                            iter_can_seek_beginning=iter_can_seek_beginning,
                            iter_can_seek_forward=iter_can_seek_forward,
                        )

    def test_can_seek_ns_from_origin_returns_true(self):
        # Test the case where:
        #
        #   - can_seek_ns_from_origin: returns True
        #   - seek_ns_from_origin provided: Yes
        #   - can the iterator seek beginning: Don't care
        #   - can the iterator seek forward: Don't care
        #
        # We expect iter.can_seek_ns_from_origin to return True.
        for iter_can_seek_beginning in (False, True):
            for iter_can_seek_forward in (False, True):
                self._can_seek_ns_from_origin_test(
                    expected_outcome=True,
                    user_can_seek_ns_from_origin_ret_val=True,
                    user_seek_ns_from_origin_provided=True,
                    iter_can_seek_beginning=iter_can_seek_beginning,
                    iter_can_seek_forward=iter_can_seek_forward,
                )

    def test_can_seek_ns_from_origin_returns_false_can_seek_beginning_forward_seekable(
        self,
    ):
        # Test the case where:
        #
        #   - can_seek_ns_from_origin: returns False
        #   - seek_ns_from_origin provided: Yes
        #   - can the iterator seek beginning: Yes
        #   - can the iterator seek forward: Yes
        #
        # We expect iter.can_seek_ns_from_origin to return True.
        self._can_seek_ns_from_origin_test(
            expected_outcome=True,
            user_can_seek_ns_from_origin_ret_val=False,
            user_seek_ns_from_origin_provided=True,
            iter_can_seek_beginning=True,
            iter_can_seek_forward=True,
        )

    def test_can_seek_ns_from_origin_returns_false_can_seek_beginning_not_forward_seekable(
        self,
    ):
        # Test the case where:
        #
        #   - can_seek_ns_from_origin: returns False
        #   - seek_ns_from_origin provided: Yes
        #   - can the iterator seek beginning: Yes
        #   - can the iterator seek forward: No
        #
        # We expect iter.can_seek_ns_from_origin to return False.
        self._can_seek_ns_from_origin_test(
            expected_outcome=False,
            user_can_seek_ns_from_origin_ret_val=False,
            user_seek_ns_from_origin_provided=True,
            iter_can_seek_beginning=True,
            iter_can_seek_forward=False,
        )

    def test_can_seek_ns_from_origin_returns_false_cant_seek_beginning_forward_seekable(
        self,
    ):
        # Test the case where:
        #
        #   - can_seek_ns_from_origin: returns False
        #   - seek_ns_from_origin provided: Yes
        #   - can the iterator seek beginning: No
        #   - can the iterator seek forward: Yes
        #
        # We expect iter.can_seek_ns_from_origin to return False.
        self._can_seek_ns_from_origin_test(
            expected_outcome=False,
            user_can_seek_ns_from_origin_ret_val=False,
            user_seek_ns_from_origin_provided=True,
            iter_can_seek_beginning=False,
            iter_can_seek_forward=True,
        )

    def test_can_seek_ns_from_origin_returns_false_cant_seek_beginning_not_forward_seekable(
        self,
    ):
        # Test the case where:
        #
        #   - can_seek_ns_from_origin: returns False
        #   - seek_ns_from_origin provided: Yes
        #   - can the iterator seek beginning: No
        #   - can the iterator seek forward: No
        #
        # We expect iter.can_seek_ns_from_origin to return False.
        self._can_seek_ns_from_origin_test(
            expected_outcome=False,
            user_can_seek_ns_from_origin_ret_val=False,
            user_seek_ns_from_origin_provided=True,
            iter_can_seek_beginning=False,
            iter_can_seek_forward=False,
        )

    def test_no_can_seek_ns_from_origin_seek_ns_from_origin(self):
        # Test the case where:
        #
        #   - can_seek_ns_from_origin: Not provided
        #   - seek_ns_from_origin provided: Yes
        #   - can the iterator seek beginning: Don't care
        #   - can the iterator seek forward: Don't care
        #
        # We expect iter.can_seek_ns_from_origin to return True.
        for iter_can_seek_beginning in (False, True):
            for iter_can_seek_forward in (False, True):
                self._can_seek_ns_from_origin_test(
                    expected_outcome=True,
                    user_can_seek_ns_from_origin_ret_val=None,
                    user_seek_ns_from_origin_provided=True,
                    iter_can_seek_beginning=iter_can_seek_beginning,
                    iter_can_seek_forward=iter_can_seek_forward,
                )

    def test_no_can_seek_ns_from_origin_no_seek_ns_from_origin_can_seek_beginning_forward_seekable(
        self,
    ):
        # Test the case where:
        #
        #   - can_seek_ns_from_origin: Not provided
        #   - seek_ns_from_origin provided: Not provided
        #   - can the iterator seek beginning: Yes
        #   - can the iterator seek forward: Yes
        #
        # We expect iter.can_seek_ns_from_origin to return True.
        self._can_seek_ns_from_origin_test(
            expected_outcome=True,
            user_can_seek_ns_from_origin_ret_val=None,
            user_seek_ns_from_origin_provided=False,
            iter_can_seek_beginning=True,
            iter_can_seek_forward=True,
        )

    def test_no_can_seek_ns_from_origin_no_seek_ns_from_origin_can_seek_beginning_not_forward_seekable(
        self,
    ):
        # Test the case where:
        #
        #   - can_seek_ns_from_origin: Not provided
        #   - seek_ns_from_origin provided: Not provided
        #   - can the iterator seek beginning: Yes
        #   - can the iterator seek forward: No
        #
        # We expect iter.can_seek_ns_from_origin to return False.
        self._can_seek_ns_from_origin_test(
            expected_outcome=False,
            user_can_seek_ns_from_origin_ret_val=None,
            user_seek_ns_from_origin_provided=False,
            iter_can_seek_beginning=True,
            iter_can_seek_forward=False,
        )

    def test_no_can_seek_ns_from_origin_no_seek_ns_from_origin_cant_seek_beginning_forward_seekable(
        self,
    ):
        # Test the case where:
        #
        #   - can_seek_ns_from_origin: Not provided
        #   - seek_ns_from_origin provided: Not provided
        #   - can the iterator seek beginning: No
        #   - can the iterator seek forward: Yes
        #
        # We expect iter.can_seek_ns_from_origin to return False.
        self._can_seek_ns_from_origin_test(
            expected_outcome=False,
            user_can_seek_ns_from_origin_ret_val=None,
            user_seek_ns_from_origin_provided=False,
            iter_can_seek_beginning=False,
            iter_can_seek_forward=True,
        )

    def test_no_can_seek_ns_from_origin_no_seek_ns_from_origin_cant_seek_beginning_not_forward_seekable(
        self,
    ):
        # Test the case where:
        #
        #   - can_seek_ns_from_origin: Not provided
        #   - seek_ns_from_origin provided: Not provided
        #   - can the iterator seek beginning: No
        #   - can the iterator seek forward: No
        #
        # We expect iter.can_seek_ns_from_origin to return False.
        self._can_seek_ns_from_origin_test(
            expected_outcome=False,
            user_can_seek_ns_from_origin_ret_val=None,
            user_seek_ns_from_origin_provided=False,
            iter_can_seek_beginning=False,
            iter_can_seek_forward=False,
        )

    def _can_seek_ns_from_origin_test(
        self,
        expected_outcome,
        user_can_seek_ns_from_origin_ret_val,
        user_seek_ns_from_origin_provided,
        iter_can_seek_beginning,
        iter_can_seek_forward,
    ):
        # Helper: run one "can seek ns from origin" scenario.
        #
        #   - expected_outcome: value which the sink's call to
        #     `can_seek_ns_from_origin()` is expected to return.
        #   - user_can_seek_ns_from_origin_ret_val: value returned by the
        #     source iterator's `_user_can_seek_ns_from_origin` method, or
        #     `None` to not provide this method at all.
        #   - user_seek_ns_from_origin_provided: whether or not the source
        #     iterator gets a `_user_seek_ns_from_origin` method.
        #   - iter_can_seek_beginning: whether or not the source iterator
        #     gets a `_user_seek_beginning` method.
        #   - iter_can_seek_forward: value of the source iterator's
        #     `can_seek_forward` configuration property.
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port('in')

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_message_iterator(self._input_ports['in'])

            def _user_consume(self):
                nonlocal can_seek_ns_from_origin
                can_seek_ns_from_origin = self._msg_iter.can_seek_ns_from_origin(
                    passed_ns_from_origin
                )

        if user_can_seek_ns_from_origin_ret_val is not None:

            def user_can_seek_ns_from_origin(self, ns_from_origin):
                # Record the received value so that the test can check
                # that it's the one which the sink passed.
                nonlocal received_ns_from_origin
                received_ns_from_origin = ns_from_origin
                return user_can_seek_ns_from_origin_ret_val

        else:
            user_can_seek_ns_from_origin = None

        if user_seek_ns_from_origin_provided:

            def user_seek_ns_from_origin(self, ns_from_origin):
                pass

        else:
            user_seek_ns_from_origin = None

        if iter_can_seek_beginning:

            def user_seek_beginning(self):
                pass

        else:
            user_seek_beginning = None

        graph = _setup_seek_test(
            MySink,
            user_can_seek_ns_from_origin=user_can_seek_ns_from_origin,
            user_seek_ns_from_origin=user_seek_ns_from_origin,
            user_seek_beginning=user_seek_beginning,
            can_seek_forward=iter_can_seek_forward,
        )

        passed_ns_from_origin = 77
        received_ns_from_origin = None
        can_seek_ns_from_origin = None
        graph.run_once()
        self.assertIs(can_seek_ns_from_origin, expected_outcome)

        if user_can_seek_ns_from_origin_ret_val is not None:
            self.assertEqual(received_ns_from_origin, passed_ns_from_origin)

    def test_can_seek_ns_from_origin_user_error(self):
        # An exception raised by the user's
        # `_user_can_seek_ns_from_origin` must end up as the root cause
        # of the resulting `bt2._Error`.
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port('in')

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_message_iterator(self._input_ports['in'])

            def _user_consume(self):
                # This is expected to raise.
                self._msg_iter.can_seek_ns_from_origin(2)

        def _user_can_seek_ns_from_origin(self, ns_from_origin):
            raise ValueError('Joutel')

        graph = _setup_seek_test(
            MySink,
            user_can_seek_ns_from_origin=_user_can_seek_ns_from_origin,
            user_seek_ns_from_origin=lambda self, ns_from_origin: None,
        )

        with self.assertRaises(bt2._Error) as ctx:
            graph.run_once()

        cause = ctx.exception[0]
        self.assertIn('ValueError: Joutel', cause.message)

    def test_can_seek_ns_from_origin_wrong_return_value(self):
        # A non-`bool` value returned by the user's
        # `_user_can_seek_ns_from_origin` must be reported as a
        # `TypeError`.
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port('in')

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_message_iterator(self._input_ports['in'])

            def _user_consume(self):
                # This is expected to raise.
                self._msg_iter.can_seek_ns_from_origin(2)

        def _user_can_seek_ns_from_origin(self, ns_from_origin):
            return 'Nitchequon'

        graph = _setup_seek_test(
            MySink,
            user_can_seek_ns_from_origin=_user_can_seek_ns_from_origin,
            user_seek_ns_from_origin=lambda self, ns_from_origin: None,
        )

        with self.assertRaises(bt2._Error) as ctx:
            graph.run_once()

        cause = ctx.exception[0]
        self.assertIn("TypeError: 'str' is not a 'bool' object", cause.message)

    def test_seek_ns_from_origin(self):
        # The value which the sink passes to `seek_ns_from_origin()`
        # must reach the user's `_user_seek_ns_from_origin` method.
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port('in')

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_message_iterator(self._input_ports['in'])

            def _user_consume(self):
                self._msg_iter.seek_ns_from_origin(17)

        def _user_seek_ns_from_origin(self, ns_from_origin):
            nonlocal actual_ns_from_origin
            actual_ns_from_origin = ns_from_origin

        graph = _setup_seek_test(
            MySink, user_seek_ns_from_origin=_user_seek_ns_from_origin
        )

        actual_ns_from_origin = None
        graph.run_once()
        self.assertEqual(actual_ns_from_origin, 17)
1073
1074
# Run all the test cases of this module when it's executed as a script.
if __name__ == '__main__':
    unittest.main()
This page took 0.053942 seconds and 4 git commands to generate.