347f8803dba0768e946e9b2de59e81f7fe78958f
[babeltrace.git] / tests / bindings / python / bt2 / test_graph.py
1 #
2 # Copyright (C) 2019 EfficiOS Inc.
3 #
4 # This program is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU General Public License
6 # as published by the Free Software Foundation; only version 2
7 # of the License.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 #
18
19 import unittest
20 import bt2
21
22
class _MyIter(bt2._UserMessageIterator):
    # Base message iterator for the test sources below.
    #
    # On creation it builds the trace metadata and objects (trace class,
    # trace, stream class with packet support, event class named
    # 'salut', stream, packet) and starts its position counter `_at`
    # at 0. Subclasses provide __next__().

    def __init__(self, self_output_port):
        self._at = 0
        self._build_meta()

    def _build_meta(self):
        # Everything is created from the trace class obtained through
        # the iterator's component.
        trace_class = self._component._create_trace_class()
        self._tc = trace_class
        self._t = trace_class()

        stream_class = trace_class.create_stream_class(supports_packets=True)
        self._sc = stream_class
        self._ec = stream_class.create_event_class(name='salut')

        # Structure field class with a single 32-bit signed integer
        # member named 'my_int'.
        self._my_int_ft = trace_class.create_signed_integer_field_class(32)
        payload_fc = trace_class.create_structure_field_class()
        payload_fc += [('my_int', self._my_int_ft)]

        # NOTE(review): this assigns an attribute named
        # `payload_field_type` on the event class object; whether the
        # bindings treat that as the event class's payload is not
        # visible from this file -- confirm.
        self._ec.payload_field_type = payload_fc

        self._stream = self._t.create_stream(self._sc)
        self._packet = self._stream.create_packet()

    def _create_event(self, value):
        # Build an event from the 'salut' event class with `my_int` set
        # to `value`, attached to the iterator's packet.
        #
        # NOTE(review): nothing in this file calls this helper.
        event = self._ec()
        event.payload_field['my_int'] = value
        event.packet = self._packet
        return event
45
46
class GraphTestCase(unittest.TestCase):
    # Tests for bt2.Graph: creation, adding components, connecting
    # ports, interrupters, running, and listeners.
    #
    # Most tests use the fresh graph which setUp() stores in
    # `self._graph`; a few create their own.

    def setUp(self):
        self._graph = bt2.Graph()

    def tearDown(self):
        del self._graph

    def test_create_default(self):
        bt2.Graph()

    def test_create_known_mip_version(self):
        bt2.Graph(0)

    def test_create_invalid_mip_version_type(self):
        with self.assertRaises(TypeError):
            bt2.Graph('')

    def test_create_unknown_mip_version(self):
        with self.assertRaisesRegex(ValueError, 'unknown MIP version'):
            bt2.Graph(1)

    def test_add_component_user_cls(self):
        class MySink(bt2._UserSinkComponent):
            def _user_consume(self):
                pass

        comp = self._graph.add_component(MySink, 'salut')
        self.assertEqual(comp.name, 'salut')

    # Add a second component using the class object exposed by the
    # first component (`comp.cls`) instead of the Python user class.
    def test_add_component_gen_cls(self):
        class MySink(bt2._UserSinkComponent):
            def _user_consume(self):
                pass

        comp = self._graph.add_component(MySink, 'salut')

        # A unittest assertion rather than a bare `assert`, which is
        # stripped when Python runs with -O.
        self.assertIsNotNone(comp)
        comp2 = self._graph.add_component(comp.cls, 'salut2')
        self.assertEqual(comp2.name, 'salut2')

    # The parameters given to add_component() must reach the
    # component's __init__().
    def test_add_component_params(self):
        comp_params = None

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                nonlocal comp_params
                comp_params = params

            def _user_consume(self):
                pass

        params = {'hello': 23, 'path': '/path/to/stuff'}
        self._graph.add_component(MySink, 'salut', params)
        self.assertEqual(params, comp_params)
        del comp_params

    # The `obj` given to add_component() must reach the component's
    # __init__() as the very same object.
    def test_add_component_obj_python_comp_cls(self):
        comp_obj = None

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                nonlocal comp_obj
                comp_obj = obj

            def _user_consume(self):
                pass

        obj = object()
        self._graph.add_component(MySink, 'salut', obj=obj)
        self.assertIs(comp_obj, obj)
        del comp_obj

    # Without an `obj` argument, the component's __init__() gets None.
    def test_add_component_obj_none_python_comp_cls(self):
        comp_obj = None

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                nonlocal comp_obj
                comp_obj = obj

            def _user_consume(self):
                pass

        self._graph.add_component(MySink, 'salut')
        self.assertIsNone(comp_obj)
        del comp_obj

    # Passing `obj` with a component class coming from a plugin (here
    # the 'dmesg' source of the 'text' plugin) must raise ValueError.
    def test_add_component_obj_non_python_comp_cls(self):
        plugin = bt2.find_plugin('text', find_in_user_dir=False, find_in_sys_dir=False)

        # unittest assertions rather than bare `assert`s, which are
        # stripped when Python runs with -O.
        self.assertIsNotNone(plugin)
        cc = plugin.source_component_classes['dmesg']
        self.assertIsNotNone(cc)

        with self.assertRaises(ValueError):
            self._graph.add_component(cc, 'salut', obj=57)

    def test_add_component_invalid_cls_type(self):
        with self.assertRaises(TypeError):
            self._graph.add_component(int, 'salut')

    def test_add_component_invalid_logging_level_type(self):
        class MySink(bt2._UserSinkComponent):
            def _user_consume(self):
                pass

        with self.assertRaises(TypeError):
            self._graph.add_component(MySink, 'salut', logging_level='yo')

    def test_add_component_invalid_logging_level_value(self):
        class MySink(bt2._UserSinkComponent):
            def _user_consume(self):
                pass

        with self.assertRaises(ValueError):
            self._graph.add_component(MySink, 'salut', logging_level=12345)

    def test_add_component_logging_level(self):
        class MySink(bt2._UserSinkComponent):
            def _user_consume(self):
                pass

        comp = self._graph.add_component(
            MySink, 'salut', logging_level=bt2.LoggingLevel.DEBUG
        )
        self.assertEqual(comp.logging_level, bt2.LoggingLevel.DEBUG)

    # After connect_ports(), both ports report being connected and both
    # refer to the returned connection.
    def test_connect_ports(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port('in')

            def _user_consume(self):
                raise bt2.Stop

        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')

        conn = self._graph.connect_ports(
            src.output_ports['out'], sink.input_ports['in']
        )
        self.assertTrue(src.output_ports['out'].is_connected)
        self.assertTrue(sink.input_ports['in'].is_connected)
        self.assertEqual(src.output_ports['out'].connection.addr, conn.addr)
        self.assertEqual(sink.input_ports['in'].connection.addr, conn.addr)

    # Passing the input port first and the output port second must
    # raise TypeError.
    def test_connect_ports_invalid_direction(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port('in')

            def _user_consume(self):
                raise bt2.Stop

        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')

        with self.assertRaises(TypeError):
            self._graph.connect_ports(
                sink.input_ports['in'], src.output_ports['out']
            )

    def test_add_interrupter(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise TypeError

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port('in')

            def _user_consume(self):
                next(self._msg_iter)

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_input_port_message_iterator(
                    self._input_ports['in']
                )

        # add two interrupters, set one of them
        interrupter1 = bt2.Interrupter()
        interrupter2 = bt2.Interrupter()
        self._graph.add_interrupter(interrupter1)
        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')
        self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
        self._graph.add_interrupter(interrupter2)

        # No interrupter is set: the graph runs and the iterator's
        # TypeError surfaces as an error.
        with self.assertRaises(bt2._Error):
            self._graph.run()

        interrupter2.set()

        # One interrupter is set: run() reports "try again" instead.
        with self.assertRaises(bt2.TryAgain):
            self._graph.run()

        interrupter2.reset()

        # Interrupter reset: back to the iterator's error.
        with self.assertRaises(bt2._Error):
            self._graph.run()

    # Test that Graph.run() raises bt2.TryAgain if the graph gets
    # interrupted during execution.
    def test_interrupt_while_running(self):
        class MyIter(_MyIter):
            def __next__(self):
                return self._create_stream_beginning_message(self._stream)

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port('in')

            def _user_consume(self):
                # Pretend that somebody asynchronously interrupted the graph.
                graph.interrupt()
                return next(self._msg_iter)

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_input_port_message_iterator(
                    self._input_ports['in']
                )

        # Read by MySink._user_consume() above through the closure.
        graph = self._graph
        up = self._graph.add_component(MySource, 'up')
        down = self._graph.add_component(MySink, 'down')
        self._graph.connect_ports(up.output_ports['out'], down.input_ports['in'])

        with self.assertRaises(bt2.TryAgain):
            self._graph.run()

    # Run a graph to completion: the source emits stream beginning,
    # packet beginning, five events, packet end and stream end, and the
    # sink checks the type of each message in that order.
    def test_run(self):
        class MyIter(_MyIter):
            def __next__(self):
                if self._at == 9:
                    raise StopIteration

                if self._at == 0:
                    msg = self._create_stream_beginning_message(self._stream)
                elif self._at == 1:
                    msg = self._create_packet_beginning_message(self._packet)
                elif self._at == 7:
                    msg = self._create_packet_end_message(self._packet)
                elif self._at == 8:
                    msg = self._create_stream_end_message(self._stream)
                else:
                    msg = self._create_event_message(self._ec, self._packet)

                self._at += 1
                return msg

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._input_port = self._add_input_port('in')
                self._at = 0

            # The component is named `comp_self` so that `self` keeps
            # referring to the test case for the assertions.
            def _user_consume(comp_self):
                msg = next(comp_self._msg_iter)

                if comp_self._at == 0:
                    self.assertIs(type(msg), bt2._StreamBeginningMessageConst)
                elif comp_self._at == 1:
                    self.assertIs(type(msg), bt2._PacketBeginningMessageConst)
                elif comp_self._at >= 2 and comp_self._at <= 6:
                    self.assertIs(type(msg), bt2._EventMessageConst)
                    self.assertEqual(msg.event.cls.name, 'salut')
                elif comp_self._at == 7:
                    self.assertIs(type(msg), bt2._PacketEndMessageConst)
                elif comp_self._at == 8:
                    self.assertIs(type(msg), bt2._StreamEndMessageConst)

                comp_self._at += 1

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_input_port_message_iterator(
                    self._input_port
                )

        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')
        self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
        self._graph.run()

    # run_once() must call the sink's consume method exactly once and
    # propagate its bt2.TryAgain.
    def test_run_once(self):
        class MyIter(_MyIter):
            pass

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._input_port = self._add_input_port('in')

            def _user_consume(comp_self):
                nonlocal run_count
                run_count += 1
                raise bt2.TryAgain

        run_count = 0
        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')
        self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])

        with self.assertRaises(bt2.TryAgain):
            self._graph.run_once()

        self.assertEqual(run_count, 1)

    # run_once() must propagate the sink's bt2.Stop.
    def test_run_once_stops(self):
        class MyIter(_MyIter):
            pass

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._input_port = self._add_input_port('in')

            def _user_consume(comp_self):
                raise bt2.Stop

        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')
        self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])

        with self.assertRaises(bt2.Stop):
            self._graph.run_once()

    # run() must raise bt2.TryAgain when the sink raises it after
    # having consumed three messages.
    def test_run_again(self):
        class MyIter(_MyIter):
            def __next__(self):
                if self._at == 3:
                    raise bt2.TryAgain

                if self._at == 0:
                    msg = self._create_stream_beginning_message(self._stream)
                elif self._at == 1:
                    msg = self._create_packet_beginning_message(self._packet)
                elif self._at == 2:
                    msg = self._create_event_message(self._ec, self._packet)

                self._at += 1
                return msg

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._input_port = self._add_input_port('in')
                self._at = 0

            def _user_consume(comp_self):
                msg = next(comp_self._msg_iter)

                if comp_self._at == 0:
                    self.assertIs(type(msg), bt2._StreamBeginningMessageConst)
                elif comp_self._at == 1:
                    self.assertIs(type(msg), bt2._PacketBeginningMessageConst)
                elif comp_self._at == 2:
                    self.assertIs(type(msg), bt2._EventMessageConst)
                    raise bt2.TryAgain

                comp_self._at += 1

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_input_port_message_iterator(
                    self._input_port
                )

        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')
        self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])

        with self.assertRaises(bt2.TryAgain):
            self._graph.run()

    # run() must raise bt2._Error when the sink raises an arbitrary
    # exception, and must not ask the source for more messages
    # afterwards.
    def test_run_error(self):
        raised_in_sink = False
        next_called_after_raise = False

        class MyIter(_MyIter):
            def __next__(self):
                # If this gets called after the sink raised an exception, it is
                # an error. Only record it here and check it after run():
                # an exception raised from this method would come out of
                # run() as the bt2._Error which this test expects anyway,
                # hiding the problem.
                nonlocal next_called_after_raise

                if raised_in_sink:
                    next_called_after_raise = True

                if self._at == 0:
                    msg = self._create_stream_beginning_message(self._stream)
                elif self._at == 1:
                    msg = self._create_packet_beginning_message(self._packet)
                elif self._at == 2 or self._at == 3:
                    msg = self._create_event_message(self._ec, self._packet)
                else:
                    raise bt2.TryAgain

                self._at += 1
                return msg

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._input_port = self._add_input_port('in')
                self._at = 0

            def _user_consume(comp_self):
                msg = next(comp_self._msg_iter)

                if comp_self._at == 0:
                    self.assertIs(type(msg), bt2._StreamBeginningMessageConst)
                elif comp_self._at == 1:
                    self.assertIs(type(msg), bt2._PacketBeginningMessageConst)
                elif comp_self._at == 2:
                    self.assertIs(type(msg), bt2._EventMessageConst)
                elif comp_self._at == 3:
                    nonlocal raised_in_sink
                    raised_in_sink = True
                    raise RuntimeError('error!')

                comp_self._at += 1

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_input_port_message_iterator(
                    self._input_port
                )

        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')
        self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])

        with self.assertRaises(bt2._Error):
            self._graph.run()

        # The error must be the one raised by the sink, and the source
        # must not have been asked for a message after it.
        self.assertTrue(raised_in_sink)
        self.assertFalse(next_called_after_raise)

    # "Port added" and "ports connected" listeners must be called, in
    # order, for the two source ports, the sink port, the port which
    # the sink adds when its port gets connected, and the connection.
    def test_listeners(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port('out')
                self._add_output_port('zero')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port('in')

            def _user_consume(self):
                raise bt2.Stop

            def _user_port_connected(self, port, other_port):
                self._add_input_port('taste')

        # Every listener call is recorded here as a tuple starting with
        # the listener function itself.
        calls = []

        def port_added_listener(component, port):
            calls.append((port_added_listener, component, port))

        def ports_connected_listener(
            upstream_component, upstream_port, downstream_component, downstream_port
        ):
            calls.append(
                (
                    ports_connected_listener,
                    upstream_component,
                    upstream_port,
                    downstream_component,
                    downstream_port,
                )
            )

        self._graph.add_port_added_listener(port_added_listener)
        self._graph.add_ports_connected_listener(ports_connected_listener)
        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')
        self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])

        self.assertEqual(len(calls), 5)

        self.assertIs(calls[0][0], port_added_listener)
        self.assertEqual(calls[0][1].name, 'src')
        self.assertEqual(calls[0][2].name, 'out')

        self.assertIs(calls[1][0], port_added_listener)
        self.assertEqual(calls[1][1].name, 'src')
        self.assertEqual(calls[1][2].name, 'zero')

        self.assertIs(calls[2][0], port_added_listener)
        self.assertEqual(calls[2][1].name, 'sink')
        self.assertEqual(calls[2][2].name, 'in')

        self.assertIs(calls[3][0], port_added_listener)
        self.assertEqual(calls[3][1].name, 'sink')
        self.assertEqual(calls[3][2].name, 'taste')

        self.assertIs(calls[4][0], ports_connected_listener)
        self.assertEqual(calls[4][1].name, 'src')
        self.assertEqual(calls[4][2].name, 'out')
        self.assertEqual(calls[4][3].name, 'sink')
        self.assertEqual(calls[4][4].name, 'in')

    # Adding something which is not callable as a listener must raise
    # TypeError.
    def test_invalid_listeners(self):
        with self.assertRaises(TypeError):
            self._graph.add_port_added_listener(1234)

        with self.assertRaises(TypeError):
            self._graph.add_ports_connected_listener(1234)

    # An exception raised by a component's __init__() must make
    # add_component() raise bt2._Error.
    def test_raise_in_component_init(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                raise ValueError('oops!')

            def _user_consume(self):
                raise bt2.Stop

        graph = bt2.Graph()

        with self.assertRaises(bt2._Error):
            graph.add_component(MySink, 'comp')

    # An exception raised by a "port added" listener must make
    # add_component() raise bt2._Error.
    def test_raise_in_port_added_listener(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port('in')

            def _user_consume(self):
                raise bt2.Stop

        def port_added_listener(component, port):
            raise ValueError('oh noes!')

        graph = bt2.Graph()
        graph.add_port_added_listener(port_added_listener)

        with self.assertRaises(bt2._Error):
            graph.add_component(MySink, 'comp')

    # An exception raised by a "ports connected" listener must make
    # connect_ports() raise bt2._Error.
    def test_raise_in_ports_connected_listener(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port('in')

            def _user_consume(self):
                raise bt2.Stop

        def ports_connected_listener(
            upstream_component, upstream_port, downstream_component, downstream_port
        ):
            raise ValueError('oh noes!')

        graph = bt2.Graph()
        graph.add_ports_connected_listener(ports_connected_listener)
        up = graph.add_component(MySource, 'up')
        down = graph.add_component(MySink, 'down')

        with self.assertRaises(bt2._Error):
            graph.connect_ports(up.output_ports['out'], down.input_ports['in'])
664
# Run the test cases of this module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()
This page took 0.042886 seconds and 3 git commands to generate.