python: run isort on everything
[babeltrace.git] / tests / bindings / python / bt2 / test_message_iterator.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # Copyright (C) 2019 EfficiOS Inc.
4 #
5
6 import sys
7 import unittest
8
9 import bt2
10 from bt2 import port as bt2_port
11 from bt2 import message_iterator as bt2_message_iterator
12 from utils import TestOutputPortMessageIterator
13
14
class SimpleSink(bt2._UserSinkComponent):
    # Minimal sink shared by the tests of this file: it owns a single
    # input port named `in` and pulls one message from it each time it
    # is asked to consume.

    def __init__(self, config, params, obj):
        # The only port of this component.
        self._add_input_port("in")

    def _user_graph_is_configured(self):
        # Create the message iterator on the `in` port; `_user_consume()`
        # relies on it.
        in_port = self._input_ports["in"]
        self._msg_iter = self._create_message_iterator(in_port)

    def _user_consume(self):
        # Pull a single message and discard it.
        next(self._msg_iter)
27
28
def _create_graph(src_comp_cls, sink_comp_cls, flt_comp_cls=None):
    """Build and return a graph connecting a source to a sink.

    The source component is named `src` and the sink component `sink`.
    When `flt_comp_cls` is given, a filter component named `flt` is
    inserted between them.  Output ports are expected to be named `out`
    and input ports `in`.
    """
    graph = bt2.Graph()

    source = graph.add_component(src_comp_cls, "src")
    sink = graph.add_component(sink_comp_cls, "sink")
    filt = None

    if flt_comp_cls is not None:
        filt = graph.add_component(flt_comp_cls, "flt")

    # Port which feeds the next component of the chain.
    upstream_port = source.output_ports["out"]

    if filt is not None:
        graph.connect_ports(upstream_port, filt.input_ports["in"])
        upstream_port = filt.output_ports["out"]

    graph.connect_ports(upstream_port, sink.input_ports["in"])
    return graph
43
44
class UserMessageIteratorTestCase(unittest.TestCase):
    # Exercises the basics of user message iterators: initialization,
    # creation of upstream iterators, configuration, finalization, and
    # propagation of user errors.

    def test_init(self):
        # The iterator's __init__() must run, and must receive the very
        # output port which the source component created.
        the_output_port_from_source = None
        the_output_port_from_iter = None

        class MyIter(bt2._UserMessageIterator):
            def __init__(self, config, self_port_output):
                nonlocal initialized
                nonlocal the_output_port_from_iter
                initialized = True
                the_output_port_from_iter = self_port_output

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                nonlocal the_output_port_from_source
                the_output_port_from_source = self._add_output_port("out", "user data")

        initialized = False
        graph = _create_graph(MySource, SimpleSink)
        graph.run()
        self.assertTrue(initialized)
        self.assertEqual(
            the_output_port_from_source.addr, the_output_port_from_iter.addr
        )
        self.assertEqual(the_output_port_from_iter.user_data, "user data")

    def test_create_from_message_iterator(self):
        # A filter message iterator creates its own upstream iterator:
        # both the filter's and the source's iterators must get
        # initialized.
        class MySourceIter(bt2._UserMessageIterator):
            def __init__(self, config, self_port_output):
                nonlocal src_iter_initialized
                src_iter_initialized = True

        class MySource(bt2._UserSourceComponent, message_iterator_class=MySourceIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        class MyFilterIter(bt2._UserMessageIterator):
            def __init__(self, config, self_port_output):
                nonlocal flt_iter_initialized
                flt_iter_initialized = True
                self._up_iter = self._create_message_iterator(
                    self._component._input_ports["in"]
                )

            def __next__(self):
                return next(self._up_iter)

        class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter):
            def __init__(self, config, params, obj):
                self._add_input_port("in")
                self._add_output_port("out")

        src_iter_initialized = False
        flt_iter_initialized = False
        graph = _create_graph(MySource, SimpleSink, MyFilter)
        graph.run()
        self.assertTrue(src_iter_initialized)
        self.assertTrue(flt_iter_initialized)

    # Test that creating a message iterator from a sink component on a
    # non-connected input port raises.
    def test_create_from_sink_component_unconnected_port_raises(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(comp_self, config, params, obj):
                comp_self._input_port = comp_self._add_input_port("in")

            def _user_graph_is_configured(comp_self):
                # `self` is the test case here (closure), not the component.
                with self.assertRaisesRegex(ValueError, "input port is not connected"):
                    comp_self._create_message_iterator(comp_self._input_port)

                nonlocal seen
                seen = True

            def _user_consume(self):
                raise bt2.Stop

        seen = False
        graph = bt2.Graph()
        graph.add_component(MySink, "snk")
        graph.run()
        self.assertTrue(seen)

    # Test that creating a message iterator from a message iterator on a
    # non-connected input port raises.
    def test_create_from_message_iterator_unconnected_port_raises(self):
        class MyFilterIter(bt2._UserMessageIterator):
            def __init__(iter_self, config, port):
                input_port = iter_self._component._input_ports["in"]

                # `self` is the test case here (closure), not the iterator.
                with self.assertRaisesRegex(ValueError, "input port is not connected"):
                    iter_self._create_message_iterator(input_port)

                nonlocal seen
                seen = True

        class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter):
            def __init__(comp_self, config, params, obj):
                comp_self._add_input_port("in")
                comp_self._add_output_port("out")

        class MySink(bt2._UserSinkComponent):
            def __init__(comp_self, config, params, obj):
                comp_self._input_port = comp_self._add_input_port("in")

            def _user_graph_is_configured(comp_self):
                comp_self._input_iter = comp_self._create_message_iterator(
                    comp_self._input_port
                )

            def _user_consume(self):
                raise bt2.Stop

        # The filter's input port is deliberately left unconnected.
        seen = False
        graph = bt2.Graph()
        flt = graph.add_component(MyFilter, "flt")
        snk = graph.add_component(MySink, "snk")
        graph.connect_ports(flt.output_ports["out"], snk.input_ports["in"])
        graph.run()
        self.assertTrue(seen)

    def test_create_user_error(self):
        # This tests both error handling by
        # _UserSinkComponent._create_message_iterator
        # and _UserMessageIterator._create_message_iterator, as they
        # are both used in the graph.
        class MySourceIter(bt2._UserMessageIterator):
            def __init__(self, config, self_port_output):
                raise ValueError("Very bad error")

        class MySource(bt2._UserSourceComponent, message_iterator_class=MySourceIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        class MyFilterIter(bt2._UserMessageIterator):
            def __init__(self, config, self_port_output):
                # This is expected to raise because of the error in
                # MySourceIter.__init__.
                self._create_message_iterator(self._component._input_ports["in"])

        class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter):
            def __init__(self, config, params, obj):
                self._add_input_port("in")
                self._add_output_port("out")

        graph = _create_graph(MySource, SimpleSink, MyFilter)

        with self.assertRaises(bt2._Error) as ctx:
            graph.run()

        # The first cause must point to the source's message iterator.
        exc = ctx.exception
        cause = exc[0]

        self.assertIsInstance(cause, bt2._MessageIteratorErrorCause)
        self.assertEqual(cause.component_name, "src")
        self.assertEqual(cause.component_output_port_name, "out")
        self.assertIn("ValueError: Very bad error", cause.message)

    def test_finalize(self):
        # The iterator's _user_finalize() must have been called once the
        # graph is deleted.
        class MyIter(bt2._UserMessageIterator):
            def _user_finalize(self):
                nonlocal finalized
                finalized = True

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        finalized = False
        graph = _create_graph(MySource, SimpleSink)
        graph.run()
        del graph
        self.assertTrue(finalized)

    def test_config_parameter(self):
        # The `config` parameter of the iterator's __init__() must be a
        # message iterator configuration object.
        class MyIter(bt2._UserMessageIterator):
            def __init__(self, config, port):
                nonlocal config_type
                config_type = type(config)

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        config_type = None
        graph = _create_graph(MySource, SimpleSink)
        graph.run()
        self.assertIs(config_type, bt2_message_iterator._MessageIteratorConfiguration)

    def _test_config_can_seek_forward(self, set_can_seek_forward):
        # Helper: optionally sets `config.can_seek_forward` to `True` in
        # the source iterator's __init__(), then checks that the
        # downstream iterator's `can_seek_forward` property is exactly
        # `set_can_seek_forward`.
        class MyIter(bt2._UserMessageIterator):
            def __init__(self, config, port):
                if set_can_seek_forward:
                    config.can_seek_forward = True

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_message_iterator(self._input_ports["in"])

            def _user_consume(self):
                nonlocal can_seek_forward
                can_seek_forward = self._msg_iter.can_seek_forward

        can_seek_forward = None
        graph = _create_graph(MySource, MySink)
        graph.run_once()
        self.assertIs(can_seek_forward, set_can_seek_forward)

    def test_config_can_seek_forward_default(self):
        self._test_config_can_seek_forward(False)

    def test_config_can_seek_forward(self):
        self._test_config_can_seek_forward(True)

    def test_config_can_seek_forward_wrong_type(self):
        # Setting `config.can_seek_forward` to a non-boolean value must
        # fail with a `TypeError` cause.
        class MyIter(bt2._UserMessageIterator):
            def __init__(self, config, port):
                config.can_seek_forward = 1

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        graph = _create_graph(MySource, SimpleSink)
        with self.assertRaises(bt2._Error) as ctx:
            graph.run()

        root_cause = ctx.exception[0]
        self.assertIn("TypeError: 'int' is not a 'bool' object", root_cause.message)

    def test_component(self):
        # The iterator's `_component` property must give access to the
        # Python component object which owns it.
        class MyIter(bt2._UserMessageIterator):
            def __init__(self, config, self_port_output):
                nonlocal salut
                salut = self._component._salut

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")
                self._salut = 23

        salut = None
        graph = _create_graph(MySource, SimpleSink)
        graph.run()
        self.assertEqual(salut, 23)

    def test_port(self):
        # The iterator's `_port` property and the port passed to
        # __init__() must both be user component output ports having the
        # same address.
        class MyIter(bt2._UserMessageIterator):
            def __init__(self_iter, config, self_port_output):
                nonlocal called
                called = True
                port = self_iter._port
                self.assertIs(type(self_port_output), bt2_port._UserComponentOutputPort)
                self.assertIs(type(port), bt2_port._UserComponentOutputPort)
                self.assertEqual(self_port_output.addr, port.addr)

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        called = False
        graph = _create_graph(MySource, SimpleSink)
        graph.run()
        self.assertTrue(called)

    def test_addr(self):
        # The iterator's `addr` property must be set and not zero.
        class MyIter(bt2._UserMessageIterator):
            def __init__(self, config, self_port_output):
                nonlocal addr
                addr = self.addr

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        addr = None
        graph = _create_graph(MySource, SimpleSink)
        graph.run()
        self.assertIsNotNone(addr)
        self.assertNotEqual(addr, 0)

    # Test that messages returned by _UserMessageIterator.__next__ remain valid
    # and can be re-used.
    def test_reuse_message(self):
        class MyIter(bt2._UserMessageIterator):
            def __init__(self, config, port):
                tc, sc, ec = port.user_data
                trace = tc()
                stream = trace.create_stream(sc)
                packet = stream.create_packet()

                # This message will be returned twice by __next__.
                event_message = self._create_event_message(ec, packet)

                self._msgs = [
                    self._create_stream_beginning_message(stream),
                    self._create_packet_beginning_message(packet),
                    event_message,
                    event_message,
                ]

            def __next__(self):
                return self._msgs.pop(0)

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                tc = self._create_trace_class()
                sc = tc.create_stream_class(supports_packets=True)
                ec = sc.create_event_class()
                self._add_output_port("out", (tc, sc, ec))

        graph = bt2.Graph()
        src = graph.add_component(MySource, "src")
        it = TestOutputPortMessageIterator(graph, src.output_ports["out"])

        # Skip beginning messages.
        msg = next(it)
        self.assertIs(type(msg), bt2._StreamBeginningMessageConst)
        msg = next(it)
        self.assertIs(type(msg), bt2._PacketBeginningMessageConst)

        msg_ev1 = next(it)
        msg_ev2 = next(it)

        self.assertIs(type(msg_ev1), bt2._EventMessageConst)
        self.assertIs(type(msg_ev2), bt2._EventMessageConst)
        self.assertEqual(msg_ev1.addr, msg_ev2.addr)

    # Try consuming many times from an iterator that always returns TryAgain.
    # This verifies that we are not missing an incref of Py_None, making the
    # refcount of Py_None reach 0.
    def test_try_again_many_times(self):
        # Since Python 3.12 (PEP 683), `None` is immortal:
        # sys.getrefcount(None) returns a huge constant which never
        # changes.  The bug this test looks for cannot happen then, and
        # looping that many times would effectively never end, so skip.
        ref_count_before = sys.getrefcount(None)
        extra_ref = None
        none_is_immortal = sys.getrefcount(None) == ref_count_before
        del extra_ref

        if none_is_immortal:
            self.skipTest("`None` is immortal: its reference count cannot reach 0")

        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.TryAgain

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        graph = bt2.Graph()
        src = graph.add_component(MySource, "src")
        it = TestOutputPortMessageIterator(graph, src.output_ports["out"])

        # Three times the initial ref count of `None` iterations should
        # be enough to catch the bug even if there are small differences
        # between configurations.
        none_ref_count = sys.getrefcount(None) * 3

        for _ in range(none_ref_count):
            with self.assertRaises(bt2.TryAgain):
                next(it)

    def test_error_in_iterator_with_cycle_after_having_created_upstream_iterator(self):
        # Test a failure that triggered an abort in libbabeltrace2, in this situation:
        #
        #   - The filter iterator creates an upstream iterator.
        #   - The filter iterator creates a reference cycle, including itself.
        #   - An exception is raised, causing the filter iterator's
        #     initialization method to fail.
        class MySourceIter(bt2._UserMessageIterator):
            pass

        class MySource(bt2._UserSourceComponent, message_iterator_class=MySourceIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        class MyFilterIter(bt2._UserMessageIterator):
            def __init__(self, config, port):
                # First, create an upstream iterator.
                self._upstream_iter = self._create_message_iterator(
                    self._component._input_ports["in"]
                )

                # Then, voluntarily make a reference cycle that will keep this
                # Python object alive, which will keep the upstream iterator
                # Babeltrace object alive.
                self._self = self

                # Finally, raise an exception to make __init__ fail.
                raise ValueError("woops")

        class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter):
            def __init__(self, config, params, obj):
                self._in = self._add_input_port("in")
                self._out = self._add_output_port("out")

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._input_port = self._add_input_port("in")

            def _user_graph_is_configured(self):
                self._upstream_iter = self._create_message_iterator(self._input_port)

            def _user_consume(self):
                # We should not reach this.  Raise explicitly rather than
                # use `assert`, which is stripped when running with `-O`.
                raise AssertionError("_user_consume() should not be reached")

        g = bt2.Graph()
        src = g.add_component(MySource, "src")
        flt = g.add_component(MyFilter, "flt")
        snk = g.add_component(MySink, "snk")
        g.connect_ports(src.output_ports["out"], flt.input_ports["in"])
        g.connect_ports(flt.output_ports["out"], snk.input_ports["in"])

        with self.assertRaisesRegex(bt2._Error, "ValueError: woops"):
            g.run()
477
478
def _setup_seek_test(
    sink_cls,
    user_seek_beginning=None,
    user_can_seek_beginning=None,
    user_seek_ns_from_origin=None,
    user_can_seek_ns_from_origin=None,
    can_seek_forward=False,
):
    """Build and return a source -> filter -> `sink_cls` graph for seek tests.

    The source iterator serves a fixed sequence of six messages and sets
    `config.can_seek_forward` to `can_seek_forward`.  Each `user_*`
    function which is not `None` is installed on the source iterator
    class as the `_user_*` method of the same name.  The filter iterator
    forwards everything, seeking operations included, to its upstream
    iterator.
    """

    class MySourceIter(bt2._UserMessageIterator):
        def __init__(self, config, port):
            trace_cls, stream_cls, event_cls = port.user_data
            trace = trace_cls()
            stream = trace.create_stream(stream_cls)
            packet = stream.create_packet()

            # Index of the next message to return.
            self._at = 0
            self._msgs = [
                self._create_stream_beginning_message(stream),
                self._create_packet_beginning_message(packet),
                self._create_event_message(event_cls, packet),
                self._create_event_message(event_cls, packet),
                self._create_packet_end_message(packet),
                self._create_stream_end_message(stream),
            ]
            config.can_seek_forward = can_seek_forward

        def __next__(self):
            if self._at >= len(self._msgs):
                raise StopIteration

            current = self._msgs[self._at]
            self._at += 1
            return current

    # Install the requested user methods on the iterator class before
    # the `MySource` class below is declared with it.
    requested_methods = (
        ("_user_seek_beginning", user_seek_beginning),
        ("_user_can_seek_beginning", user_can_seek_beginning),
        ("_user_seek_ns_from_origin", user_seek_ns_from_origin),
        ("_user_can_seek_ns_from_origin", user_can_seek_ns_from_origin),
    )

    for method_name, user_func in requested_methods:
        if user_func is not None:
            setattr(MySourceIter, method_name, user_func)

    class MySource(bt2._UserSourceComponent, message_iterator_class=MySourceIter):
        def __init__(self, config, params, obj):
            trace_cls = self._create_trace_class()
            stream_cls = trace_cls.create_stream_class(supports_packets=True)
            event_cls = stream_cls.create_event_class()

            # The iterator gets those classes through the port's user data.
            self._add_output_port("out", (trace_cls, stream_cls, event_cls))

    class MyFilterIter(bt2._UserMessageIterator):
        def __init__(self, config, port):
            in_port = self._component._input_ports["in"]
            self._upstream_iter = self._create_message_iterator(in_port)

            # Mirror the upstream iterator's forward-seeking ability.
            config.can_seek_forward = self._upstream_iter.can_seek_forward

        def __next__(self):
            return next(self._upstream_iter)

        def _user_can_seek_beginning(self):
            return self._upstream_iter.can_seek_beginning()

        def _user_seek_beginning(self):
            self._upstream_iter.seek_beginning()

        def _user_can_seek_ns_from_origin(self, ns_from_origin):
            return self._upstream_iter.can_seek_ns_from_origin(ns_from_origin)

        def _user_seek_ns_from_origin(self, ns_from_origin):
            self._upstream_iter.seek_ns_from_origin(ns_from_origin)

    class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter):
        def __init__(self, config, params, obj):
            self._add_input_port("in")
            self._add_output_port("out")

    return _create_graph(MySource, sink_cls, flt_comp_cls=MyFilter)
561
562
class UserMessageIteratorSeekBeginningTestCase(unittest.TestCase):
    # Tests of the "seek beginning" and "can seek beginning" operations
    # of user message iterators, using the graph built by
    # _setup_seek_test().
    #
    # Placeholder user methods are written as `lambda self: None` so
    # that they have a valid method signature even though these tests
    # are not expected to call them.

    def test_can_seek_beginning_without_seek_beginning(self):
        # Providing _user_can_seek_beginning without
        # _user_seek_beginning must make the component class creation
        # fail.
        with self.assertRaisesRegex(
            bt2._IncompleteUserClass,
            "cannot create component class 'MySource': message iterator class implements _user_can_seek_beginning but not _user_seek_beginning",
        ):
            _setup_seek_test(SimpleSink, user_can_seek_beginning=lambda self: None)

    def test_can_seek_beginning(self):
        # The value returned by the user's _user_can_seek_beginning must
        # reach the sink's can_seek_beginning() call.
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_message_iterator(self._input_ports["in"])

            def _user_consume(self):
                nonlocal can_seek_beginning
                can_seek_beginning = self._msg_iter.can_seek_beginning()

        def _user_can_seek_beginning(self):
            nonlocal input_port_iter_can_seek_beginning
            return input_port_iter_can_seek_beginning

        graph = _setup_seek_test(
            MySink,
            user_can_seek_beginning=_user_can_seek_beginning,
            user_seek_beginning=lambda self: None,
        )

        input_port_iter_can_seek_beginning = True
        can_seek_beginning = None
        graph.run_once()
        self.assertIs(can_seek_beginning, True)

        input_port_iter_can_seek_beginning = False
        can_seek_beginning = None
        graph.run_once()
        self.assertIs(can_seek_beginning, False)

    def test_no_can_seek_beginning_with_seek_beginning(self):
        # Test an iterator without a _user_can_seek_beginning method, but with
        # a _user_seek_beginning method.
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_message_iterator(self._input_ports["in"])

            def _user_consume(self):
                nonlocal can_seek_beginning
                can_seek_beginning = self._msg_iter.can_seek_beginning()

        def _user_seek_beginning(self):
            pass

        graph = _setup_seek_test(MySink, user_seek_beginning=_user_seek_beginning)
        can_seek_beginning = None
        graph.run_once()
        self.assertIs(can_seek_beginning, True)

    def test_no_can_seek_beginning(self):
        # Test an iterator without a _user_can_seek_beginning method, without
        # a _user_seek_beginning method.
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_message_iterator(self._input_ports["in"])

            def _user_consume(self):
                nonlocal can_seek_beginning
                can_seek_beginning = self._msg_iter.can_seek_beginning()

        graph = _setup_seek_test(MySink)
        can_seek_beginning = None
        graph.run_once()
        self.assertIs(can_seek_beginning, False)

    def test_can_seek_beginning_user_error(self):
        # An exception raised by the user's _user_can_seek_beginning
        # must surface as a `bt2._Error` whose first cause mentions it.
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_message_iterator(self._input_ports["in"])

            def _user_consume(self):
                # This is expected to raise.
                self._msg_iter.can_seek_beginning()

        def _user_can_seek_beginning(self):
            raise ValueError("moustiquaire")

        graph = _setup_seek_test(
            MySink,
            user_can_seek_beginning=_user_can_seek_beginning,
            user_seek_beginning=lambda self: None,
        )

        with self.assertRaises(bt2._Error) as ctx:
            graph.run_once()

        cause = ctx.exception[0]
        self.assertIn("ValueError: moustiquaire", cause.message)

    def test_can_seek_beginning_wrong_return_value(self):
        # A non-boolean value returned by the user's
        # _user_can_seek_beginning must surface as a `TypeError` cause.
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_message_iterator(self._input_ports["in"])

            def _user_consume(self):
                # This is expected to raise.
                self._msg_iter.can_seek_beginning()

        def _user_can_seek_beginning(self):
            return "Amqui"

        graph = _setup_seek_test(
            MySink,
            user_can_seek_beginning=_user_can_seek_beginning,
            user_seek_beginning=lambda self: None,
        )

        with self.assertRaises(bt2._Error) as ctx:
            graph.run_once()

        cause = ctx.exception[0]
        self.assertIn("TypeError: 'str' is not a 'bool' object", cause.message)

    def test_seek_beginning(self):
        # After seeking the beginning, the next consumed message must be
        # the first message of the sequence again.
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_message_iterator(self._input_ports["in"])

            def _user_consume(self):
                nonlocal do_seek_beginning
                nonlocal msg

                if do_seek_beginning:
                    self._msg_iter.seek_beginning()
                    return

                msg = next(self._msg_iter)

        def _user_seek_beginning(self):
            # Rewind the source iterator created by _setup_seek_test(),
            # which keeps its position in `_at`.
            self._at = 0

        msg = None
        graph = _setup_seek_test(MySink, user_seek_beginning=_user_seek_beginning)

        # Consume message.
        do_seek_beginning = False
        graph.run_once()
        self.assertIs(type(msg), bt2._StreamBeginningMessageConst)

        # Consume message.
        graph.run_once()
        self.assertIs(type(msg), bt2._PacketBeginningMessageConst)

        # Seek beginning.
        do_seek_beginning = True
        graph.run_once()

        # Consume message.
        do_seek_beginning = False
        graph.run_once()
        self.assertIs(type(msg), bt2._StreamBeginningMessageConst)

    def test_seek_beginning_user_error(self):
        # An exception raised by the user's _user_seek_beginning must
        # surface as a `bt2._Error`.
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_message_iterator(self._input_ports["in"])

            def _user_consume(self):
                self._msg_iter.seek_beginning()

        def _user_seek_beginning(self):
            raise ValueError("ouch")

        graph = _setup_seek_test(MySink, user_seek_beginning=_user_seek_beginning)

        with self.assertRaises(bt2._Error):
            graph.run_once()
758
759
760 class UserMessageIteratorSeekNsFromOriginTestCase(unittest.TestCase):
def test_can_seek_ns_from_origin_without_seek_ns_from_origin(self):
    # Test the case where:
    #
    #   - can_seek_ns_from_origin: Provided; both return values are
    #     tried, as the value itself doesn't matter
    #   - seek_ns_from_origin provided: No
    #   - can the iterator seek beginning: Don't care
    #   - can the iterator seek forward: Don't care
    #
    # We expect the creation of the source component class to fail.
    expected_error_regex = "cannot create component class 'MySource': message iterator class implements _user_can_seek_ns_from_origin but not _user_seek_ns_from_origin"

    for can_seek_ns_from_origin in (False, True):
        for iter_can_seek_beginning in (False, True):
            for iter_can_seek_forward in (False, True):
                # Report each combination on its own so that a failure
                # identifies the offending parameters.
                with self.subTest(
                    can_seek_ns_from_origin=can_seek_ns_from_origin,
                    iter_can_seek_beginning=iter_can_seek_beginning,
                    iter_can_seek_forward=iter_can_seek_forward,
                ):
                    with self.assertRaisesRegex(
                        bt2._IncompleteUserClass, expected_error_regex
                    ):
                        self._can_seek_ns_from_origin_test(
                            None,
                            user_can_seek_ns_from_origin_ret_val=can_seek_ns_from_origin,
                            user_seek_ns_from_origin_provided=False,
                            iter_can_seek_beginning=iter_can_seek_beginning,
                            iter_can_seek_forward=iter_can_seek_forward,
                        )
783
def test_can_seek_ns_from_origin_returns_true(self):
    # Scenario:
    #
    #   - The user's can-seek-ns-from-origin method returns True.
    #   - The user provides a seek-ns-from-origin method.
    #   - Whether the iterator can seek its beginning: every value tried.
    #   - Whether the iterator can seek forward: every value tried.
    #
    # iter.can_seek_ns_from_origin() is expected to return True.
    combinations = [
        (seek_beginning, seek_forward)
        for seek_beginning in (False, True)
        for seek_forward in (False, True)
    ]

    for seek_beginning, seek_forward in combinations:
        self._can_seek_ns_from_origin_test(
            expected_outcome=True,
            user_can_seek_ns_from_origin_ret_val=True,
            user_seek_ns_from_origin_provided=True,
            iter_can_seek_beginning=seek_beginning,
            iter_can_seek_forward=seek_forward,
        )
802
def test_can_seek_ns_from_origin_returns_false_can_seek_beginning_forward_seekable(
    self,
):
    # Scenario:
    #
    #   - The user's can-seek-ns-from-origin method returns False.
    #   - The user provides a seek-ns-from-origin method.
    #   - The iterator can seek its beginning.
    #   - The iterator can seek forward.
    #
    # iter.can_seek_ns_from_origin() is expected to return True.
    scenario = {
        "expected_outcome": True,
        "user_can_seek_ns_from_origin_ret_val": False,
        "user_seek_ns_from_origin_provided": True,
        "iter_can_seek_beginning": True,
        "iter_can_seek_forward": True,
    }
    self._can_seek_ns_from_origin_test(**scenario)
821
def test_can_seek_ns_from_origin_returns_false_can_seek_beginning_not_forward_seekable(
    self,
):
    # Scenario:
    #
    #   - The user's can-seek-ns-from-origin method returns False.
    #   - The user provides a seek-ns-from-origin method.
    #   - The iterator can seek its beginning.
    #   - The iterator cannot seek forward.
    #
    # iter.can_seek_ns_from_origin() is expected to return False.
    scenario = {
        "expected_outcome": False,
        "user_can_seek_ns_from_origin_ret_val": False,
        "user_seek_ns_from_origin_provided": True,
        "iter_can_seek_beginning": True,
        "iter_can_seek_forward": False,
    }
    self._can_seek_ns_from_origin_test(**scenario)
840
def test_can_seek_ns_from_origin_returns_false_cant_seek_beginning_forward_seekable(
    self,
):
    # Scenario:
    #
    #   - The user's can-seek-ns-from-origin method returns False.
    #   - The user provides a seek-ns-from-origin method.
    #   - The iterator cannot seek its beginning.
    #   - The iterator can seek forward.
    #
    # iter.can_seek_ns_from_origin() is expected to return False.
    scenario = {
        "expected_outcome": False,
        "user_can_seek_ns_from_origin_ret_val": False,
        "user_seek_ns_from_origin_provided": True,
        "iter_can_seek_beginning": False,
        "iter_can_seek_forward": True,
    }
    self._can_seek_ns_from_origin_test(**scenario)
859
def test_can_seek_ns_from_origin_returns_false_cant_seek_beginning_not_forward_seekable(
    self,
):
    # Scenario:
    #
    #   - The user's can-seek-ns-from-origin method returns False.
    #   - The user provides a seek-ns-from-origin method.
    #   - The iterator cannot seek its beginning.
    #   - The iterator cannot seek forward.
    #
    # iter.can_seek_ns_from_origin() is expected to return False.
    scenario = {
        "expected_outcome": False,
        "user_can_seek_ns_from_origin_ret_val": False,
        "user_seek_ns_from_origin_provided": True,
        "iter_can_seek_beginning": False,
        "iter_can_seek_forward": False,
    }
    self._can_seek_ns_from_origin_test(**scenario)
878
def test_no_can_seek_ns_from_origin_seek_ns_from_origin(self):
    # Scenario:
    #
    #   - The user provides no can-seek-ns-from-origin method.
    #   - The user provides a seek-ns-from-origin method.
    #   - Whether the iterator can seek its beginning: every value tried.
    #   - Whether the iterator can seek forward: every value tried.
    #
    # iter.can_seek_ns_from_origin() is expected to return True.
    combinations = [
        (seek_beginning, seek_forward)
        for seek_beginning in (False, True)
        for seek_forward in (False, True)
    ]

    for seek_beginning, seek_forward in combinations:
        self._can_seek_ns_from_origin_test(
            expected_outcome=True,
            user_can_seek_ns_from_origin_ret_val=None,
            user_seek_ns_from_origin_provided=True,
            iter_can_seek_beginning=seek_beginning,
            iter_can_seek_forward=seek_forward,
        )
897
def test_no_can_seek_ns_from_origin_no_seek_ns_from_origin_can_seek_beginning_forward_seekable(
    self,
):
    # Scenario:
    #
    #   - The user provides no can-seek-ns-from-origin method.
    #   - The user provides no seek-ns-from-origin method.
    #   - The iterator can seek its beginning.
    #   - The iterator can seek forward.
    #
    # iter.can_seek_ns_from_origin() is expected to return True.
    scenario = {
        "expected_outcome": True,
        "user_can_seek_ns_from_origin_ret_val": None,
        "user_seek_ns_from_origin_provided": False,
        "iter_can_seek_beginning": True,
        "iter_can_seek_forward": True,
    }
    self._can_seek_ns_from_origin_test(**scenario)
916
def test_no_can_seek_ns_from_origin_no_seek_ns_from_origin_can_seek_beginning_not_forward_seekable(
    self,
):
    # Scenario:
    #
    #   - The user provides no can-seek-ns-from-origin method.
    #   - The user provides no seek-ns-from-origin method.
    #   - The iterator can seek its beginning.
    #   - The iterator cannot seek forward.
    #
    # iter.can_seek_ns_from_origin() is expected to return False.
    scenario = {
        "expected_outcome": False,
        "user_can_seek_ns_from_origin_ret_val": None,
        "user_seek_ns_from_origin_provided": False,
        "iter_can_seek_beginning": True,
        "iter_can_seek_forward": False,
    }
    self._can_seek_ns_from_origin_test(**scenario)
935
def test_no_can_seek_ns_from_origin_no_seek_ns_from_origin_cant_seek_beginning_forward_seekable(
    self,
):
    # Scenario:
    #
    #   - The user provides no can-seek-ns-from-origin method.
    #   - The user provides no seek-ns-from-origin method.
    #   - The iterator cannot seek its beginning.
    #   - The iterator can seek forward.
    #
    # iter.can_seek_ns_from_origin() is expected to return False.
    scenario = {
        "expected_outcome": False,
        "user_can_seek_ns_from_origin_ret_val": None,
        "user_seek_ns_from_origin_provided": False,
        "iter_can_seek_beginning": False,
        "iter_can_seek_forward": True,
    }
    self._can_seek_ns_from_origin_test(**scenario)
954
def test_no_can_seek_ns_from_origin_no_seek_ns_from_origin_cant_seek_beginning_not_forward_seekable(
    self,
):
    # Scenario:
    #
    #   - user "can seek ns from origin" method: not provided
    #   - user "seek ns from origin" method: not provided
    #   - iterator can seek beginning: no
    #   - iterator can seek forward: no
    #
    # iter.can_seek_ns_from_origin() is expected to return False.
    scenario = dict(
        user_can_seek_ns_from_origin_ret_val=None,
        user_seek_ns_from_origin_provided=False,
        iter_can_seek_beginning=False,
        iter_can_seek_forward=False,
    )
    self._can_seek_ns_from_origin_test(expected_outcome=False, **scenario)
973
def _can_seek_ns_from_origin_test(
    self,
    expected_outcome,
    user_can_seek_ns_from_origin_ret_val,
    user_seek_ns_from_origin_provided,
    iter_can_seek_beginning,
    iter_can_seek_forward,
):
    # Shared driver for the "can seek ns from origin" tests.
    #
    # Builds a graph with `_setup_seek_test()`, runs it once so that the
    # sink queries `can_seek_ns_from_origin()` on its message iterator,
    # and checks that the answer is `expected_outcome`.
    #
    # - expected_outcome: value the query must return (compared with
    #   `assertIs`).
    # - user_can_seek_ns_from_origin_ret_val: when not None, a user
    #   "can seek ns from origin" method returning this value is
    #   installed; when None, no such method is installed.
    # - user_seek_ns_from_origin_provided: whether to install a no-op
    #   user "seek ns from origin" method.
    # - iter_can_seek_beginning: whether to install a no-op user
    #   "seek beginning" method.
    # - iter_can_seek_forward: forwarded to `_setup_seek_test()` as
    #   `can_seek_forward`.

    # State shared with the closures below; the callbacks only touch it
    # once the graph runs.
    passed_ns_from_origin = 77
    received_ns_from_origin = None
    can_seek_ns_from_origin = None
    has_user_can_seek = user_can_seek_ns_from_origin_ret_val is not None

    class MySink(bt2._UserSinkComponent):
        def __init__(self, config, params, obj):
            self._add_input_port("in")

        def _user_graph_is_configured(self):
            in_port = self._input_ports["in"]
            self._msg_iter = self._create_message_iterator(in_port)

        def _user_consume(self):
            nonlocal can_seek_ns_from_origin
            msg_iter = self._msg_iter
            can_seek_ns_from_origin = msg_iter.can_seek_ns_from_origin(
                passed_ns_from_origin
            )

    def _recording_can_seek(self, ns_from_origin):
        # Remember what we were asked so the caller can verify it.
        nonlocal received_ns_from_origin
        received_ns_from_origin = ns_from_origin
        return user_can_seek_ns_from_origin_ret_val

    def _noop_seek_ns_from_origin(self, ns_from_origin):
        pass

    def _noop_seek_beginning(self):
        pass

    # Only hand over the callbacks this scenario asks for; `None` means
    # "not provided".
    graph = _setup_seek_test(
        MySink,
        user_can_seek_ns_from_origin=(
            _recording_can_seek if has_user_can_seek else None
        ),
        user_seek_ns_from_origin=(
            _noop_seek_ns_from_origin if user_seek_ns_from_origin_provided else None
        ),
        user_seek_beginning=(
            _noop_seek_beginning if iter_can_seek_beginning else None
        ),
        can_seek_forward=iter_can_seek_forward,
    )

    graph.run_once()
    self.assertIs(can_seek_ns_from_origin, expected_outcome)

    if has_user_can_seek:
        # The user method must have received the value the sink passed.
        self.assertEqual(received_ns_from_origin, passed_ns_from_origin)
1037
def test_can_seek_ns_from_origin_user_error(self):
    # An exception raised by the user's "can seek ns from origin" method
    # is expected to surface from `graph.run_once()` as a `bt2._Error`
    # whose first cause mentions the original exception.
    def _user_can_seek_ns_from_origin(self, ns_from_origin):
        raise ValueError("Joutel")

    class MySink(bt2._UserSinkComponent):
        def __init__(self, config, params, obj):
            self._add_input_port("in")

        def _user_graph_is_configured(self):
            in_port = self._input_ports["in"]
            self._msg_iter = self._create_message_iterator(in_port)

        def _user_consume(self):
            # This is expected to raise.
            self._msg_iter.can_seek_ns_from_origin(2)

    # The seek callback is a placeholder; presumably only its presence
    # matters here, since the sink never asks the iterator to seek.
    graph = _setup_seek_test(
        MySink,
        user_can_seek_ns_from_origin=_user_can_seek_ns_from_origin,
        user_seek_ns_from_origin=lambda: None,
    )

    with self.assertRaises(bt2._Error) as ctx:
        graph.run_once()

    first_cause = ctx.exception[0]
    self.assertIn("ValueError: Joutel", first_cause.message)
1064
def test_can_seek_ns_from_origin_wrong_return_value(self):
    # A user "can seek ns from origin" method returning something other
    # than a bool (a string here) is expected to make the query fail
    # with a `bt2._Error` whose first cause carries the `TypeError` text.
    def _returns_a_string(self, ns_from_origin):
        return "Nitchequon"

    class MySink(bt2._UserSinkComponent):
        def __init__(self, config, params, obj):
            self._add_input_port("in")

        def _user_graph_is_configured(self):
            in_port = self._input_ports["in"]
            self._msg_iter = self._create_message_iterator(in_port)

        def _user_consume(self):
            # This is expected to raise.
            self._msg_iter.can_seek_ns_from_origin(2)

    # The seek callback is a placeholder; presumably only its presence
    # matters here, since the sink never asks the iterator to seek.
    graph = _setup_seek_test(
        MySink,
        user_can_seek_ns_from_origin=_returns_a_string,
        user_seek_ns_from_origin=lambda: None,
    )

    with self.assertRaises(bt2._Error) as ctx:
        graph.run_once()

    first_cause = ctx.exception[0]
    self.assertIn("TypeError: 'str' is not a 'bool' object", first_cause.message)
1091
def test_seek_ns_from_origin(self):
    # Seeking the sink's message iterator must hand the requested value
    # to the user's "seek ns from origin" method.
    requested_ns_from_origin = 17
    actual_ns_from_origin = None

    def _recording_seek(self, ns_from_origin):
        # Capture the value received by the user method.
        nonlocal actual_ns_from_origin
        actual_ns_from_origin = ns_from_origin

    class MySink(bt2._UserSinkComponent):
        def __init__(self, config, params, obj):
            self._add_input_port("in")

        def _user_graph_is_configured(self):
            in_port = self._input_ports["in"]
            self._msg_iter = self._create_message_iterator(in_port)

        def _user_consume(self):
            self._msg_iter.seek_ns_from_origin(requested_ns_from_origin)

    graph = _setup_seek_test(MySink, user_seek_ns_from_origin=_recording_seek)
    graph.run_once()
    self.assertEqual(actual_ns_from_origin, requested_ns_from_origin)
1114
1115
# Run this module's test cases when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
This page took 0.084819 seconds and 4 git commands to generate.