Move to kernel style SPDX license identifiers
[babeltrace.git] / tests / bindings / python / bt2 / test_graph.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # Copyright (C) 2019 EfficiOS Inc.
4 #
5
6 import unittest
7 import bt2
8
9
10 class _MyIter(bt2._UserMessageIterator):
11 def __init__(self, config, self_output_port):
12 self._build_meta()
13 self._at = 0
14
15 def _build_meta(self):
16 self._tc = self._component._create_trace_class()
17 self._t = self._tc()
18 self._sc = self._tc.create_stream_class(supports_packets=True)
19 self._ec = self._sc.create_event_class(name='salut')
20 self._my_int_ft = self._tc.create_signed_integer_field_class(32)
21 payload_ft = self._tc.create_structure_field_class()
22 payload_ft += [('my_int', self._my_int_ft)]
23 self._ec.payload_field_type = payload_ft
24 self._stream = self._t.create_stream(self._sc)
25 self._packet = self._stream.create_packet()
26
27 def _create_event(self, value):
28 ev = self._ec()
29 ev.payload_field['my_int'] = value
30 ev.packet = self._packet
31 return ev
32
33
34 class GraphTestCase(unittest.TestCase):
35 def setUp(self):
36 self._graph = bt2.Graph()
37
38 def tearDown(self):
39 del self._graph
40
41 def test_create_default(self):
42 bt2.Graph()
43
44 def test_create_known_mip_version(self):
45 bt2.Graph(0)
46
47 def test_create_invalid_mip_version_type(self):
48 with self.assertRaises(TypeError):
49 bt2.Graph('')
50
51 def test_create_unknown_mip_version(self):
52 with self.assertRaisesRegex(ValueError, 'unknown MIP version'):
53 bt2.Graph(1)
54
55 def test_default_interrupter(self):
56 interrupter = self._graph.default_interrupter
57 self.assertIs(type(interrupter), bt2.Interrupter)
58
59 def test_add_component_user_cls(self):
60 class MySink(bt2._UserSinkComponent):
61 def _user_consume(self):
62 pass
63
64 comp = self._graph.add_component(MySink, 'salut')
65 self.assertEqual(comp.name, 'salut')
66
67 def test_add_component_gen_cls(self):
68 class MySink(bt2._UserSinkComponent):
69 def _user_consume(self):
70 pass
71
72 comp = self._graph.add_component(MySink, 'salut')
73 assert comp
74 comp2 = self._graph.add_component(comp.cls, 'salut2')
75 self.assertEqual(comp2.name, 'salut2')
76
77 def test_add_component_params(self):
78 comp_params = None
79
80 class MySink(bt2._UserSinkComponent):
81 def __init__(self, config, params, obj):
82 nonlocal comp_params
83 comp_params = params
84
85 def _user_consume(self):
86 pass
87
88 params = {'hello': 23, 'path': '/path/to/stuff'}
89 self._graph.add_component(MySink, 'salut', params)
90 self.assertEqual(params, comp_params)
91 del comp_params
92
93 def test_add_component_obj_python_comp_cls(self):
94 comp_obj = None
95
96 class MySink(bt2._UserSinkComponent):
97 def __init__(self, config, params, obj):
98 nonlocal comp_obj
99 comp_obj = obj
100
101 def _user_consume(self):
102 pass
103
104 obj = object()
105 self._graph.add_component(MySink, 'salut', obj=obj)
106 self.assertIs(comp_obj, obj)
107 del comp_obj
108
109 def test_add_component_obj_none_python_comp_cls(self):
110 comp_obj = None
111
112 class MySink(bt2._UserSinkComponent):
113 def __init__(self, config, params, obj):
114 nonlocal comp_obj
115 comp_obj = obj
116
117 def _user_consume(self):
118 pass
119
120 self._graph.add_component(MySink, 'salut')
121 self.assertIsNone(comp_obj)
122 del comp_obj
123
124 def test_add_component_obj_non_python_comp_cls(self):
125 plugin = bt2.find_plugin('text', find_in_user_dir=False, find_in_sys_dir=False)
126 assert plugin is not None
127 cc = plugin.source_component_classes['dmesg']
128 assert cc is not None
129
130 with self.assertRaises(ValueError):
131 self._graph.add_component(cc, 'salut', obj=57)
132
133 def test_add_component_invalid_cls_type(self):
134 with self.assertRaises(TypeError):
135 self._graph.add_component(int, 'salut')
136
137 def test_add_component_invalid_logging_level_type(self):
138 class MySink(bt2._UserSinkComponent):
139 def _user_consume(self):
140 pass
141
142 with self.assertRaises(TypeError):
143 self._graph.add_component(MySink, 'salut', logging_level='yo')
144
145 def test_add_component_invalid_logging_level_value(self):
146 class MySink(bt2._UserSinkComponent):
147 def _user_consume(self):
148 pass
149
150 with self.assertRaises(ValueError):
151 self._graph.add_component(MySink, 'salut', logging_level=12345)
152
153 def test_add_component_invalid_params_type(self):
154 class MySink(bt2._UserSinkComponent):
155 def _user_consume(self):
156 pass
157
158 with self.assertRaises(TypeError):
159 self._graph.add_component(MySink, 'salut', params=12)
160
161 def test_add_component_params_dict(self):
162 params_obj = None
163
164 class MySink(bt2._UserSinkComponent):
165 def __init__(self, config, params, obj):
166 nonlocal params_obj
167 params_obj = params
168
169 def _user_consume(self):
170 pass
171
172 params = {'plage': 12312}
173 self._graph.add_component(MySink, 'salut', params=params)
174
175 # Check equality and not identity because `add_component()` method
176 # converts the Python `dict` to a `bt2.MapValue`.
177 self.assertEqual(params, params_obj)
178
179 def test_add_component_params_mapvalue(self):
180 params_obj = None
181
182 class MySink(bt2._UserSinkComponent):
183 def __init__(self, config, params, obj):
184 nonlocal params_obj
185 params_obj = params
186
187 def _user_consume(self):
188 pass
189
190 params = bt2.MapValue({'beachclub': '121'})
191 self._graph.add_component(MySink, 'salut', params=params)
192
193 self.assertEqual(params, params_obj)
194
195 def test_add_component_logging_level(self):
196 class MySink(bt2._UserSinkComponent):
197 def _user_consume(self):
198 pass
199
200 comp = self._graph.add_component(
201 MySink, 'salut', logging_level=bt2.LoggingLevel.DEBUG
202 )
203 self.assertEqual(comp.logging_level, bt2.LoggingLevel.DEBUG)
204
205 def test_connect_ports(self):
206 class MySource(
207 bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator
208 ):
209 def __init__(self, config, params, obj):
210 self._add_output_port('out')
211
212 class MySink(bt2._UserSinkComponent):
213 def __init__(self, config, params, obj):
214 self._add_input_port('in')
215
216 def _user_consume(self):
217 raise bt2.Stop
218
219 src = self._graph.add_component(MySource, 'src')
220 sink = self._graph.add_component(MySink, 'sink')
221
222 conn = self._graph.connect_ports(
223 src.output_ports['out'], sink.input_ports['in']
224 )
225 self.assertTrue(src.output_ports['out'].is_connected)
226 self.assertTrue(sink.input_ports['in'].is_connected)
227 self.assertEqual(src.output_ports['out'].connection.addr, conn.addr)
228 self.assertEqual(sink.input_ports['in'].connection.addr, conn.addr)
229
230 def test_connect_ports_invalid_direction(self):
231 class MySource(
232 bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator
233 ):
234 def __init__(self, config, params, obj):
235 self._add_output_port('out')
236
237 class MySink(bt2._UserSinkComponent):
238 def __init__(self, config, params, obj):
239 self._add_input_port('in')
240
241 def _user_consume(self):
242 raise bt2.Stop
243
244 src = self._graph.add_component(MySource, 'src')
245 sink = self._graph.add_component(MySink, 'sink')
246
247 with self.assertRaises(TypeError):
248 self._graph.connect_ports(sink.input_ports['in'], src.output_ports['out'])
249
250 def test_add_interrupter(self):
251 class MyIter(bt2._UserMessageIterator):
252 def __next__(self):
253 raise TypeError
254
255 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
256 def __init__(self, config, params, obj):
257 self._add_output_port('out')
258
259 class MySink(bt2._UserSinkComponent):
260 def __init__(self, config, params, obj):
261 self._add_input_port('in')
262
263 def _user_consume(self):
264 next(self._msg_iter)
265
266 def _user_graph_is_configured(self):
267 self._msg_iter = self._create_message_iterator(self._input_ports['in'])
268
269 # add two interrupters, set one of them
270 interrupter1 = bt2.Interrupter()
271 interrupter2 = bt2.Interrupter()
272 self._graph.add_interrupter(interrupter1)
273 src = self._graph.add_component(MySource, 'src')
274 sink = self._graph.add_component(MySink, 'sink')
275 self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
276 self._graph.add_interrupter(interrupter2)
277
278 with self.assertRaises(bt2._Error):
279 self._graph.run()
280
281 interrupter2.set()
282
283 with self.assertRaises(bt2.TryAgain):
284 self._graph.run()
285
286 interrupter2.reset()
287
288 with self.assertRaises(bt2._Error):
289 self._graph.run()
290
291 # Test that Graph.run() raises bt2.Interrupted if the graph gets
292 # interrupted during execution.
293 def test_interrupt_while_running(self):
294 class MyIter(_MyIter):
295 def __next__(self):
296 return self._create_stream_beginning_message(self._stream)
297
298 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
299 def __init__(self, config, params, obj):
300 self._add_output_port('out')
301
302 class MySink(bt2._UserSinkComponent):
303 def __init__(self, config, params, obj):
304 self._add_input_port('in')
305
306 def _user_consume(self):
307 # Pretend that somebody asynchronously interrupted the graph.
308 nonlocal graph
309 graph.default_interrupter.set()
310 return next(self._msg_iter)
311
312 def _user_graph_is_configured(self):
313 self._msg_iter = self._create_message_iterator(self._input_ports['in'])
314
315 graph = self._graph
316 up = self._graph.add_component(MySource, 'down')
317 down = self._graph.add_component(MySink, 'up')
318 self._graph.connect_ports(up.output_ports['out'], down.input_ports['in'])
319
320 with self.assertRaises(bt2.TryAgain):
321 self._graph.run()
322
323 def test_run(self):
324 class MyIter(_MyIter):
325 def __next__(self):
326 if self._at == 9:
327 raise StopIteration
328
329 if self._at == 0:
330 msg = self._create_stream_beginning_message(self._stream)
331 elif self._at == 1:
332 msg = self._create_packet_beginning_message(self._packet)
333 elif self._at == 7:
334 msg = self._create_packet_end_message(self._packet)
335 elif self._at == 8:
336 msg = self._create_stream_end_message(self._stream)
337 else:
338 msg = self._create_event_message(self._ec, self._packet)
339
340 self._at += 1
341 return msg
342
343 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
344 def __init__(self, config, params, obj):
345 self._add_output_port('out')
346
347 class MySink(bt2._UserSinkComponent):
348 def __init__(self, config, params, obj):
349 self._input_port = self._add_input_port('in')
350 self._at = 0
351
352 def _user_consume(comp_self):
353 msg = next(comp_self._msg_iter)
354
355 if comp_self._at == 0:
356 self.assertIs(type(msg), bt2._StreamBeginningMessageConst)
357 elif comp_self._at == 1:
358 self.assertIs(type(msg), bt2._PacketBeginningMessageConst)
359 elif comp_self._at >= 2 and comp_self._at <= 6:
360 self.assertIs(type(msg), bt2._EventMessageConst)
361 self.assertEqual(msg.event.cls.name, 'salut')
362 elif comp_self._at == 7:
363 self.assertIs(type(msg), bt2._PacketEndMessageConst)
364 elif comp_self._at == 8:
365 self.assertIs(type(msg), bt2._StreamEndMessageConst)
366
367 comp_self._at += 1
368
369 def _user_graph_is_configured(self):
370 self._msg_iter = self._create_message_iterator(self._input_port)
371
372 src = self._graph.add_component(MySource, 'src')
373 sink = self._graph.add_component(MySink, 'sink')
374 self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
375 self._graph.run()
376
377 def test_run_once(self):
378 class MyIter(_MyIter):
379 pass
380
381 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
382 def __init__(self, config, params, obj):
383 self._add_output_port('out')
384
385 class MySink(bt2._UserSinkComponent):
386 def __init__(self, config, params, obj):
387 self._input_port = self._add_input_port('in')
388
389 def _user_consume(comp_self):
390 nonlocal run_count
391 run_count += 1
392 raise bt2.TryAgain
393
394 run_count = 0
395 src = self._graph.add_component(MySource, 'src')
396 sink = self._graph.add_component(MySink, 'sink')
397 self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
398
399 with self.assertRaises(bt2.TryAgain):
400 self._graph.run_once()
401
402 self.assertEqual(run_count, 1)
403
404 def test_run_once_stops(self):
405 class MyIter(_MyIter):
406 pass
407
408 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
409 def __init__(self, config, params, obj):
410 self._add_output_port('out')
411
412 class MySink(bt2._UserSinkComponent):
413 def __init__(self, config, params, obj):
414 self._input_port = self._add_input_port('in')
415
416 def _user_consume(comp_self):
417 raise bt2.Stop
418
419 src = self._graph.add_component(MySource, 'src')
420 sink = self._graph.add_component(MySink, 'sink')
421 self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
422
423 with self.assertRaises(bt2.Stop):
424 self._graph.run_once()
425
426 def test_run_again(self):
427 class MyIter(_MyIter):
428 def __next__(self):
429 if self._at == 3:
430 raise bt2.TryAgain
431
432 if self._at == 0:
433 msg = self._create_stream_beginning_message(self._stream)
434 elif self._at == 1:
435 msg = self._create_packet_beginning_message(self._packet)
436 elif self._at == 2:
437 msg = self._create_event_message(self._ec, self._packet)
438
439 self._at += 1
440 return msg
441
442 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
443 def __init__(self, config, params, obj):
444 self._add_output_port('out')
445
446 class MySink(bt2._UserSinkComponent):
447 def __init__(self, config, params, obj):
448 self._input_port = self._add_input_port('in')
449 self._at = 0
450
451 def _user_consume(comp_self):
452 msg = next(comp_self._msg_iter)
453 if comp_self._at == 0:
454 self.assertIs(type(msg), bt2._StreamBeginningMessageConst)
455 elif comp_self._at == 1:
456 self.assertIs(type(msg), bt2._PacketBeginningMessageConst)
457 elif comp_self._at == 2:
458 self.assertIs(type(msg), bt2._EventMessageConst)
459 raise bt2.TryAgain
460 else:
461 pass
462
463 comp_self._at += 1
464
465 def _user_graph_is_configured(self):
466 self._msg_iter = self._create_message_iterator(self._input_port)
467
468 src = self._graph.add_component(MySource, 'src')
469 sink = self._graph.add_component(MySink, 'sink')
470 self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
471
472 with self.assertRaises(bt2.TryAgain):
473 self._graph.run()
474
475 def test_run_error(self):
476 raised_in_sink = False
477
478 class MyIter(_MyIter):
479 def __next__(self):
480 # If this gets called after the sink raised an exception, it is
481 # an error.
482 nonlocal raised_in_sink
483 assert raised_in_sink is False
484
485 if self._at == 0:
486 msg = self._create_stream_beginning_message(self._stream)
487 elif self._at == 1:
488 msg = self._create_packet_beginning_message(self._packet)
489 elif self._at == 2 or self._at == 3:
490 msg = self._create_event_message(self._ec, self._packet)
491 else:
492 raise bt2.TryAgain
493 self._at += 1
494 return msg
495
496 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
497 def __init__(self, config, params, obj):
498 self._add_output_port('out')
499
500 class MySink(bt2._UserSinkComponent):
501 def __init__(self, config, params, obj):
502 self._input_port = self._add_input_port('in')
503 self._at = 0
504
505 def _user_consume(comp_self):
506 msg = next(comp_self._msg_iter)
507 if comp_self._at == 0:
508 self.assertIs(type(msg), bt2._StreamBeginningMessageConst)
509 elif comp_self._at == 1:
510 self.assertIs(type(msg), bt2._PacketBeginningMessageConst)
511 elif comp_self._at == 2:
512 self.assertIs(type(msg), bt2._EventMessageConst)
513 elif comp_self._at == 3:
514 nonlocal raised_in_sink
515 raised_in_sink = True
516 raise RuntimeError('error!')
517
518 comp_self._at += 1
519
520 def _user_graph_is_configured(self):
521 self._msg_iter = self._create_message_iterator(self._input_port)
522
523 src = self._graph.add_component(MySource, 'src')
524 sink = self._graph.add_component(MySink, 'sink')
525 self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
526
527 with self.assertRaises(bt2._Error):
528 self._graph.run()
529
530 def test_listeners(self):
531 class MySource(
532 bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator
533 ):
534 def __init__(self, config, params, obj):
535 self._add_output_port('out')
536 self._add_output_port('zero')
537
538 class MySink(bt2._UserSinkComponent):
539 def __init__(self, config, params, obj):
540 self._add_input_port('in')
541
542 def _user_consume(self):
543 raise bt2.Stop
544
545 def _user_port_connected(self, port, other_port):
546 self._add_input_port('taste')
547
548 def port_added_listener(component, port):
549 nonlocal calls
550 calls.append((port_added_listener, component, port))
551
552 calls = []
553 self._graph.add_port_added_listener(port_added_listener)
554 src = self._graph.add_component(MySource, 'src')
555 sink = self._graph.add_component(MySink, 'sink')
556 self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
557
558 self.assertEqual(len(calls), 4)
559
560 self.assertIs(calls[0][0], port_added_listener)
561 self.assertEqual(calls[0][1].name, 'src')
562 self.assertEqual(calls[0][2].name, 'out')
563
564 self.assertIs(calls[1][0], port_added_listener)
565 self.assertEqual(calls[1][1].name, 'src')
566 self.assertEqual(calls[1][2].name, 'zero')
567
568 self.assertIs(calls[2][0], port_added_listener)
569 self.assertEqual(calls[2][1].name, 'sink')
570 self.assertEqual(calls[2][2].name, 'in')
571
572 self.assertIs(calls[3][0], port_added_listener)
573 self.assertEqual(calls[3][1].name, 'sink')
574 self.assertEqual(calls[3][2].name, 'taste')
575
576 def test_invalid_listeners(self):
577 class MySource(
578 bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator
579 ):
580 def __init__(self, config, params, obj):
581 self._add_output_port('out')
582 self._add_output_port('zero')
583
584 class MySink(bt2._UserSinkComponent):
585 def __init__(self, config, params, obj):
586 self._add_input_port('in')
587
588 def _user_consume(self):
589 raise bt2.Stop
590
591 def _user_port_connected(self, port, other_port):
592 self._add_input_port('taste')
593
594 with self.assertRaises(TypeError):
595 self._graph.add_port_added_listener(1234)
596
597 def test_raise_in_component_init(self):
598 class MySink(bt2._UserSinkComponent):
599 def __init__(self, config, params, obj):
600 raise ValueError('oops!')
601
602 def _user_consume(self):
603 raise bt2.Stop
604
605 graph = bt2.Graph()
606
607 with self.assertRaises(bt2._Error):
608 graph.add_component(MySink, 'comp')
609
610 def test_raise_in_port_added_listener(self):
611 class MySink(bt2._UserSinkComponent):
612 def __init__(self, config, params, obj):
613 self._add_input_port('in')
614
615 def _user_consume(self):
616 raise bt2.Stop
617
618 def port_added_listener(component, port):
619 raise ValueError('oh noes!')
620
621 graph = bt2.Graph()
622 graph.add_port_added_listener(port_added_listener)
623
624 with self.assertRaises(bt2._Error):
625 graph.add_component(MySink, 'comp')
626
627
628 if __name__ == '__main__':
629 unittest.main()
This page took 0.044345 seconds and 5 git commands to generate.