tests: remove unnecessary message iterator classes
[babeltrace.git] / tests / bindings / python / bt2 / test_graph.py
1 #
2 # Copyright (C) 2019 EfficiOS Inc.
3 #
4 # This program is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU General Public License
6 # as published by the Free Software Foundation; only version 2
7 # of the License.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 #
18
19 import unittest
20 import bt2
21
22
23 class _MyIter(bt2._UserMessageIterator):
24 def __init__(self, config, self_output_port):
25 self._build_meta()
26 self._at = 0
27
28 def _build_meta(self):
29 self._tc = self._component._create_trace_class()
30 self._t = self._tc()
31 self._sc = self._tc.create_stream_class(supports_packets=True)
32 self._ec = self._sc.create_event_class(name='salut')
33 self._my_int_ft = self._tc.create_signed_integer_field_class(32)
34 payload_ft = self._tc.create_structure_field_class()
35 payload_ft += [('my_int', self._my_int_ft)]
36 self._ec.payload_field_type = payload_ft
37 self._stream = self._t.create_stream(self._sc)
38 self._packet = self._stream.create_packet()
39
40 def _create_event(self, value):
41 ev = self._ec()
42 ev.payload_field['my_int'] = value
43 ev.packet = self._packet
44 return ev
45
46
47 class GraphTestCase(unittest.TestCase):
48 def setUp(self):
49 self._graph = bt2.Graph()
50
51 def tearDown(self):
52 del self._graph
53
54 def test_create_default(self):
55 bt2.Graph()
56
57 def test_create_known_mip_version(self):
58 bt2.Graph(0)
59
60 def test_create_invalid_mip_version_type(self):
61 with self.assertRaises(TypeError):
62 bt2.Graph('')
63
64 def test_create_unknown_mip_version(self):
65 with self.assertRaisesRegex(ValueError, 'unknown MIP version'):
66 bt2.Graph(1)
67
68 def test_default_interrupter(self):
69 interrupter = self._graph.default_interrupter
70 self.assertIs(type(interrupter), bt2.Interrupter)
71
72 def test_add_component_user_cls(self):
73 class MySink(bt2._UserSinkComponent):
74 def _user_consume(self):
75 pass
76
77 comp = self._graph.add_component(MySink, 'salut')
78 self.assertEqual(comp.name, 'salut')
79
80 def test_add_component_gen_cls(self):
81 class MySink(bt2._UserSinkComponent):
82 def _user_consume(self):
83 pass
84
85 comp = self._graph.add_component(MySink, 'salut')
86 assert comp
87 comp2 = self._graph.add_component(comp.cls, 'salut2')
88 self.assertEqual(comp2.name, 'salut2')
89
90 def test_add_component_params(self):
91 comp_params = None
92
93 class MySink(bt2._UserSinkComponent):
94 def __init__(self, config, params, obj):
95 nonlocal comp_params
96 comp_params = params
97
98 def _user_consume(self):
99 pass
100
101 params = {'hello': 23, 'path': '/path/to/stuff'}
102 self._graph.add_component(MySink, 'salut', params)
103 self.assertEqual(params, comp_params)
104 del comp_params
105
106 def test_add_component_obj_python_comp_cls(self):
107 comp_obj = None
108
109 class MySink(bt2._UserSinkComponent):
110 def __init__(self, config, params, obj):
111 nonlocal comp_obj
112 comp_obj = obj
113
114 def _user_consume(self):
115 pass
116
117 obj = object()
118 self._graph.add_component(MySink, 'salut', obj=obj)
119 self.assertIs(comp_obj, obj)
120 del comp_obj
121
122 def test_add_component_obj_none_python_comp_cls(self):
123 comp_obj = None
124
125 class MySink(bt2._UserSinkComponent):
126 def __init__(self, config, params, obj):
127 nonlocal comp_obj
128 comp_obj = obj
129
130 def _user_consume(self):
131 pass
132
133 self._graph.add_component(MySink, 'salut')
134 self.assertIsNone(comp_obj)
135 del comp_obj
136
137 def test_add_component_obj_non_python_comp_cls(self):
138 plugin = bt2.find_plugin('text', find_in_user_dir=False, find_in_sys_dir=False)
139 assert plugin is not None
140 cc = plugin.source_component_classes['dmesg']
141 assert cc is not None
142
143 with self.assertRaises(ValueError):
144 self._graph.add_component(cc, 'salut', obj=57)
145
146 def test_add_component_invalid_cls_type(self):
147 with self.assertRaises(TypeError):
148 self._graph.add_component(int, 'salut')
149
150 def test_add_component_invalid_logging_level_type(self):
151 class MySink(bt2._UserSinkComponent):
152 def _user_consume(self):
153 pass
154
155 with self.assertRaises(TypeError):
156 self._graph.add_component(MySink, 'salut', logging_level='yo')
157
158 def test_add_component_invalid_logging_level_value(self):
159 class MySink(bt2._UserSinkComponent):
160 def _user_consume(self):
161 pass
162
163 with self.assertRaises(ValueError):
164 self._graph.add_component(MySink, 'salut', logging_level=12345)
165
166 def test_add_component_invalid_params_type(self):
167 class MySink(bt2._UserSinkComponent):
168 def _user_consume(self):
169 pass
170
171 with self.assertRaises(TypeError):
172 self._graph.add_component(MySink, 'salut', params=12)
173
174 def test_add_component_params_dict(self):
175 params_obj = None
176
177 class MySink(bt2._UserSinkComponent):
178 def __init__(self, config, params, obj):
179 nonlocal params_obj
180 params_obj = params
181
182 def _user_consume(self):
183 pass
184
185 params = {'plage': 12312}
186 self._graph.add_component(MySink, 'salut', params=params)
187
188 # Check equality and not identity because `add_component()` method
189 # converts the Python `dict` to a `bt2.MapValue`.
190 self.assertEqual(params, params_obj)
191
192 def test_add_component_params_mapvalue(self):
193 params_obj = None
194
195 class MySink(bt2._UserSinkComponent):
196 def __init__(self, config, params, obj):
197 nonlocal params_obj
198 params_obj = params
199
200 def _user_consume(self):
201 pass
202
203 params = bt2.MapValue({'beachclub': '121'})
204 self._graph.add_component(MySink, 'salut', params=params)
205
206 self.assertEqual(params, params_obj)
207
208 def test_add_component_logging_level(self):
209 class MySink(bt2._UserSinkComponent):
210 def _user_consume(self):
211 pass
212
213 comp = self._graph.add_component(
214 MySink, 'salut', logging_level=bt2.LoggingLevel.DEBUG
215 )
216 self.assertEqual(comp.logging_level, bt2.LoggingLevel.DEBUG)
217
218 def test_connect_ports(self):
219 class MySource(
220 bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator
221 ):
222 def __init__(self, config, params, obj):
223 self._add_output_port('out')
224
225 class MySink(bt2._UserSinkComponent):
226 def __init__(self, config, params, obj):
227 self._add_input_port('in')
228
229 def _user_consume(self):
230 raise bt2.Stop
231
232 src = self._graph.add_component(MySource, 'src')
233 sink = self._graph.add_component(MySink, 'sink')
234
235 conn = self._graph.connect_ports(
236 src.output_ports['out'], sink.input_ports['in']
237 )
238 self.assertTrue(src.output_ports['out'].is_connected)
239 self.assertTrue(sink.input_ports['in'].is_connected)
240 self.assertEqual(src.output_ports['out'].connection.addr, conn.addr)
241 self.assertEqual(sink.input_ports['in'].connection.addr, conn.addr)
242
243 def test_connect_ports_invalid_direction(self):
244 class MySource(
245 bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator
246 ):
247 def __init__(self, config, params, obj):
248 self._add_output_port('out')
249
250 class MySink(bt2._UserSinkComponent):
251 def __init__(self, config, params, obj):
252 self._add_input_port('in')
253
254 def _user_consume(self):
255 raise bt2.Stop
256
257 src = self._graph.add_component(MySource, 'src')
258 sink = self._graph.add_component(MySink, 'sink')
259
260 with self.assertRaises(TypeError):
261 self._graph.connect_ports(sink.input_ports['in'], src.output_ports['out'])
262
263 def test_add_interrupter(self):
264 class MyIter(bt2._UserMessageIterator):
265 def __next__(self):
266 raise TypeError
267
268 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
269 def __init__(self, config, params, obj):
270 self._add_output_port('out')
271
272 class MySink(bt2._UserSinkComponent):
273 def __init__(self, config, params, obj):
274 self._add_input_port('in')
275
276 def _user_consume(self):
277 next(self._msg_iter)
278
279 def _user_graph_is_configured(self):
280 self._msg_iter = self._create_input_port_message_iterator(
281 self._input_ports['in']
282 )
283
284 # add two interrupters, set one of them
285 interrupter1 = bt2.Interrupter()
286 interrupter2 = bt2.Interrupter()
287 self._graph.add_interrupter(interrupter1)
288 src = self._graph.add_component(MySource, 'src')
289 sink = self._graph.add_component(MySink, 'sink')
290 self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
291 self._graph.add_interrupter(interrupter2)
292
293 with self.assertRaises(bt2._Error):
294 self._graph.run()
295
296 interrupter2.set()
297
298 with self.assertRaises(bt2.TryAgain):
299 self._graph.run()
300
301 interrupter2.reset()
302
303 with self.assertRaises(bt2._Error):
304 self._graph.run()
305
306 # Test that Graph.run() raises bt2.Interrupted if the graph gets
307 # interrupted during execution.
308 def test_interrupt_while_running(self):
309 class MyIter(_MyIter):
310 def __next__(self):
311 return self._create_stream_beginning_message(self._stream)
312
313 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
314 def __init__(self, config, params, obj):
315 self._add_output_port('out')
316
317 class MySink(bt2._UserSinkComponent):
318 def __init__(self, config, params, obj):
319 self._add_input_port('in')
320
321 def _user_consume(self):
322 # Pretend that somebody asynchronously interrupted the graph.
323 nonlocal graph
324 graph.default_interrupter.set()
325 return next(self._msg_iter)
326
327 def _user_graph_is_configured(self):
328 self._msg_iter = self._create_input_port_message_iterator(
329 self._input_ports['in']
330 )
331
332 graph = self._graph
333 up = self._graph.add_component(MySource, 'down')
334 down = self._graph.add_component(MySink, 'up')
335 self._graph.connect_ports(up.output_ports['out'], down.input_ports['in'])
336
337 with self.assertRaises(bt2.TryAgain):
338 self._graph.run()
339
340 def test_run(self):
341 class MyIter(_MyIter):
342 def __next__(self):
343 if self._at == 9:
344 raise StopIteration
345
346 if self._at == 0:
347 msg = self._create_stream_beginning_message(self._stream)
348 elif self._at == 1:
349 msg = self._create_packet_beginning_message(self._packet)
350 elif self._at == 7:
351 msg = self._create_packet_end_message(self._packet)
352 elif self._at == 8:
353 msg = self._create_stream_end_message(self._stream)
354 else:
355 msg = self._create_event_message(self._ec, self._packet)
356
357 self._at += 1
358 return msg
359
360 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
361 def __init__(self, config, params, obj):
362 self._add_output_port('out')
363
364 class MySink(bt2._UserSinkComponent):
365 def __init__(self, config, params, obj):
366 self._input_port = self._add_input_port('in')
367 self._at = 0
368
369 def _user_consume(comp_self):
370 msg = next(comp_self._msg_iter)
371
372 if comp_self._at == 0:
373 self.assertIs(type(msg), bt2._StreamBeginningMessageConst)
374 elif comp_self._at == 1:
375 self.assertIs(type(msg), bt2._PacketBeginningMessageConst)
376 elif comp_self._at >= 2 and comp_self._at <= 6:
377 self.assertIs(type(msg), bt2._EventMessageConst)
378 self.assertEqual(msg.event.cls.name, 'salut')
379 elif comp_self._at == 7:
380 self.assertIs(type(msg), bt2._PacketEndMessageConst)
381 elif comp_self._at == 8:
382 self.assertIs(type(msg), bt2._StreamEndMessageConst)
383
384 comp_self._at += 1
385
386 def _user_graph_is_configured(self):
387 self._msg_iter = self._create_input_port_message_iterator(
388 self._input_port
389 )
390
391 src = self._graph.add_component(MySource, 'src')
392 sink = self._graph.add_component(MySink, 'sink')
393 self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
394 self._graph.run()
395
396 def test_run_once(self):
397 class MyIter(_MyIter):
398 pass
399
400 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
401 def __init__(self, config, params, obj):
402 self._add_output_port('out')
403
404 class MySink(bt2._UserSinkComponent):
405 def __init__(self, config, params, obj):
406 self._input_port = self._add_input_port('in')
407
408 def _user_consume(comp_self):
409 nonlocal run_count
410 run_count += 1
411 raise bt2.TryAgain
412
413 run_count = 0
414 src = self._graph.add_component(MySource, 'src')
415 sink = self._graph.add_component(MySink, 'sink')
416 self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
417
418 with self.assertRaises(bt2.TryAgain):
419 self._graph.run_once()
420
421 self.assertEqual(run_count, 1)
422
423 def test_run_once_stops(self):
424 class MyIter(_MyIter):
425 pass
426
427 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
428 def __init__(self, config, params, obj):
429 self._add_output_port('out')
430
431 class MySink(bt2._UserSinkComponent):
432 def __init__(self, config, params, obj):
433 self._input_port = self._add_input_port('in')
434
435 def _user_consume(comp_self):
436 raise bt2.Stop
437
438 src = self._graph.add_component(MySource, 'src')
439 sink = self._graph.add_component(MySink, 'sink')
440 self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
441
442 with self.assertRaises(bt2.Stop):
443 self._graph.run_once()
444
445 def test_run_again(self):
446 class MyIter(_MyIter):
447 def __next__(self):
448 if self._at == 3:
449 raise bt2.TryAgain
450
451 if self._at == 0:
452 msg = self._create_stream_beginning_message(self._stream)
453 elif self._at == 1:
454 msg = self._create_packet_beginning_message(self._packet)
455 elif self._at == 2:
456 msg = self._create_event_message(self._ec, self._packet)
457
458 self._at += 1
459 return msg
460
461 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
462 def __init__(self, config, params, obj):
463 self._add_output_port('out')
464
465 class MySink(bt2._UserSinkComponent):
466 def __init__(self, config, params, obj):
467 self._input_port = self._add_input_port('in')
468 self._at = 0
469
470 def _user_consume(comp_self):
471 msg = next(comp_self._msg_iter)
472 if comp_self._at == 0:
473 self.assertIs(type(msg), bt2._StreamBeginningMessageConst)
474 elif comp_self._at == 1:
475 self.assertIs(type(msg), bt2._PacketBeginningMessageConst)
476 elif comp_self._at == 2:
477 self.assertIs(type(msg), bt2._EventMessageConst)
478 raise bt2.TryAgain
479 else:
480 pass
481
482 comp_self._at += 1
483
484 def _user_graph_is_configured(self):
485 self._msg_iter = self._create_input_port_message_iterator(
486 self._input_port
487 )
488
489 src = self._graph.add_component(MySource, 'src')
490 sink = self._graph.add_component(MySink, 'sink')
491 self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
492
493 with self.assertRaises(bt2.TryAgain):
494 self._graph.run()
495
496 def test_run_error(self):
497 raised_in_sink = False
498
499 class MyIter(_MyIter):
500 def __next__(self):
501 # If this gets called after the sink raised an exception, it is
502 # an error.
503 nonlocal raised_in_sink
504 assert raised_in_sink is False
505
506 if self._at == 0:
507 msg = self._create_stream_beginning_message(self._stream)
508 elif self._at == 1:
509 msg = self._create_packet_beginning_message(self._packet)
510 elif self._at == 2 or self._at == 3:
511 msg = self._create_event_message(self._ec, self._packet)
512 else:
513 raise bt2.TryAgain
514 self._at += 1
515 return msg
516
517 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
518 def __init__(self, config, params, obj):
519 self._add_output_port('out')
520
521 class MySink(bt2._UserSinkComponent):
522 def __init__(self, config, params, obj):
523 self._input_port = self._add_input_port('in')
524 self._at = 0
525
526 def _user_consume(comp_self):
527 msg = next(comp_self._msg_iter)
528 if comp_self._at == 0:
529 self.assertIs(type(msg), bt2._StreamBeginningMessageConst)
530 elif comp_self._at == 1:
531 self.assertIs(type(msg), bt2._PacketBeginningMessageConst)
532 elif comp_self._at == 2:
533 self.assertIs(type(msg), bt2._EventMessageConst)
534 elif comp_self._at == 3:
535 nonlocal raised_in_sink
536 raised_in_sink = True
537 raise RuntimeError('error!')
538
539 comp_self._at += 1
540
541 def _user_graph_is_configured(self):
542 self._msg_iter = self._create_input_port_message_iterator(
543 self._input_port
544 )
545
546 src = self._graph.add_component(MySource, 'src')
547 sink = self._graph.add_component(MySink, 'sink')
548 self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
549
550 with self.assertRaises(bt2._Error):
551 self._graph.run()
552
553 def test_listeners(self):
554 class MySource(
555 bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator
556 ):
557 def __init__(self, config, params, obj):
558 self._add_output_port('out')
559 self._add_output_port('zero')
560
561 class MySink(bt2._UserSinkComponent):
562 def __init__(self, config, params, obj):
563 self._add_input_port('in')
564
565 def _user_consume(self):
566 raise bt2.Stop
567
568 def _user_port_connected(self, port, other_port):
569 self._add_input_port('taste')
570
571 def port_added_listener(component, port):
572 nonlocal calls
573 calls.append((port_added_listener, component, port))
574
575 calls = []
576 self._graph.add_port_added_listener(port_added_listener)
577 src = self._graph.add_component(MySource, 'src')
578 sink = self._graph.add_component(MySink, 'sink')
579 self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
580
581 self.assertEqual(len(calls), 4)
582
583 self.assertIs(calls[0][0], port_added_listener)
584 self.assertEqual(calls[0][1].name, 'src')
585 self.assertEqual(calls[0][2].name, 'out')
586
587 self.assertIs(calls[1][0], port_added_listener)
588 self.assertEqual(calls[1][1].name, 'src')
589 self.assertEqual(calls[1][2].name, 'zero')
590
591 self.assertIs(calls[2][0], port_added_listener)
592 self.assertEqual(calls[2][1].name, 'sink')
593 self.assertEqual(calls[2][2].name, 'in')
594
595 self.assertIs(calls[3][0], port_added_listener)
596 self.assertEqual(calls[3][1].name, 'sink')
597 self.assertEqual(calls[3][2].name, 'taste')
598
599 def test_invalid_listeners(self):
600 class MySource(
601 bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator
602 ):
603 def __init__(self, config, params, obj):
604 self._add_output_port('out')
605 self._add_output_port('zero')
606
607 class MySink(bt2._UserSinkComponent):
608 def __init__(self, config, params, obj):
609 self._add_input_port('in')
610
611 def _user_consume(self):
612 raise bt2.Stop
613
614 def _user_port_connected(self, port, other_port):
615 self._add_input_port('taste')
616
617 with self.assertRaises(TypeError):
618 self._graph.add_port_added_listener(1234)
619
620 def test_raise_in_component_init(self):
621 class MySink(bt2._UserSinkComponent):
622 def __init__(self, config, params, obj):
623 raise ValueError('oops!')
624
625 def _user_consume(self):
626 raise bt2.Stop
627
628 graph = bt2.Graph()
629
630 with self.assertRaises(bt2._Error):
631 graph.add_component(MySink, 'comp')
632
633 def test_raise_in_port_added_listener(self):
634 class MySink(bt2._UserSinkComponent):
635 def __init__(self, config, params, obj):
636 self._add_input_port('in')
637
638 def _user_consume(self):
639 raise bt2.Stop
640
641 def port_added_listener(component, port):
642 raise ValueError('oh noes!')
643
644 graph = bt2.Graph()
645 graph.add_port_added_listener(port_added_listener)
646
647 with self.assertRaises(bt2._Error):
648 graph.add_component(MySink, 'comp')
649
650
651 if __name__ == '__main__':
652 unittest.main()
This page took 0.043946 seconds and 4 git commands to generate.