tests: remove unused MyFilter in UserMessageIteratorTestCase.test_try_again_many_times
[babeltrace.git] / tests / bindings / python / bt2 / test_message_iterator.py
... / ...
CommitLineData
1# SPDX-License-Identifier: GPL-2.0-only
2#
3# Copyright (C) 2019 EfficiOS Inc.
4#
5
6import sys
7import unittest
8
9import bt2
10from bt2 import port as bt2_port
11from bt2 import message_iterator as bt2_message_iterator
12from utils import TestOutputPortMessageIterator
13
14
class SimpleSink(bt2._UserSinkComponent):
    # Minimal sink used by most tests below: it owns a single input port
    # named `in`, creates a message iterator on it once the graph is
    # configured, and pulls from that iterator each time it consumes.

    def __init__(self, config, params, obj):
        self._add_input_port("in")

    def _user_graph_is_configured(self):
        in_port = self._input_ports["in"]
        self._msg_iter = self._create_message_iterator(in_port)

    def _user_consume(self):
        # Whatever comes out of the iterator is ignored; only the act of
        # pulling matters to the tests.
        next(self._msg_iter)
27
28
def _create_graph(src_comp_cls, sink_comp_cls, flt_comp_cls=None):
    # Build and return a graph made of a source named `src` and a sink
    # named `sink`.  When `flt_comp_cls` is given, a filter named `flt` is
    # inserted between them.  All connections use the `out` output ports
    # and the `in` input ports.
    graph = bt2.Graph()

    source = graph.add_component(src_comp_cls, "src")
    sink = graph.add_component(sink_comp_cls, "sink")

    if flt_comp_cls is None:
        # No filter: source -> sink.
        graph.connect_ports(source.output_ports["out"], sink.input_ports["in"])
        return graph

    # With a filter: source -> filter -> sink.
    flt = graph.add_component(flt_comp_cls, "flt")
    graph.connect_ports(source.output_ports["out"], flt.input_ports["in"])
    graph.connect_ports(flt.output_ports["out"], sink.input_ports["in"])
    return graph
43
44
class UserMessageIteratorTestCase(unittest.TestCase):
    # Test that the user message iterator's __init__() runs and receives an
    # output port having the same address and user data as the port which
    # the source component added.
    def test_init(self):
        the_output_port_from_source = None
        the_output_port_from_iter = None

        class MyIter(bt2._UserMessageIterator):
            def __init__(self, config, self_port_output):
                nonlocal initialized
                nonlocal the_output_port_from_iter
                initialized = True
                the_output_port_from_iter = self_port_output

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                nonlocal the_output_port_from_source
                the_output_port_from_source = self._add_output_port("out", "user data")

        initialized = False
        graph = _create_graph(MySource, SimpleSink)
        graph.run()
        self.assertTrue(initialized)
        self.assertEqual(
            the_output_port_from_source.addr, the_output_port_from_iter.addr
        )
        self.assertEqual(the_output_port_from_iter.user_data, "user data")

    # Test that a filter message iterator can create an upstream message
    # iterator, and that both the source and the filter iterators get
    # initialized.
    def test_create_from_message_iterator(self):
        class MySourceIter(bt2._UserMessageIterator):
            def __init__(self, config, self_port_output):
                nonlocal src_iter_initialized
                src_iter_initialized = True

        class MySource(bt2._UserSourceComponent, message_iterator_class=MySourceIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        class MyFilterIter(bt2._UserMessageIterator):
            def __init__(self, config, self_port_output):
                nonlocal flt_iter_initialized
                flt_iter_initialized = True
                self._up_iter = self._create_message_iterator(
                    self._component._input_ports["in"]
                )

            def __next__(self):
                return next(self._up_iter)

        class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter):
            def __init__(self, config, params, obj):
                self._add_input_port("in")
                self._add_output_port("out")

        src_iter_initialized = False
        flt_iter_initialized = False
        graph = _create_graph(MySource, SimpleSink, MyFilter)
        graph.run()
        self.assertTrue(src_iter_initialized)
        self.assertTrue(flt_iter_initialized)

    # Test that creating a message iterator from a sink component on a
    # non-connected input port raises.
    def test_create_from_sink_component_unconnected_port_raises(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(comp_self, config, params, obj):
                comp_self._input_port = comp_self._add_input_port("in")

            def _user_graph_is_configured(comp_self):
                with self.assertRaisesRegex(ValueError, "input port is not connected"):
                    comp_self._create_message_iterator(comp_self._input_port)

                nonlocal seen
                seen = True

            def _user_consume(self):
                raise bt2.Stop

        seen = False
        graph = bt2.Graph()
        graph.add_component(MySink, "snk")
        graph.run()
        self.assertTrue(seen)

    # Test that creating a message iterator from a message iterator on a
    # non-connected input port raises.
    def test_create_from_message_iterator_unconnected_port_raises(self):
        class MyFilterIter(bt2._UserMessageIterator):
            def __init__(iter_self, config, port):
                input_port = iter_self._component._input_ports["in"]

                with self.assertRaisesRegex(ValueError, "input port is not connected"):
                    iter_self._create_message_iterator(input_port)

                nonlocal seen
                seen = True

        class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter):
            def __init__(comp_self, config, params, obj):
                comp_self._add_input_port("in")
                comp_self._add_output_port("out")

        class MySink(bt2._UserSinkComponent):
            def __init__(comp_self, config, params, obj):
                comp_self._input_port = comp_self._add_input_port("in")

            def _user_graph_is_configured(comp_self):
                comp_self._input_iter = comp_self._create_message_iterator(
                    comp_self._input_port
                )

            def _user_consume(self):
                raise bt2.Stop

        seen = False
        graph = bt2.Graph()
        flt = graph.add_component(MyFilter, "flt")
        snk = graph.add_component(MySink, "snk")
        graph.connect_ports(flt.output_ports["out"], snk.input_ports["in"])
        graph.run()
        self.assertTrue(seen)

    def test_create_user_error(self):
        # This tests both error handling by
        # _UserSinkComponent._create_message_iterator
        # and _UserMessageIterator._create_message_iterator, as they
        # are both used in the graph.
        class MySourceIter(bt2._UserMessageIterator):
            def __init__(self, config, self_port_output):
                raise ValueError("Very bad error")

        class MySource(bt2._UserSourceComponent, message_iterator_class=MySourceIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        class MyFilterIter(bt2._UserMessageIterator):
            def __init__(self, config, self_port_output):
                # This is expected to raise because of the error in
                # MySourceIter.__init__.
                self._create_message_iterator(self._component._input_ports["in"])

        class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter):
            def __init__(self, config, params, obj):
                self._add_input_port("in")
                self._add_output_port("out")

        graph = _create_graph(MySource, SimpleSink, MyFilter)

        with self.assertRaises(bt2._Error) as ctx:
            graph.run()

        exc = ctx.exception
        cause = exc[0]

        # The first cause must point at the source's message iterator.
        self.assertIsInstance(cause, bt2._MessageIteratorErrorCause)
        self.assertEqual(cause.component_name, "src")
        self.assertEqual(cause.component_output_port_name, "out")
        self.assertIn("ValueError: Very bad error", cause.message)

    # Test that the iterator's _user_finalize() method has been called once
    # the graph is run and then deleted.
    def test_finalize(self):
        class MyIter(bt2._UserMessageIterator):
            def _user_finalize(self):
                nonlocal finalized
                finalized = True

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        finalized = False
        graph = _create_graph(MySource, SimpleSink)
        graph.run()
        del graph
        self.assertTrue(finalized)

    # Test the type of the `config` parameter which the iterator's
    # __init__() receives.
    def test_config_parameter(self):
        class MyIter(bt2._UserMessageIterator):
            def __init__(self, config, port):
                nonlocal config_type
                config_type = type(config)

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        config_type = None
        graph = _create_graph(MySource, SimpleSink)
        graph.run()
        self.assertIs(config_type, bt2_message_iterator._MessageIteratorConfiguration)

    # Helper: optionally set `config.can_seek_forward` to True in the
    # iterator's __init__(), then check that the sink's iterator reports
    # `can_seek_forward` as being `set_can_seek_forward`.
    def _test_config_can_seek_forward(self, set_can_seek_forward):
        class MyIter(bt2._UserMessageIterator):
            def __init__(self, config, port):
                if set_can_seek_forward:
                    config.can_seek_forward = True

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_message_iterator(self._input_ports["in"])

            def _user_consume(self):
                nonlocal can_seek_forward
                can_seek_forward = self._msg_iter.can_seek_forward

        can_seek_forward = None
        graph = _create_graph(MySource, MySink)
        graph.run_once()
        self.assertIs(can_seek_forward, set_can_seek_forward)

    def test_config_can_seek_forward_default(self):
        self._test_config_can_seek_forward(False)

    def test_config_can_seek_forward(self):
        self._test_config_can_seek_forward(True)

    # Test that setting `config.can_seek_forward` to a non-`bool` value
    # makes the graph fail with a `TypeError` cause.
    def test_config_can_seek_forward_wrong_type(self):
        class MyIter(bt2._UserMessageIterator):
            def __init__(self, config, port):
                config.can_seek_forward = 1

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        graph = _create_graph(MySource, SimpleSink)
        with self.assertRaises(bt2._Error) as ctx:
            graph.run()

        root_cause = ctx.exception[0]
        self.assertIn("TypeError: 'int' is not a 'bool' object", root_cause.message)

    # Test that the iterator can reach its component's attributes through
    # `self._component`.
    def test_component(self):
        class MyIter(bt2._UserMessageIterator):
            def __init__(self, config, self_port_output):
                nonlocal salut
                salut = self._component._salut

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")
                self._salut = 23

        salut = None
        graph = _create_graph(MySource, SimpleSink)
        graph.run()
        self.assertEqual(salut, 23)

    # Test that `self._port` and the port passed to __init__() are both
    # user component output ports having the same address.
    def test_port(self):
        class MyIter(bt2._UserMessageIterator):
            def __init__(self_iter, config, self_port_output):
                nonlocal called
                called = True
                port = self_iter._port
                self.assertIs(type(self_port_output), bt2_port._UserComponentOutputPort)
                self.assertIs(type(port), bt2_port._UserComponentOutputPort)
                self.assertEqual(self_port_output.addr, port.addr)

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        called = False
        graph = _create_graph(MySource, SimpleSink)
        graph.run()
        self.assertTrue(called)

    # Test that the iterator's `addr` property is neither `None` nor 0.
    def test_addr(self):
        class MyIter(bt2._UserMessageIterator):
            def __init__(self, config, self_port_output):
                nonlocal addr
                addr = self.addr

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        addr = None
        graph = _create_graph(MySource, SimpleSink)
        graph.run()
        self.assertIsNotNone(addr)
        self.assertNotEqual(addr, 0)

    # Test that messages returned by _UserMessageIterator.__next__ remain valid
    # and can be re-used.
    def test_reuse_message(self):
        class MyIter(bt2._UserMessageIterator):
            def __init__(self, config, port):
                tc, sc, ec = port.user_data
                trace = tc()
                stream = trace.create_stream(sc)
                packet = stream.create_packet()

                # This message will be returned twice by __next__.
                event_message = self._create_event_message(ec, packet)

                self._msgs = [
                    self._create_stream_beginning_message(stream),
                    self._create_packet_beginning_message(packet),
                    event_message,
                    event_message,
                ]

            def __next__(self):
                return self._msgs.pop(0)

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                tc = self._create_trace_class()
                sc = tc.create_stream_class(supports_packets=True)
                ec = sc.create_event_class()
                self._add_output_port("out", (tc, sc, ec))

        graph = bt2.Graph()
        src = graph.add_component(MySource, "src")
        it = TestOutputPortMessageIterator(graph, src.output_ports["out"])

        # Skip beginning messages.
        msg = next(it)
        self.assertIs(type(msg), bt2._StreamBeginningMessageConst)
        msg = next(it)
        self.assertIs(type(msg), bt2._PacketBeginningMessageConst)

        msg_ev1 = next(it)
        msg_ev2 = next(it)

        self.assertIs(type(msg_ev1), bt2._EventMessageConst)
        self.assertIs(type(msg_ev2), bt2._EventMessageConst)
        self.assertEqual(msg_ev1.addr, msg_ev2.addr)

    # Try consuming many times from an iterator that always returns TryAgain.
    # This verifies that we are not missing an incref of Py_None, making the
    # refcount of Py_None reach 0.
    def test_try_again_many_times(self):
        # Starting with Python 3.12 (PEP 683), `None` is immortal: taking a
        # new reference to it does not change its reported reference
        # count, which is a very large constant.  The bug which this test
        # guards against cannot happen in that case, and the loop below
        # would run for billions of iterations: skip the test.
        ref_count_before = sys.getrefcount(None)
        extra_ref = None  # noqa: F841

        if sys.getrefcount(None) == ref_count_before:
            self.skipTest("`None` is immortal with this Python version")

        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.TryAgain

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        graph = bt2.Graph()
        src = graph.add_component(MySource, "src")
        it = TestOutputPortMessageIterator(graph, src.output_ports["out"])

        # Three times the initial ref count of `None` iterations should
        # be enough to catch the bug even if there are small differences
        # between configurations.
        none_ref_count = sys.getrefcount(None) * 3

        for _ in range(none_ref_count):
            with self.assertRaises(bt2.TryAgain):
                next(it)

    def test_error_in_iterator_with_cycle_after_having_created_upstream_iterator(self):
        # Test a failure that triggered an abort in libbabeltrace2, in this situation:
        #
        #   - The filter iterator creates an upstream iterator.
        #   - The filter iterator creates a reference cycle, including itself.
        #   - An exception is raised, causing the filter iterator's
        #     initialization method to fail.
        class MySourceIter(bt2._UserMessageIterator):
            pass

        class MySource(bt2._UserSourceComponent, message_iterator_class=MySourceIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        class MyFilterIter(bt2._UserMessageIterator):
            def __init__(self, config, port):
                # First, create an upstream iterator.
                self._upstream_iter = self._create_message_iterator(
                    self._component._input_ports["in"]
                )

                # Then, voluntarily make a reference cycle that will keep this
                # Python object alive, which will keep the upstream iterator
                # Babeltrace object alive.
                self._self = self

                # Finally, raise an exception to make __init__ fail.
                raise ValueError("woops")

        class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter):
            def __init__(self, config, params, obj):
                self._in = self._add_input_port("in")
                self._out = self._add_output_port("out")

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._input_port = self._add_input_port("in")

            def _user_graph_is_configured(self):
                self._upstream_iter = self._create_message_iterator(self._input_port)

            def _user_consume(self):
                # We should not reach this.  Raise explicitly instead of
                # using `assert`, which `python -O` strips.
                raise AssertionError("MySink._user_consume() must not be reached")

        g = bt2.Graph()
        src = g.add_component(MySource, "src")
        flt = g.add_component(MyFilter, "flt")
        snk = g.add_component(MySink, "snk")
        g.connect_ports(src.output_ports["out"], flt.input_ports["in"])
        g.connect_ports(flt.output_ports["out"], snk.input_ports["in"])

        with self.assertRaisesRegex(bt2._Error, "ValueError: woops"):
            g.run()
458
459
def _setup_seek_test(
    sink_cls,
    user_seek_beginning=None,
    user_can_seek_beginning=None,
    user_seek_ns_from_origin=None,
    user_can_seek_ns_from_origin=None,
    can_seek_forward=False,
):
    # Build and return a `source -> filter -> sink_cls` graph for the seeking
    # tests.
    #
    # Each `user_*` argument which is not `None` is installed on the source's
    # message iterator class as the `_user_*` method of the same name.
    # `can_seek_forward` is what the source iterator writes to its
    # configuration.  The filter iterator forwards everything (messages and
    # seeking operations) to its upstream iterator.
    class MySourceIter(bt2._UserMessageIterator):
        def __init__(self, config, port):
            tc, sc, ec = port.user_data
            trace = tc()
            stream = trace.create_stream(sc)
            packet = stream.create_packet()

            # Fixed sequence of messages to emit; `_at` is the index of the
            # next one.
            self._at = 0
            self._msgs = [
                self._create_stream_beginning_message(stream),
                self._create_packet_beginning_message(packet),
                self._create_event_message(ec, packet),
                self._create_event_message(ec, packet),
                self._create_packet_end_message(packet),
                self._create_stream_end_message(stream),
            ]
            config.can_seek_forward = can_seek_forward

        def __next__(self):
            if self._at >= len(self._msgs):
                raise StopIteration

            msg = self._msgs[self._at]
            self._at += 1
            return msg

    # Install only the seeking methods which the caller provided.
    optional_methods = {
        "_user_seek_beginning": user_seek_beginning,
        "_user_can_seek_beginning": user_can_seek_beginning,
        "_user_seek_ns_from_origin": user_seek_ns_from_origin,
        "_user_can_seek_ns_from_origin": user_can_seek_ns_from_origin,
    }

    for method_name, method in optional_methods.items():
        if method is not None:
            setattr(MySourceIter, method_name, method)

    class MySource(bt2._UserSourceComponent, message_iterator_class=MySourceIter):
        def __init__(self, config, params, obj):
            tc = self._create_trace_class()
            sc = tc.create_stream_class(supports_packets=True)
            ec = sc.create_event_class()

            self._add_output_port("out", (tc, sc, ec))

    class MyFilterIter(bt2._UserMessageIterator):
        def __init__(self, config, port):
            in_port = self._component._input_ports["in"]
            self._upstream_iter = self._create_message_iterator(in_port)

            # Mirror the upstream iterator's forward-seeking capability.
            config.can_seek_forward = self._upstream_iter.can_seek_forward

        def __next__(self):
            return next(self._upstream_iter)

        def _user_seek_beginning(self):
            self._upstream_iter.seek_beginning()

        def _user_can_seek_beginning(self):
            return self._upstream_iter.can_seek_beginning()

        def _user_seek_ns_from_origin(self, ns_from_origin):
            self._upstream_iter.seek_ns_from_origin(ns_from_origin)

        def _user_can_seek_ns_from_origin(self, ns_from_origin):
            return self._upstream_iter.can_seek_ns_from_origin(ns_from_origin)

    class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter):
        def __init__(self, config, params, obj):
            self._add_input_port("in")
            self._add_output_port("out")

    return _create_graph(MySource, sink_cls, flt_comp_cls=MyFilter)
542
543
class UserMessageIteratorSeekBeginningTestCase(unittest.TestCase):
    def test_can_seek_beginning_without_seek_beginning(self):
        # An iterator class having _user_can_seek_beginning, but not
        # _user_seek_beginning, is incomplete: creating the source
        # component class must fail.
        expected_msg = "cannot create component class 'MySource': message iterator class implements _user_can_seek_beginning but not _user_seek_beginning"

        with self.assertRaisesRegex(bt2._IncompleteUserClass, expected_msg):
            _setup_seek_test(SimpleSink, user_can_seek_beginning=lambda: None)

    def test_can_seek_beginning(self):
        # What the user's _user_can_seek_beginning method returns is what
        # the sink's iterator reports, for both True and False.
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_graph_is_configured(self):
                in_port = self._input_ports["in"]
                self._msg_iter = self._create_message_iterator(in_port)

            def _user_consume(self):
                nonlocal seen_by_sink
                seen_by_sink = self._msg_iter.can_seek_beginning()

        def answer_from_test(self):
            return user_answer

        graph = _setup_seek_test(
            MySink,
            user_seek_beginning=lambda: None,
            user_can_seek_beginning=answer_from_test,
        )

        for user_answer in (True, False):
            seen_by_sink = None
            graph.run_once()
            self.assertIs(seen_by_sink, user_answer)

    def test_no_can_seek_beginning_with_seek_beginning(self):
        # With a _user_seek_beginning method, but no
        # _user_can_seek_beginning method, the iterator is expected to
        # report that it can seek its beginning.
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_graph_is_configured(self):
                in_port = self._input_ports["in"]
                self._msg_iter = self._create_message_iterator(in_port)

            def _user_consume(self):
                nonlocal seen_by_sink
                seen_by_sink = self._msg_iter.can_seek_beginning()

        def noop_seek_beginning(self):
            pass

        seen_by_sink = None
        graph = _setup_seek_test(MySink, user_seek_beginning=noop_seek_beginning)
        graph.run_once()
        self.assertIs(seen_by_sink, True)

    def test_no_can_seek_beginning(self):
        # With neither a _user_can_seek_beginning method nor a
        # _user_seek_beginning method, the iterator is expected to report
        # that it cannot seek its beginning.
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_graph_is_configured(self):
                in_port = self._input_ports["in"]
                self._msg_iter = self._create_message_iterator(in_port)

            def _user_consume(self):
                nonlocal seen_by_sink
                seen_by_sink = self._msg_iter.can_seek_beginning()

        seen_by_sink = None
        graph = _setup_seek_test(MySink)
        graph.run_once()
        self.assertIs(seen_by_sink, False)

    def test_can_seek_beginning_user_error(self):
        # An exception raised by the user's _user_can_seek_beginning method
        # must surface as a `bt2._Error` naming that exception.
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_graph_is_configured(self):
                in_port = self._input_ports["in"]
                self._msg_iter = self._create_message_iterator(in_port)

            def _user_consume(self):
                # The call below is the one expected to fail.
                self._msg_iter.can_seek_beginning()

        def failing_can_seek_beginning(self):
            raise ValueError("moustiquaire")

        graph = _setup_seek_test(
            MySink,
            user_seek_beginning=lambda: None,
            user_can_seek_beginning=failing_can_seek_beginning,
        )

        with self.assertRaises(bt2._Error) as ctx:
            graph.run_once()

        self.assertIn("ValueError: moustiquaire", ctx.exception[0].message)

    def test_can_seek_beginning_wrong_return_value(self):
        # A user _user_can_seek_beginning method returning something else
        # than a `bool` must surface as a `TypeError` cause.
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_graph_is_configured(self):
                in_port = self._input_ports["in"]
                self._msg_iter = self._create_message_iterator(in_port)

            def _user_consume(self):
                # The call below is the one expected to fail.
                self._msg_iter.can_seek_beginning()

        def str_can_seek_beginning(self):
            return "Amqui"

        graph = _setup_seek_test(
            MySink,
            user_seek_beginning=lambda: None,
            user_can_seek_beginning=str_can_seek_beginning,
        )

        with self.assertRaises(bt2._Error) as ctx:
            graph.run_once()

        self.assertIn(
            "TypeError: 'str' is not a 'bool' object", ctx.exception[0].message
        )

    def test_seek_beginning(self):
        # Consume two messages, seek the beginning, and check that the
        # next consumed message is the first one again.
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_graph_is_configured(self):
                in_port = self._input_ports["in"]
                self._msg_iter = self._create_message_iterator(in_port)

            def _user_consume(self):
                nonlocal last_msg

                # Either seek or consume during a given call, never both.
                if seek_requested:
                    self._msg_iter.seek_beginning()
                else:
                    last_msg = next(self._msg_iter)

        def rewind(self):
            # Runs as a method of the source iterator: reset its position.
            self._at = 0

        last_msg = None
        graph = _setup_seek_test(MySink, user_seek_beginning=rewind)

        # First message.
        seek_requested = False
        graph.run_once()
        self.assertIs(type(last_msg), bt2._StreamBeginningMessageConst)

        # Second message.
        graph.run_once()
        self.assertIs(type(last_msg), bt2._PacketBeginningMessageConst)

        # Go back to the beginning.
        seek_requested = True
        graph.run_once()

        # First message again.
        seek_requested = False
        graph.run_once()
        self.assertIs(type(last_msg), bt2._StreamBeginningMessageConst)

    def test_seek_beginning_user_error(self):
        # An exception raised by the user's _user_seek_beginning method
        # must surface as a `bt2._Error`.
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_graph_is_configured(self):
                in_port = self._input_ports["in"]
                self._msg_iter = self._create_message_iterator(in_port)

            def _user_consume(self):
                self._msg_iter.seek_beginning()

        def failing_seek_beginning(self):
            raise ValueError("ouch")

        graph = _setup_seek_test(MySink, user_seek_beginning=failing_seek_beginning)

        with self.assertRaises(bt2._Error):
            graph.run_once()
739
740
741class UserMessageIteratorSeekNsFromOriginTestCase(unittest.TestCase):
742 def test_can_seek_ns_from_origin_without_seek_ns_from_origin(self):
743 # Test the case where:
744 #
745 # - can_seek_ns_from_origin: Returns True (don't really care, as long
746 # as it's provided)
747 # - seek_ns_from_origin provided: No
748 # - can the iterator seek beginning: Don't care
749 # - can the iterator seek forward: Don't care
750 for can_seek_ns_from_origin in (False, True):
751 for iter_can_seek_beginning in (False, True):
752 for iter_can_seek_forward in (False, True):
753 with self.assertRaisesRegex(
754 bt2._IncompleteUserClass,
755 "cannot create component class 'MySource': message iterator class implements _user_can_seek_ns_from_origin but not _user_seek_ns_from_origin",
756 ):
757 self._can_seek_ns_from_origin_test(
758 None,
759 user_can_seek_ns_from_origin_ret_val=True,
760 user_seek_ns_from_origin_provided=False,
761 iter_can_seek_beginning=iter_can_seek_beginning,
762 iter_can_seek_forward=iter_can_seek_forward,
763 )
764
765 def test_can_seek_ns_from_origin_returns_true(self):
766 # Test the case where:
767 #
768 # - can_seek_ns_from_origin: returns True
769 # - seek_ns_from_origin provided: Yes
770 # - can the iterator seek beginning: Don't care
771 # - can the iterator seek forward: Don't care
772 #
773 # We expect iter.can_seek_ns_from_origin to return True.
774 for iter_can_seek_beginning in (False, True):
775 for iter_can_seek_forward in (False, True):
776 self._can_seek_ns_from_origin_test(
777 expected_outcome=True,
778 user_can_seek_ns_from_origin_ret_val=True,
779 user_seek_ns_from_origin_provided=True,
780 iter_can_seek_beginning=iter_can_seek_beginning,
781 iter_can_seek_forward=iter_can_seek_forward,
782 )
783
784 def test_can_seek_ns_from_origin_returns_false_can_seek_beginning_forward_seekable(
785 self,
786 ):
787 # Test the case where:
788 #
789 # - can_seek_ns_from_origin: returns False
790 # - seek_ns_from_origin provided: Yes
791 # - can the iterator seek beginning: Yes
792 # - can the iterator seek forward: Yes
793 #
794 # We expect iter.can_seek_ns_from_origin to return True.
795 self._can_seek_ns_from_origin_test(
796 expected_outcome=True,
797 user_can_seek_ns_from_origin_ret_val=False,
798 user_seek_ns_from_origin_provided=True,
799 iter_can_seek_beginning=True,
800 iter_can_seek_forward=True,
801 )
802
803 def test_can_seek_ns_from_origin_returns_false_can_seek_beginning_not_forward_seekable(
804 self,
805 ):
806 # Test the case where:
807 #
808 # - can_seek_ns_from_origin: returns False
809 # - seek_ns_from_origin provided: Yes
810 # - can the iterator seek beginning: Yes
811 # - can the iterator seek forward: No
812 #
813 # We expect iter.can_seek_ns_from_origin to return False.
814 self._can_seek_ns_from_origin_test(
815 expected_outcome=False,
816 user_can_seek_ns_from_origin_ret_val=False,
817 user_seek_ns_from_origin_provided=True,
818 iter_can_seek_beginning=True,
819 iter_can_seek_forward=False,
820 )
821
822 def test_can_seek_ns_from_origin_returns_false_cant_seek_beginning_forward_seekable(
823 self,
824 ):
825 # Test the case where:
826 #
827 # - can_seek_ns_from_origin: returns False
828 # - seek_ns_from_origin provided: Yes
829 # - can the iterator seek beginning: No
830 # - can the iterator seek forward: Yes
831 #
832 # We expect iter.can_seek_ns_from_origin to return False.
833 self._can_seek_ns_from_origin_test(
834 expected_outcome=False,
835 user_can_seek_ns_from_origin_ret_val=False,
836 user_seek_ns_from_origin_provided=True,
837 iter_can_seek_beginning=False,
838 iter_can_seek_forward=True,
839 )
840
841 def test_can_seek_ns_from_origin_returns_false_cant_seek_beginning_not_forward_seekable(
842 self,
843 ):
844 # Test the case where:
845 #
846 # - can_seek_ns_from_origin: returns False
847 # - seek_ns_from_origin provided: Yes
848 # - can the iterator seek beginning: No
849 # - can the iterator seek forward: No
850 #
851 # We expect iter.can_seek_ns_from_origin to return False.
852 self._can_seek_ns_from_origin_test(
853 expected_outcome=False,
854 user_can_seek_ns_from_origin_ret_val=False,
855 user_seek_ns_from_origin_provided=True,
856 iter_can_seek_beginning=False,
857 iter_can_seek_forward=False,
858 )
859
860 def test_no_can_seek_ns_from_origin_seek_ns_from_origin(self):
861 # Test the case where:
862 #
863 # - can_seek_ns_from_origin: Not provided
864 # - seek_ns_from_origin provided: Yes
865 # - can the iterator seek beginning: Don't care
866 # - can the iterator seek forward: Don't care
867 #
868 # We expect iter.can_seek_ns_from_origin to return True.
869 for iter_can_seek_beginning in (False, True):
870 for iter_can_seek_forward in (False, True):
871 self._can_seek_ns_from_origin_test(
872 expected_outcome=True,
873 user_can_seek_ns_from_origin_ret_val=None,
874 user_seek_ns_from_origin_provided=True,
875 iter_can_seek_beginning=iter_can_seek_beginning,
876 iter_can_seek_forward=iter_can_seek_forward,
877 )
878
879 def test_no_can_seek_ns_from_origin_no_seek_ns_from_origin_can_seek_beginning_forward_seekable(
880 self,
881 ):
882 # Test the case where:
883 #
884 # - can_seek_ns_from_origin: Not provided
885 # - seek_ns_from_origin provided: Not provided
886 # - can the iterator seek beginning: Yes
887 # - can the iterator seek forward: Yes
888 #
889 # We expect iter.can_seek_ns_from_origin to return True.
890 self._can_seek_ns_from_origin_test(
891 expected_outcome=True,
892 user_can_seek_ns_from_origin_ret_val=None,
893 user_seek_ns_from_origin_provided=False,
894 iter_can_seek_beginning=True,
895 iter_can_seek_forward=True,
896 )
897
898 def test_no_can_seek_ns_from_origin_no_seek_ns_from_origin_can_seek_beginning_not_forward_seekable(
899 self,
900 ):
901 # Test the case where:
902 #
903 # - can_seek_ns_from_origin: Not provided
904 # - seek_ns_from_origin provided: Not provided
905 # - can the iterator seek beginning: Yes
906 # - can the iterator seek forward: No
907 #
908 # We expect iter.can_seek_ns_from_origin to return False.
909 self._can_seek_ns_from_origin_test(
910 expected_outcome=False,
911 user_can_seek_ns_from_origin_ret_val=None,
912 user_seek_ns_from_origin_provided=False,
913 iter_can_seek_beginning=True,
914 iter_can_seek_forward=False,
915 )
916
917 def test_no_can_seek_ns_from_origin_no_seek_ns_from_origin_cant_seek_beginning_forward_seekable(
918 self,
919 ):
920 # Test the case where:
921 #
922 # - can_seek_ns_from_origin: Not provided
923 # - seek_ns_from_origin provided: Not provided
924 # - can the iterator seek beginning: No
925 # - can the iterator seek forward: Yes
926 #
927 # We expect iter.can_seek_ns_from_origin to return False.
928 self._can_seek_ns_from_origin_test(
929 expected_outcome=False,
930 user_can_seek_ns_from_origin_ret_val=None,
931 user_seek_ns_from_origin_provided=False,
932 iter_can_seek_beginning=False,
933 iter_can_seek_forward=True,
934 )
935
def test_no_can_seek_ns_from_origin_no_seek_ns_from_origin_cant_seek_beginning_not_forward_seekable(
    self,
):
    # Scenario under test:
    #
    # - user `can_seek_ns_from_origin` method: not provided
    # - user `seek_ns_from_origin` method: not provided
    # - iterator can seek beginning: no
    # - iterator can seek forward: no
    #
    # Expected result: iter.can_seek_ns_from_origin returns False.
    scenario = {
        "expected_outcome": False,
        "user_can_seek_ns_from_origin_ret_val": None,
        "user_seek_ns_from_origin_provided": False,
        "iter_can_seek_beginning": False,
        "iter_can_seek_forward": False,
    }
    self._can_seek_ns_from_origin_test(**scenario)
954
def _can_seek_ns_from_origin_test(
    self,
    expected_outcome,
    user_can_seek_ns_from_origin_ret_val,
    user_seek_ns_from_origin_provided,
    iter_can_seek_beginning,
    iter_can_seek_forward,
):
    # Shared driver for the `can_seek_ns_from_origin` scenarios.
    #
    # Parameters:
    #
    # - expected_outcome: value that the sink's call to
    #   `can_seek_ns_from_origin()` must return (compared with `assertIs`).
    # - user_can_seek_ns_from_origin_ret_val: when not `None`, a user
    #   "can seek ns from origin" method returning this value is handed
    #   to `_setup_seek_test()`; when `None`, no such method is handed.
    # - user_seek_ns_from_origin_provided: whether a (no-op) user
    #   "seek ns from origin" method is handed to `_setup_seek_test()`.
    # - iter_can_seek_beginning: whether a (no-op) user "seek beginning"
    #   method is handed to `_setup_seek_test()`.
    # - iter_can_seek_forward: forwarded as `can_seek_forward`.
    #
    # NOTE(review): how `_setup_seek_test()` installs these methods is
    # not visible from here -- see its definition.

    # State shared with the closures below.
    queried_ns = 77
    ns_seen_by_user_method = None
    reported_answer = None

    class MySink(bt2._UserSinkComponent):
        def __init__(self, config, params, obj):
            self._add_input_port("in")

        def _user_graph_is_configured(self):
            in_port = self._input_ports["in"]
            self._msg_iter = self._create_message_iterator(in_port)

        def _user_consume(self):
            # Record what the upstream iterator answers for the query.
            nonlocal reported_answer
            reported_answer = self._msg_iter.can_seek_ns_from_origin(queried_ns)

    def user_can_seek_ns_from_origin(self, ns_from_origin):
        # Remember the value received so it can be checked afterwards.
        nonlocal ns_seen_by_user_method
        ns_seen_by_user_method = ns_from_origin
        return user_can_seek_ns_from_origin_ret_val

    def user_seek_ns_from_origin(self, ns_from_origin):
        pass

    def user_seek_beginning(self):
        pass

    user_answers_query = user_can_seek_ns_from_origin_ret_val is not None

    # Only hand over the user methods which this scenario asks for.
    graph = _setup_seek_test(
        MySink,
        user_can_seek_ns_from_origin=(
            user_can_seek_ns_from_origin if user_answers_query else None
        ),
        user_seek_ns_from_origin=(
            user_seek_ns_from_origin if user_seek_ns_from_origin_provided else None
        ),
        user_seek_beginning=(
            user_seek_beginning if iter_can_seek_beginning else None
        ),
        can_seek_forward=iter_can_seek_forward,
    )

    graph.run_once()
    self.assertIs(reported_answer, expected_outcome)

    if user_answers_query:
        # The user method must have received the queried value as is.
        self.assertEqual(ns_seen_by_user_method, queried_ns)
1018
def test_can_seek_ns_from_origin_user_error(self):
    # An exception raised by the user's "can seek ns from origin" method
    # must surface as a `bt2._Error` whose first cause mentions it.

    def _user_can_seek_ns_from_origin(self, ns_from_origin):
        raise ValueError("Joutel")

    class MySink(bt2._UserSinkComponent):
        def __init__(self, config, params, obj):
            self._add_input_port("in")

        def _user_graph_is_configured(self):
            in_port = self._input_ports["in"]
            self._msg_iter = self._create_message_iterator(in_port)

        def _user_consume(self):
            # This call is the one expected to raise.
            self._msg_iter.can_seek_ns_from_origin(2)

    seek_graph = _setup_seek_test(
        MySink,
        user_can_seek_ns_from_origin=_user_can_seek_ns_from_origin,
        user_seek_ns_from_origin=lambda: None,
    )

    with self.assertRaises(bt2._Error) as raised:
        seek_graph.run_once()

    first_cause = raised.exception[0]
    self.assertIn("ValueError: Joutel", first_cause.message)
1045
def test_can_seek_ns_from_origin_wrong_return_value(self):
    # A user "can seek ns from origin" method which returns a `str`
    # must surface as a `bt2._Error` whose first cause reports the
    # `TypeError`.

    def _user_can_seek_ns_from_origin(self, ns_from_origin):
        return "Nitchequon"

    class MySink(bt2._UserSinkComponent):
        def __init__(self, config, params, obj):
            self._add_input_port("in")

        def _user_graph_is_configured(self):
            in_port = self._input_ports["in"]
            self._msg_iter = self._create_message_iterator(in_port)

        def _user_consume(self):
            # This call is the one expected to raise.
            self._msg_iter.can_seek_ns_from_origin(2)

    seek_graph = _setup_seek_test(
        MySink,
        user_can_seek_ns_from_origin=_user_can_seek_ns_from_origin,
        user_seek_ns_from_origin=lambda: None,
    )

    with self.assertRaises(bt2._Error) as raised:
        seek_graph.run_once()

    first_cause = raised.exception[0]
    self.assertIn("TypeError: 'str' is not a 'bool' object", first_cause.message)
1072
def test_seek_ns_from_origin(self):
    # The value which the sink passes to `seek_ns_from_origin()` must
    # reach the user's "seek ns from origin" method unchanged.
    requested_ns = 17
    ns_received = None

    def _user_seek_ns_from_origin(self, ns_from_origin):
        nonlocal ns_received
        ns_received = ns_from_origin

    class MySink(bt2._UserSinkComponent):
        def __init__(self, config, params, obj):
            self._add_input_port("in")

        def _user_graph_is_configured(self):
            in_port = self._input_ports["in"]
            self._msg_iter = self._create_message_iterator(in_port)

        def _user_consume(self):
            self._msg_iter.seek_ns_from_origin(requested_ns)

    seek_graph = _setup_seek_test(
        MySink, user_seek_ns_from_origin=_user_seek_ns_from_origin
    )

    seek_graph.run_once()
    self.assertEqual(ns_received, requested_ns)
1095
1096
if __name__ == "__main__":
    # Executed as a script: let unittest discover and run this module's tests.
    unittest.main()
# (Web viewer footer, not part of the test module.) This page took 0.034345 seconds and 4 git commands to generate.