Remove `skip-string-normalization` in Python formatter config
[babeltrace.git] / tests / bindings / python / bt2 / test_message_iterator.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # Copyright (C) 2019 EfficiOS Inc.
4 #
5
6 import unittest
7 import bt2
8 import sys
9 from utils import TestOutputPortMessageIterator
10 from bt2 import port as bt2_port
11 from bt2 import message_iterator as bt2_message_iterator
12
13
class SimpleSink(bt2._UserSinkComponent):
    # Minimal sink shared by the tests below: it adds a single input port
    # named `in`, creates a message iterator on that port when the graph is
    # configured, and pulls one message from it on each consume call.

    def __init__(self, config, params, obj):
        # None of the initialization arguments are used by this sink.
        self._add_input_port("in")

    def _user_graph_is_configured(self):
        # Keep the iterator on the instance so that `_user_consume` can
        # advance it.
        in_port = self._input_ports["in"]
        self._msg_iter = self._create_message_iterator(in_port)

    def _user_consume(self):
        # Advance the iterator by one message; the message is discarded.
        next(self._msg_iter)
26
27
def _create_graph(src_comp_cls, sink_comp_cls, flt_comp_cls=None):
    # Build and return a graph made of a source component (named `src`) and
    # a sink component (named `sink`).
    #
    # When `flt_comp_cls` is given, a filter component (named `flt`) is
    # inserted between the two: `src.out -> flt.in` and `flt.out -> sink.in`.
    # Otherwise `src.out` is connected directly to `sink.in`.
    #
    # The source class must expose an output port named `out`, the sink
    # class an input port named `in`, and the filter class both.
    g = bt2.Graph()
    source = g.add_component(src_comp_cls, "src")
    sink = g.add_component(sink_comp_cls, "sink")

    if flt_comp_cls is None:
        # No filter: direct source-to-sink connection.
        g.connect_ports(source.output_ports["out"], sink.input_ports["in"])
        return g

    flt = g.add_component(flt_comp_cls, "flt")
    g.connect_ports(source.output_ports["out"], flt.input_ports["in"])
    g.connect_ports(flt.output_ports["out"], sink.input_ports["in"])
    return g
42
43
class UserMessageIteratorTestCase(unittest.TestCase):
    # Tests for user message iterators (`bt2._UserMessageIterator`
    # subclasses): initialization, creation of upstream message iterators,
    # error reporting, finalization, configuration and basic properties.

    # Test that the iterator's __init__ runs and receives an output port
    # with the same address and user data as the port added by the source.
    def test_init(self):
        the_output_port_from_source = None
        the_output_port_from_iter = None

        class MyIter(bt2._UserMessageIterator):
            def __init__(self, config, self_port_output):
                nonlocal initialized
                nonlocal the_output_port_from_iter
                initialized = True
                the_output_port_from_iter = self_port_output

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                nonlocal the_output_port_from_source
                the_output_port_from_source = self._add_output_port("out", "user data")

        initialized = False
        graph = _create_graph(MySource, SimpleSink)
        graph.run()
        self.assertTrue(initialized)
        self.assertEqual(
            the_output_port_from_source.addr, the_output_port_from_iter.addr
        )
        self.assertEqual(the_output_port_from_iter.user_data, "user data")

    # Test that a filter's message iterator can create an upstream message
    # iterator on the filter's input port: both the source iterator and the
    # filter iterator must end up initialized.
    def test_create_from_message_iterator(self):
        class MySourceIter(bt2._UserMessageIterator):
            def __init__(self, config, self_port_output):
                nonlocal src_iter_initialized
                src_iter_initialized = True

        class MySource(bt2._UserSourceComponent, message_iterator_class=MySourceIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        class MyFilterIter(bt2._UserMessageIterator):
            def __init__(self, config, self_port_output):
                nonlocal flt_iter_initialized
                flt_iter_initialized = True
                self._up_iter = self._create_message_iterator(
                    self._component._input_ports["in"]
                )

            def __next__(self):
                return next(self._up_iter)

        class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter):
            def __init__(self, config, params, obj):
                self._add_input_port("in")
                self._add_output_port("out")

        src_iter_initialized = False
        flt_iter_initialized = False
        graph = _create_graph(MySource, SimpleSink, MyFilter)
        graph.run()
        self.assertTrue(src_iter_initialized)
        self.assertTrue(flt_iter_initialized)

    # Test that creating a message iterator from a sink component on a
    # non-connected input port raises.
    def test_create_from_sink_component_unconnected_port_raises(self):
        class MySink(bt2._UserSinkComponent):
            # `comp_self` is used instead of `self` so that the test case's
            # `self` stays reachable for the assertion below.
            def __init__(comp_self, config, params, obj):
                comp_self._input_port = comp_self._add_input_port("in")

            def _user_graph_is_configured(comp_self):
                with self.assertRaisesRegex(ValueError, "input port is not connected"):
                    comp_self._create_message_iterator(comp_self._input_port)

                nonlocal seen
                seen = True

            def _user_consume(self):
                raise bt2.Stop

        seen = False
        graph = bt2.Graph()
        graph.add_component(MySink, "snk")
        graph.run()
        self.assertTrue(seen)

    # Test that creating a message iterator from a message iterator on a
    # non-connected input port raises.
    def test_create_from_message_iterator_unconnected_port_raises(self):
        class MyFilterIter(bt2._UserMessageIterator):
            # `iter_self` is used instead of `self` so that the test case's
            # `self` stays reachable for the assertion below.
            def __init__(iter_self, config, port):
                input_port = iter_self._component._input_ports["in"]

                with self.assertRaisesRegex(ValueError, "input port is not connected"):
                    iter_self._create_message_iterator(input_port)

                nonlocal seen
                seen = True

        class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter):
            def __init__(comp_self, config, params, obj):
                comp_self._add_input_port("in")
                comp_self._add_output_port("out")

        class MySink(bt2._UserSinkComponent):
            def __init__(comp_self, config, params, obj):
                comp_self._input_port = comp_self._add_input_port("in")

            def _user_graph_is_configured(comp_self):
                comp_self._input_iter = comp_self._create_message_iterator(
                    comp_self._input_port
                )

            def _user_consume(self):
                raise bt2.Stop

        # Only the filter's output is connected; its input port stays
        # unconnected on purpose.
        seen = False
        graph = bt2.Graph()
        flt = graph.add_component(MyFilter, "flt")
        snk = graph.add_component(MySink, "snk")
        graph.connect_ports(flt.output_ports["out"], snk.input_ports["in"])
        graph.run()
        self.assertTrue(seen)

    def test_create_user_error(self):
        # This tests both error handling by
        # _UserSinkComponent._create_message_iterator
        # and _UserMessageIterator._create_message_iterator, as they
        # are both used in the graph.
        class MySourceIter(bt2._UserMessageIterator):
            def __init__(self, config, self_port_output):
                raise ValueError("Very bad error")

        class MySource(bt2._UserSourceComponent, message_iterator_class=MySourceIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        class MyFilterIter(bt2._UserMessageIterator):
            def __init__(self, config, self_port_output):
                # This is expected to raise because of the error in
                # MySourceIter.__init__.
                self._create_message_iterator(self._component._input_ports["in"])

        class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter):
            def __init__(self, config, params, obj):
                self._add_input_port("in")
                self._add_output_port("out")

        graph = _create_graph(MySource, SimpleSink, MyFilter)

        with self.assertRaises(bt2._Error) as ctx:
            graph.run()

        # The first cause of the error must identify the source's iterator.
        exc = ctx.exception
        cause = exc[0]

        self.assertIsInstance(cause, bt2._MessageIteratorErrorCause)
        self.assertEqual(cause.component_name, "src")
        self.assertEqual(cause.component_output_port_name, "out")
        self.assertIn("ValueError: Very bad error", cause.message)

    # Test that the iterator's _user_finalize has been called by the time
    # the graph has run and has been deleted.
    def test_finalize(self):
        class MyIter(bt2._UserMessageIterator):
            def _user_finalize(self):
                nonlocal finalized
                finalized = True

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        finalized = False
        graph = _create_graph(MySource, SimpleSink)
        graph.run()
        del graph
        self.assertTrue(finalized)

    # Test that the `config` argument received by the iterator's __init__ is
    # a _MessageIteratorConfiguration.
    def test_config_parameter(self):
        class MyIter(bt2._UserMessageIterator):
            def __init__(self, config, port):
                nonlocal config_type
                config_type = type(config)

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        config_type = None
        graph = _create_graph(MySource, SimpleSink)
        graph.run()
        self.assertIs(config_type, bt2_message_iterator._MessageIteratorConfiguration)

    # Helper for the `can_seek_forward` tests: the source iterator sets
    # `config.can_seek_forward = True` only when `set_can_seek_forward` is
    # true, and the `can_seek_forward` property read by the sink on its
    # iterator must be exactly `set_can_seek_forward`.
    def _test_config_can_seek_forward(self, set_can_seek_forward):
        class MyIter(bt2._UserMessageIterator):
            def __init__(self, config, port):
                if set_can_seek_forward:
                    config.can_seek_forward = True

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_message_iterator(self._input_ports["in"])

            def _user_consume(self):
                nonlocal can_seek_forward
                can_seek_forward = self._msg_iter.can_seek_forward

        can_seek_forward = None
        graph = _create_graph(MySource, MySink)
        graph.run_once()
        self.assertIs(can_seek_forward, set_can_seek_forward)

    def test_config_can_seek_forward_default(self):
        self._test_config_can_seek_forward(False)

    def test_config_can_seek_forward(self):
        self._test_config_can_seek_forward(True)

    # Test that assigning a non-bool value to `config.can_seek_forward`
    # is reported as a TypeError in the first cause of the resulting error.
    def test_config_can_seek_forward_wrong_type(self):
        class MyIter(bt2._UserMessageIterator):
            def __init__(self, config, port):
                config.can_seek_forward = 1

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        graph = _create_graph(MySource, SimpleSink)
        with self.assertRaises(bt2._Error) as ctx:
            graph.run()

        root_cause = ctx.exception[0]
        self.assertIn("TypeError: 'int' is not a 'bool' object", root_cause.message)

    # Test that the iterator's `_component` gives access to attributes set
    # by the component's __init__.
    def test_component(self):
        class MyIter(bt2._UserMessageIterator):
            def __init__(self, config, self_port_output):
                nonlocal salut
                salut = self._component._salut

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")
                self._salut = 23

        salut = None
        graph = _create_graph(MySource, SimpleSink)
        graph.run()
        self.assertEqual(salut, 23)

    # Test that the iterator's `_port` and the port passed to __init__ are
    # both user component output ports with the same address.
    def test_port(self):
        class MyIter(bt2._UserMessageIterator):
            # `self_iter` is used instead of `self` so that the test case's
            # `self` stays reachable for the assertions below.
            def __init__(self_iter, config, self_port_output):
                nonlocal called
                called = True
                port = self_iter._port
                self.assertIs(type(self_port_output), bt2_port._UserComponentOutputPort)
                self.assertIs(type(port), bt2_port._UserComponentOutputPort)
                self.assertEqual(self_port_output.addr, port.addr)

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        called = False
        graph = _create_graph(MySource, SimpleSink)
        graph.run()
        self.assertTrue(called)

    # Test that the iterator's `addr` is neither None nor 0.
    def test_addr(self):
        class MyIter(bt2._UserMessageIterator):
            def __init__(self, config, self_port_output):
                nonlocal addr
                addr = self.addr

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        addr = None
        graph = _create_graph(MySource, SimpleSink)
        graph.run()
        self.assertIsNotNone(addr)
        self.assertNotEqual(addr, 0)

    # Test that messages returned by _UserMessageIterator.__next__ remain valid
    # and can be re-used.
    def test_reuse_message(self):
        class MyIter(bt2._UserMessageIterator):
            def __init__(self, config, port):
                # The source passes its trace class, stream class and event
                # class as the port's user data.
                tc, sc, ec = port.user_data
                trace = tc()
                stream = trace.create_stream(sc)
                packet = stream.create_packet()

                # This message will be returned twice by __next__.
                event_message = self._create_event_message(ec, packet)

                self._msgs = [
                    self._create_stream_beginning_message(stream),
                    self._create_packet_beginning_message(packet),
                    event_message,
                    event_message,
                ]

            def __next__(self):
                return self._msgs.pop(0)

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                tc = self._create_trace_class()
                sc = tc.create_stream_class(supports_packets=True)
                ec = sc.create_event_class()
                self._add_output_port("out", (tc, sc, ec))

        graph = bt2.Graph()
        src = graph.add_component(MySource, "src")
        it = TestOutputPortMessageIterator(graph, src.output_ports["out"])

        # Skip beginning messages.
        msg = next(it)
        self.assertIs(type(msg), bt2._StreamBeginningMessageConst)
        msg = next(it)
        self.assertIs(type(msg), bt2._PacketBeginningMessageConst)

        msg_ev1 = next(it)
        msg_ev2 = next(it)

        # Both event messages must refer to the same underlying object.
        self.assertIs(type(msg_ev1), bt2._EventMessageConst)
        self.assertIs(type(msg_ev2), bt2._EventMessageConst)
        self.assertEqual(msg_ev1.addr, msg_ev2.addr)

    # Try consuming many times from an iterator that always returns TryAgain.
    # This verifies that we are not missing an incref of Py_None, making the
    # refcount of Py_None reach 0.
    def test_try_again_many_times(self):
        # With an immortal `None` (CPython 3.12 and later), holding one more
        # reference to `None` does not change its reported reference count.
        # In that case the bug this test targets cannot occur, and the
        # reported count is so large that the loop below would effectively
        # never finish: skip the test.
        ref_count_before = sys.getrefcount(None)
        extra_none_ref = None

        if sys.getrefcount(None) == ref_count_before:
            self.skipTest("`None` is immortal in this Python interpreter")

        del extra_none_ref

        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.TryAgain

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        graph = bt2.Graph()
        src = graph.add_component(MySource, "src")
        it = TestOutputPortMessageIterator(graph, src.output_ports["out"])

        # Three times the initial ref count of `None` iterations should
        # be enough to catch the bug even if there are small differences
        # between configurations.
        none_ref_count = sys.getrefcount(None) * 3

        for _ in range(none_ref_count):
            with self.assertRaises(bt2.TryAgain):
                next(it)

    def test_error_in_iterator_with_cycle_after_having_created_upstream_iterator(self):
        # Test a failure that triggered an abort in libbabeltrace2, in this situation:
        #
        # - The filter iterator creates an upstream iterator.
        # - The filter iterator creates a reference cycle, including itself.
        # - An exception is raised, causing the filter iterator's
        #   initialization method to fail.
        class MySourceIter(bt2._UserMessageIterator):
            pass

        class MySource(bt2._UserSourceComponent, message_iterator_class=MySourceIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        class MyFilterIter(bt2._UserMessageIterator):
            def __init__(self, config, port):
                # First, create an upstream iterator.
                self._upstream_iter = self._create_message_iterator(
                    self._component._input_ports["in"]
                )

                # Then, voluntarily make a reference cycle that will keep this
                # Python object alive, which will keep the upstream iterator
                # Babeltrace object alive.
                self._self = self

                # Finally, raise an exception to make __init__ fail.
                raise ValueError("woops")

        class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter):
            def __init__(self, config, params, obj):
                self._in = self._add_input_port("in")
                self._out = self._add_output_port("out")

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._input_port = self._add_input_port("in")

            def _user_graph_is_configured(self):
                self._upstream_iter = self._create_message_iterator(self._input_port)

            def _user_consume(self):
                # We should not reach this.  Raise explicitly rather than
                # using `assert`, which is removed when running with `-O`.
                raise AssertionError("_user_consume() should not be reached")

        g = bt2.Graph()
        src = g.add_component(MySource, "src")
        flt = g.add_component(MyFilter, "flt")
        snk = g.add_component(MySink, "snk")
        g.connect_ports(src.output_ports["out"], flt.input_ports["in"])
        g.connect_ports(flt.output_ports["out"], snk.input_ports["in"])

        with self.assertRaisesRegex(bt2._Error, "ValueError: woops"):
            g.run()
476
477
def _setup_seek_test(
    sink_cls,
    user_seek_beginning=None,
    user_can_seek_beginning=None,
    user_seek_ns_from_origin=None,
    user_can_seek_ns_from_origin=None,
    can_seek_forward=False,
):
    # Build and return a source -> filter -> `sink_cls` graph for the seeking
    # tests.
    #
    # The source's message iterator replays a fixed list of six messages
    # (stream beginning, packet beginning, two events, packet end, stream
    # end) and sets `config.can_seek_forward` to `can_seek_forward`.
    #
    # Each `user_*` argument which is not None is installed on the source's
    # message iterator class as the `_user_*` method of the same name,
    # before the source component class is created.
    #
    # The filter's message iterator forwards messages and every seeking
    # operation to its upstream iterator.
    class MySourceIter(bt2._UserMessageIterator):
        def __init__(self, config, port):
            # The source passes its trace class, stream class and event
            # class as the port's user data.
            trace_cls, stream_cls, event_cls = port.user_data
            stream = trace_cls().create_stream(stream_cls)
            packet = stream.create_packet()

            # Index of the next message to return from `__next__`.
            self._at = 0
            self._msgs = [
                self._create_stream_beginning_message(stream),
                self._create_packet_beginning_message(packet),
                self._create_event_message(event_cls, packet),
                self._create_event_message(event_cls, packet),
                self._create_packet_end_message(packet),
                self._create_stream_end_message(stream),
            ]
            config.can_seek_forward = can_seek_forward

        def __next__(self):
            if self._at >= len(self._msgs):
                raise StopIteration

            current = self._msgs[self._at]
            self._at += 1
            return current

    # Optional user methods, installed in this order when provided.
    optional_methods = (
        ("_user_seek_beginning", user_seek_beginning),
        ("_user_can_seek_beginning", user_can_seek_beginning),
        ("_user_seek_ns_from_origin", user_seek_ns_from_origin),
        ("_user_can_seek_ns_from_origin", user_can_seek_ns_from_origin),
    )

    for method_name, method in optional_methods:
        if method is not None:
            setattr(MySourceIter, method_name, method)

    class MySource(bt2._UserSourceComponent, message_iterator_class=MySourceIter):
        def __init__(self, config, params, obj):
            trace_cls = self._create_trace_class()
            stream_cls = trace_cls.create_stream_class(supports_packets=True)
            event_cls = stream_cls.create_event_class()

            # Hand the classes over to the message iterator through the
            # port's user data.
            self._add_output_port("out", (trace_cls, stream_cls, event_cls))

    class MyFilterIter(bt2._UserMessageIterator):
        def __init__(self, config, port):
            in_port = self._component._input_ports["in"]
            self._upstream_iter = self._create_message_iterator(in_port)

            # Mirror the upstream iterator's ability to seek forward.
            config.can_seek_forward = self._upstream_iter.can_seek_forward

        def __next__(self):
            return next(self._upstream_iter)

        def _user_seek_beginning(self):
            self._upstream_iter.seek_beginning()

        def _user_can_seek_beginning(self):
            return self._upstream_iter.can_seek_beginning()

        def _user_seek_ns_from_origin(self, ns_from_origin):
            self._upstream_iter.seek_ns_from_origin(ns_from_origin)

        def _user_can_seek_ns_from_origin(self, ns_from_origin):
            return self._upstream_iter.can_seek_ns_from_origin(ns_from_origin)

    class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter):
        def __init__(self, config, params, obj):
            self._add_input_port("in")
            self._add_output_port("out")

    return _create_graph(MySource, sink_cls, flt_comp_cls=MyFilter)
560
561
class UserMessageIteratorSeekBeginningTestCase(unittest.TestCase):
    # Tests for the "seek beginning" and "can seek beginning" operations of
    # message iterators, using the graph built by _setup_seek_test().

    # Test that providing _user_can_seek_beginning without
    # _user_seek_beginning is rejected when the component class is created.
    def test_can_seek_beginning_without_seek_beginning(self):
        expected_msg = "cannot create component class 'MySource': message iterator class implements _user_can_seek_beginning but not _user_seek_beginning"

        with self.assertRaisesRegex(bt2._IncompleteUserClass, expected_msg):
            _setup_seek_test(SimpleSink, user_can_seek_beginning=lambda: None)

    # Test that can_seek_beginning() on the sink's iterator returns what the
    # source iterator's _user_can_seek_beginning returns.
    def test_can_seek_beginning(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_graph_is_configured(self):
                in_port = self._input_ports["in"]
                self._msg_iter = self._create_message_iterator(in_port)

            def _user_consume(self):
                nonlocal observed
                observed = self._msg_iter.can_seek_beginning()

        def _user_can_seek_beginning(self):
            # Reads the value which the loop below sets before each run.
            return upstream_answer

        g = _setup_seek_test(
            MySink,
            user_seek_beginning=lambda: None,
            user_can_seek_beginning=_user_can_seek_beginning,
        )

        # First make the source iterator answer True, then False.
        for upstream_answer in (True, False):
            observed = None
            g.run_once()
            self.assertIs(observed, upstream_answer)

    def test_no_can_seek_beginning_with_seek_beginning(self):
        # Test an iterator without a _user_can_seek_beginning method, but with
        # a _user_seek_beginning method.
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_graph_is_configured(self):
                in_port = self._input_ports["in"]
                self._msg_iter = self._create_message_iterator(in_port)

            def _user_consume(self):
                nonlocal observed
                observed = self._msg_iter.can_seek_beginning()

        def _user_seek_beginning(self):
            pass

        observed = None
        g = _setup_seek_test(MySink, user_seek_beginning=_user_seek_beginning)
        g.run_once()
        self.assertIs(observed, True)

    def test_no_can_seek_beginning(self):
        # Test an iterator without a _user_can_seek_beginning method, without
        # a _user_seek_beginning method.
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_graph_is_configured(self):
                in_port = self._input_ports["in"]
                self._msg_iter = self._create_message_iterator(in_port)

            def _user_consume(self):
                nonlocal observed
                observed = self._msg_iter.can_seek_beginning()

        observed = None
        g = _setup_seek_test(MySink)
        g.run_once()
        self.assertIs(observed, False)

    # Test that an exception raised by _user_can_seek_beginning shows up in
    # the first cause of the error raised while running the graph.
    def test_can_seek_beginning_user_error(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_graph_is_configured(self):
                in_port = self._input_ports["in"]
                self._msg_iter = self._create_message_iterator(in_port)

            def _user_consume(self):
                # This is expected to raise.
                self._msg_iter.can_seek_beginning()

        def _user_can_seek_beginning(self):
            raise ValueError("moustiquaire")

        g = _setup_seek_test(
            MySink,
            user_seek_beginning=lambda: None,
            user_can_seek_beginning=_user_can_seek_beginning,
        )

        with self.assertRaises(bt2._Error) as ctx:
            g.run_once()

        first_cause = ctx.exception[0]
        self.assertIn("ValueError: moustiquaire", first_cause.message)

    # Test that a non-bool value returned by _user_can_seek_beginning is
    # reported as a TypeError in the first cause of the resulting error.
    def test_can_seek_beginning_wrong_return_value(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_graph_is_configured(self):
                in_port = self._input_ports["in"]
                self._msg_iter = self._create_message_iterator(in_port)

            def _user_consume(self):
                # This is expected to raise.
                self._msg_iter.can_seek_beginning()

        def _user_can_seek_beginning(self):
            return "Amqui"

        g = _setup_seek_test(
            MySink,
            user_seek_beginning=lambda: None,
            user_can_seek_beginning=_user_can_seek_beginning,
        )

        with self.assertRaises(bt2._Error) as ctx:
            g.run_once()

        first_cause = ctx.exception[0]
        self.assertIn("TypeError: 'str' is not a 'bool' object", first_cause.message)

    # Test that seek_beginning() rewinds the source iterator: after
    # consuming two messages and seeking, the next message is again the
    # stream beginning message.
    def test_seek_beginning(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_graph_is_configured(self):
                in_port = self._input_ports["in"]
                self._msg_iter = self._create_message_iterator(in_port)

            def _user_consume(self):
                # Either seek to the beginning or consume one message,
                # depending on what the test asked for.
                nonlocal rewind
                nonlocal last_msg

                if rewind:
                    self._msg_iter.seek_beginning()
                else:
                    last_msg = next(self._msg_iter)

        def _user_seek_beginning(self):
            # Runs as a method of the source iterator: restart from the
            # first message.
            self._at = 0

        last_msg = None
        g = _setup_seek_test(MySink, user_seek_beginning=_user_seek_beginning)

        # Consume the first message.
        rewind = False
        g.run_once()
        self.assertIs(type(last_msg), bt2._StreamBeginningMessageConst)

        # Consume the second message.
        g.run_once()
        self.assertIs(type(last_msg), bt2._PacketBeginningMessageConst)

        # Seek to the beginning.
        rewind = True
        g.run_once()

        # Consume again: we are back at the first message.
        rewind = False
        g.run_once()
        self.assertIs(type(last_msg), bt2._StreamBeginningMessageConst)

    # Test that an exception raised by _user_seek_beginning makes running
    # the graph raise an error.
    def test_seek_beginning_user_error(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_graph_is_configured(self):
                in_port = self._input_ports["in"]
                self._msg_iter = self._create_message_iterator(in_port)

            def _user_consume(self):
                self._msg_iter.seek_beginning()

        def _user_seek_beginning(self):
            raise ValueError("ouch")

        g = _setup_seek_test(MySink, user_seek_beginning=_user_seek_beginning)

        with self.assertRaises(bt2._Error):
            g.run_once()
757
758
759 class UserMessageIteratorSeekNsFromOriginTestCase(unittest.TestCase):
760 def test_can_seek_ns_from_origin_without_seek_ns_from_origin(self):
761 # Test the case where:
762 #
763 # - can_seek_ns_from_origin: Returns True (don't really care, as long
764 # as it's provided)
765 # - seek_ns_from_origin provided: No
766 # - can the iterator seek beginning: Don't care
767 # - can the iterator seek forward: Don't care
768 for can_seek_ns_from_origin in (False, True):
769 for iter_can_seek_beginning in (False, True):
770 for iter_can_seek_forward in (False, True):
771 with self.assertRaisesRegex(
772 bt2._IncompleteUserClass,
773 "cannot create component class 'MySource': message iterator class implements _user_can_seek_ns_from_origin but not _user_seek_ns_from_origin",
774 ):
775 self._can_seek_ns_from_origin_test(
776 None,
777 user_can_seek_ns_from_origin_ret_val=True,
778 user_seek_ns_from_origin_provided=False,
779 iter_can_seek_beginning=iter_can_seek_beginning,
780 iter_can_seek_forward=iter_can_seek_forward,
781 )
782
783 def test_can_seek_ns_from_origin_returns_true(self):
784 # Test the case where:
785 #
786 # - can_seek_ns_from_origin: returns True
787 # - seek_ns_from_origin provided: Yes
788 # - can the iterator seek beginning: Don't care
789 # - can the iterator seek forward: Don't care
790 #
791 # We expect iter.can_seek_ns_from_origin to return True.
792 for iter_can_seek_beginning in (False, True):
793 for iter_can_seek_forward in (False, True):
794 self._can_seek_ns_from_origin_test(
795 expected_outcome=True,
796 user_can_seek_ns_from_origin_ret_val=True,
797 user_seek_ns_from_origin_provided=True,
798 iter_can_seek_beginning=iter_can_seek_beginning,
799 iter_can_seek_forward=iter_can_seek_forward,
800 )
801
802 def test_can_seek_ns_from_origin_returns_false_can_seek_beginning_forward_seekable(
803 self,
804 ):
805 # Test the case where:
806 #
807 # - can_seek_ns_from_origin: returns False
808 # - seek_ns_from_origin provided: Yes
809 # - can the iterator seek beginning: Yes
810 # - can the iterator seek forward: Yes
811 #
812 # We expect iter.can_seek_ns_from_origin to return True.
813 self._can_seek_ns_from_origin_test(
814 expected_outcome=True,
815 user_can_seek_ns_from_origin_ret_val=False,
816 user_seek_ns_from_origin_provided=True,
817 iter_can_seek_beginning=True,
818 iter_can_seek_forward=True,
819 )
820
821 def test_can_seek_ns_from_origin_returns_false_can_seek_beginning_not_forward_seekable(
822 self,
823 ):
824 # Test the case where:
825 #
826 # - can_seek_ns_from_origin: returns False
827 # - seek_ns_from_origin provided: Yes
828 # - can the iterator seek beginning: Yes
829 # - can the iterator seek forward: No
830 #
831 # We expect iter.can_seek_ns_from_origin to return False.
832 self._can_seek_ns_from_origin_test(
833 expected_outcome=False,
834 user_can_seek_ns_from_origin_ret_val=False,
835 user_seek_ns_from_origin_provided=True,
836 iter_can_seek_beginning=True,
837 iter_can_seek_forward=False,
838 )
839
840 def test_can_seek_ns_from_origin_returns_false_cant_seek_beginning_forward_seekable(
841 self,
842 ):
843 # Test the case where:
844 #
845 # - can_seek_ns_from_origin: returns False
846 # - seek_ns_from_origin provided: Yes
847 # - can the iterator seek beginning: No
848 # - can the iterator seek forward: Yes
849 #
850 # We expect iter.can_seek_ns_from_origin to return False.
851 self._can_seek_ns_from_origin_test(
852 expected_outcome=False,
853 user_can_seek_ns_from_origin_ret_val=False,
854 user_seek_ns_from_origin_provided=True,
855 iter_can_seek_beginning=False,
856 iter_can_seek_forward=True,
857 )
858
859 def test_can_seek_ns_from_origin_returns_false_cant_seek_beginning_not_forward_seekable(
860 self,
861 ):
862 # Test the case where:
863 #
864 # - can_seek_ns_from_origin: returns False
865 # - seek_ns_from_origin provided: Yes
866 # - can the iterator seek beginning: No
867 # - can the iterator seek forward: No
868 #
869 # We expect iter.can_seek_ns_from_origin to return False.
870 self._can_seek_ns_from_origin_test(
871 expected_outcome=False,
872 user_can_seek_ns_from_origin_ret_val=False,
873 user_seek_ns_from_origin_provided=True,
874 iter_can_seek_beginning=False,
875 iter_can_seek_forward=False,
876 )
877
878 def test_no_can_seek_ns_from_origin_seek_ns_from_origin(self):
879 # Test the case where:
880 #
881 # - can_seek_ns_from_origin: Not provided
882 # - seek_ns_from_origin provided: Yes
883 # - can the iterator seek beginning: Don't care
884 # - can the iterator seek forward: Don't care
885 #
886 # We expect iter.can_seek_ns_from_origin to return True.
887 for iter_can_seek_beginning in (False, True):
888 for iter_can_seek_forward in (False, True):
889 self._can_seek_ns_from_origin_test(
890 expected_outcome=True,
891 user_can_seek_ns_from_origin_ret_val=None,
892 user_seek_ns_from_origin_provided=True,
893 iter_can_seek_beginning=iter_can_seek_beginning,
894 iter_can_seek_forward=iter_can_seek_forward,
895 )
896
897 def test_no_can_seek_ns_from_origin_no_seek_ns_from_origin_can_seek_beginning_forward_seekable(
898 self,
899 ):
900 # Test the case where:
901 #
902 # - can_seek_ns_from_origin: Not provided
903 # - seek_ns_from_origin provided: Not provided
904 # - can the iterator seek beginning: Yes
905 # - can the iterator seek forward: Yes
906 #
907 # We expect iter.can_seek_ns_from_origin to return True.
908 self._can_seek_ns_from_origin_test(
909 expected_outcome=True,
910 user_can_seek_ns_from_origin_ret_val=None,
911 user_seek_ns_from_origin_provided=False,
912 iter_can_seek_beginning=True,
913 iter_can_seek_forward=True,
914 )
915
916 def test_no_can_seek_ns_from_origin_no_seek_ns_from_origin_can_seek_beginning_not_forward_seekable(
917 self,
918 ):
919 # Test the case where:
920 #
921 # - can_seek_ns_from_origin: Not provided
922 # - seek_ns_from_origin provided: Not provided
923 # - can the iterator seek beginning: Yes
924 # - can the iterator seek forward: No
925 #
926 # We expect iter.can_seek_ns_from_origin to return False.
927 self._can_seek_ns_from_origin_test(
928 expected_outcome=False,
929 user_can_seek_ns_from_origin_ret_val=None,
930 user_seek_ns_from_origin_provided=False,
931 iter_can_seek_beginning=True,
932 iter_can_seek_forward=False,
933 )
934
def test_no_can_seek_ns_from_origin_no_seek_ns_from_origin_cant_seek_beginning_forward_seekable(
    self,
):
    # Scenario under test:
    #
    # - "can seek ns from origin" user hook: not provided
    # - "seek ns from origin" user hook: not provided
    # - iterator can seek beginning: no
    # - iterator can seek forward: yes
    #
    # Expected: iter.can_seek_ns_from_origin returns False.
    scenario = {
        "user_can_seek_ns_from_origin_ret_val": None,
        "user_seek_ns_from_origin_provided": False,
        "iter_can_seek_beginning": False,
        "iter_can_seek_forward": True,
    }

    self._can_seek_ns_from_origin_test(expected_outcome=False, **scenario)
953
def test_no_can_seek_ns_from_origin_no_seek_ns_from_origin_cant_seek_beginning_not_forward_seekable(
    self,
):
    # Scenario under test:
    #
    # - "can seek ns from origin" user hook: not provided
    # - "seek ns from origin" user hook: not provided
    # - iterator can seek beginning: no
    # - iterator can seek forward: no
    #
    # Expected: iter.can_seek_ns_from_origin returns False.
    scenario = {
        "user_can_seek_ns_from_origin_ret_val": None,
        "user_seek_ns_from_origin_provided": False,
        "iter_can_seek_beginning": False,
        "iter_can_seek_forward": False,
    }

    self._can_seek_ns_from_origin_test(expected_outcome=False, **scenario)
972
def _can_seek_ns_from_origin_test(
    self,
    expected_outcome,
    user_can_seek_ns_from_origin_ret_val,
    user_seek_ns_from_origin_provided,
    iter_can_seek_beginning,
    iter_can_seek_forward,
):
    # Shared driver for the `can_seek_ns_from_origin` test matrix.
    #
    # Builds a graph with `_setup_seek_test()`, lets the sink query
    # `can_seek_ns_from_origin()` on its message iterator during a
    # single `run_once()`, and checks the answer.
    #
    # Parameters:
    #
    # - expected_outcome: value the query must return (checked with
    #   `assertIs`).
    # - user_can_seek_ns_from_origin_ret_val: when not `None`, a "can
    #   seek ns from origin" hook returning this value is handed to
    #   `_setup_seek_test()`, and we also check that the hook received
    #   the queried value; when `None`, no such hook is handed over.
    # - user_seek_ns_from_origin_provided: whether a (no-op) "seek ns
    #   from origin" hook is handed to `_setup_seek_test()`.
    # - iter_can_seek_beginning: whether a (no-op) "seek beginning" hook
    #   is handed to `_setup_seek_test()`.
    # - iter_can_seek_forward: forwarded as the `can_seek_forward`
    #   argument of `_setup_seek_test()`.
    queried_ns = 77
    has_can_seek_hook = user_can_seek_ns_from_origin_ret_val is not None

    class MySink(bt2._UserSinkComponent):
        def __init__(self, config, params, obj):
            self._add_input_port("in")

        def _user_consume(self):
            # Record the answer for the assertions made by the enclosing
            # test method.
            nonlocal query_result
            query_result = self._msg_iter.can_seek_ns_from_origin(queried_ns)

        def _user_graph_is_configured(self):
            self._msg_iter = self._create_message_iterator(self._input_ports["in"])

    # Candidate hooks: each one is only handed to `_setup_seek_test()`
    # when the corresponding parameter asks for it; `None` is passed
    # otherwise.
    def user_can_seek_ns_from_origin(self, ns_from_origin):
        nonlocal hook_received_ns
        hook_received_ns = ns_from_origin
        return user_can_seek_ns_from_origin_ret_val

    def user_seek_ns_from_origin(self, ns_from_origin):
        pass

    def user_seek_beginning(self):
        pass

    graph = _setup_seek_test(
        MySink,
        user_can_seek_ns_from_origin=(
            user_can_seek_ns_from_origin if has_can_seek_hook else None
        ),
        user_seek_ns_from_origin=(
            user_seek_ns_from_origin if user_seek_ns_from_origin_provided else None
        ),
        user_seek_beginning=user_seek_beginning if iter_can_seek_beginning else None,
        can_seek_forward=iter_can_seek_forward,
    )

    # Reset the recorded values right before running the graph.
    hook_received_ns = None
    query_result = None
    graph.run_once()
    self.assertIs(query_result, expected_outcome)

    if has_can_seek_hook:
        # The hook must have seen exactly the value which the sink asked
        # about.
        self.assertEqual(hook_received_ns, queried_ns)
1036
def test_can_seek_ns_from_origin_user_error(self):
    # A "can seek ns from origin" hook which raises must make
    # `run_once()` raise `bt2._Error`, and the first element of that
    # error must mention the original exception.
    def _user_can_seek_ns_from_origin(self, ns_from_origin):
        raise ValueError("Joutel")

    class MySink(bt2._UserSinkComponent):
        def __init__(self, config, params, obj):
            self._add_input_port("in")

        def _user_consume(self):
            # This is expected to raise.
            self._msg_iter.can_seek_ns_from_origin(2)

        def _user_graph_is_configured(self):
            self._msg_iter = self._create_message_iterator(self._input_ports["in"])

    hooks = {
        "user_can_seek_ns_from_origin": _user_can_seek_ns_from_origin,
        # NOTE(review): placeholder accepting no parameters; presumably
        # never called by this test, only its presence matters --
        # confirm against `_setup_seek_test()`.
        "user_seek_ns_from_origin": lambda: None,
    }
    graph = _setup_seek_test(MySink, **hooks)

    with self.assertRaises(bt2._Error) as raised:
        graph.run_once()

    first_cause = raised.exception[0]
    self.assertIn("ValueError: Joutel", first_cause.message)
1063
def test_can_seek_ns_from_origin_wrong_return_value(self):
    # A "can seek ns from origin" hook which returns a `str` must make
    # `run_once()` raise `bt2._Error`, and the first element of that
    # error must report the type mismatch.
    def _user_can_seek_ns_from_origin(self, ns_from_origin):
        return "Nitchequon"

    class MySink(bt2._UserSinkComponent):
        def __init__(self, config, params, obj):
            self._add_input_port("in")

        def _user_consume(self):
            # This is expected to raise.
            self._msg_iter.can_seek_ns_from_origin(2)

        def _user_graph_is_configured(self):
            self._msg_iter = self._create_message_iterator(self._input_ports["in"])

    hooks = {
        "user_can_seek_ns_from_origin": _user_can_seek_ns_from_origin,
        # NOTE(review): placeholder accepting no parameters; presumably
        # never called by this test, only its presence matters --
        # confirm against `_setup_seek_test()`.
        "user_seek_ns_from_origin": lambda: None,
    }
    graph = _setup_seek_test(MySink, **hooks)

    with self.assertRaises(bt2._Error) as raised:
        graph.run_once()

    first_cause = raised.exception[0]
    self.assertIn("TypeError: 'str' is not a 'bool' object", first_cause.message)
1090
def test_seek_ns_from_origin(self):
    # The value which the sink passes to `seek_ns_from_origin()` on its
    # message iterator must reach the "seek ns from origin" hook handed
    # to `_setup_seek_test()` unchanged.
    def _user_seek_ns_from_origin(self, ns_from_origin):
        nonlocal seen_ns
        seen_ns = ns_from_origin

    class MySink(bt2._UserSinkComponent):
        def __init__(self, config, params, obj):
            self._add_input_port("in")

        def _user_consume(self):
            self._msg_iter.seek_ns_from_origin(17)

        def _user_graph_is_configured(self):
            self._msg_iter = self._create_message_iterator(self._input_ports["in"])

    graph = _setup_seek_test(
        MySink,
        user_seek_ns_from_origin=_user_seek_ns_from_origin,
    )

    # Reset the recorded value right before running the graph.
    seen_ns = None
    graph.run_once()
    self.assertEqual(seen_ns, 17)
1113
1114
# Run this module's test cases when it is executed as a script (not when
# it is imported).
if __name__ == "__main__":
    unittest.main()
This page took 0.083626 seconds and 4 git commands to generate.