Remove `skip-string-normalization` in Python formatter config
[babeltrace.git] / tests / bindings / python / bt2 / test_graph.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # Copyright (C) 2019 EfficiOS Inc.
4 #
5
6 import unittest
7 import bt2
8
9
class _MyIter(bt2._UserMessageIterator):
    # Message iterator base used by the tests in this file: on creation it
    # builds a trace class, a trace, a stream class, an event class, a
    # stream and a packet, and keeps them as attributes so that subclasses
    # can create messages from them.

    def __init__(self, config, self_output_port):
        self._build_meta()

        # Position counter; subclasses use it in `__next__()` to decide
        # which message to return.
        self._at = 0

    def _build_meta(self):
        # Trace class and a trace instantiated from it.
        trace_cls = self._component._create_trace_class()
        self._tc = trace_cls
        self._t = trace_cls()

        # Stream class (with packet support) and its "salut" event class.
        stream_cls = trace_cls.create_stream_class(supports_packets=True)
        self._sc = stream_cls
        event_cls = stream_cls.create_event_class(name="salut")
        self._ec = event_cls

        # Structure payload with a single 32-bit signed integer member.
        self._my_int_ft = trace_cls.create_signed_integer_field_class(32)
        payload_struct = trace_cls.create_structure_field_class()
        payload_struct += [("my_int", self._my_int_ft)]
        event_cls.payload_field_type = payload_struct

        # Stream and packet used when creating messages.
        self._stream = self._t.create_stream(stream_cls)
        self._packet = self._stream.create_packet()

    def _create_event(self, value):
        # NOTE(review): not called anywhere in this file; kept unchanged.
        event = self._ec()
        event.payload_field["my_int"] = value
        event.packet = self._packet
        return event
32
33
class GraphTestCase(unittest.TestCase):
    # Tests for `bt2.Graph`: creation, adding components, connecting
    # ports, interrupters, running, and port-added listeners.

    def setUp(self):
        # Every test starts with its own graph.
        self._graph = bt2.Graph()

    def tearDown(self):
        del self._graph

    # Creating a graph without arguments must not raise.
    def test_create_default(self):
        bt2.Graph()

    # Creating a graph with MIP version 0 must not raise.
    def test_create_known_mip_version(self):
        bt2.Graph(0)

    # A MIP version which is not an integer (a string here) is expected
    # to raise `TypeError`.
    def test_create_invalid_mip_version_type(self):
        with self.assertRaises(TypeError):
            bt2.Graph("")

    # MIP version 1 is expected to be rejected with a `ValueError` whose
    # message matches "unknown MIP version".
    def test_create_unknown_mip_version(self):
        with self.assertRaisesRegex(ValueError, "unknown MIP version"):
            bt2.Graph(1)

    # The default interrupter of a graph is exactly a `bt2.Interrupter`.
    def test_default_interrupter(self):
        interrupter = self._graph.default_interrupter
        self.assertIs(type(interrupter), bt2.Interrupter)

    # Add a component from a user (Python) component class.
    def test_add_component_user_cls(self):
        class MySink(bt2._UserSinkComponent):
            def _user_consume(self):
                pass

        comp = self._graph.add_component(MySink, "salut")
        self.assertEqual(comp.name, "salut")

    # Add a component from the `cls` property of an existing component.
    def test_add_component_gen_cls(self):
        class MySink(bt2._UserSinkComponent):
            def _user_consume(self):
                pass

        comp = self._graph.add_component(MySink, "salut")

        # Use a unittest assertion so that the check is not stripped when
        # Python runs with `-O`.
        self.assertTrue(comp)
        comp2 = self._graph.add_component(comp.cls, "salut2")
        self.assertEqual(comp2.name, "salut2")

    # The parameters passed positionally to add_component() reach the
    # component's __init__().
    def test_add_component_params(self):
        comp_params = None

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                nonlocal comp_params
                comp_params = params

            def _user_consume(self):
                pass

        params = {"hello": 23, "path": "/path/to/stuff"}
        self._graph.add_component(MySink, "salut", params)
        self.assertEqual(params, comp_params)

        # Clear the variable shared with MySink.__init__() through the
        # closure so that it does not keep the received value alive.
        del comp_params

    # The `obj` argument reaches the component's __init__() as the very
    # same object.
    def test_add_component_obj_python_comp_cls(self):
        comp_obj = None

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                nonlocal comp_obj
                comp_obj = obj

            def _user_consume(self):
                pass

        obj = object()
        self._graph.add_component(MySink, "salut", obj=obj)
        self.assertIs(comp_obj, obj)

        # Clear the variable shared with MySink.__init__() through the
        # closure.
        del comp_obj

    # Without an `obj` argument, the component's __init__() receives
    # `None`.
    def test_add_component_obj_none_python_comp_cls(self):
        comp_obj = None

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                nonlocal comp_obj
                comp_obj = obj

            def _user_consume(self):
                pass

        self._graph.add_component(MySink, "salut")
        self.assertIsNone(comp_obj)

        # Clear the variable shared with MySink.__init__() through the
        # closure.
        del comp_obj

    # Passing `obj` along with a component class which is not a Python
    # one (the `dmesg` source of the `text` plugin) is expected to raise
    # `ValueError`.
    def test_add_component_obj_non_python_comp_cls(self):
        plugin = bt2.find_plugin("text", find_in_user_dir=False, find_in_sys_dir=False)

        # Preconditions: use unittest assertions so that they are not
        # stripped when Python runs with `-O`.
        self.assertIsNotNone(plugin)
        cc = plugin.source_component_classes["dmesg"]
        self.assertIsNotNone(cc)

        with self.assertRaises(ValueError):
            self._graph.add_component(cc, "salut", obj=57)

    # Something which is not a component class (`int` here) is expected
    # to raise `TypeError`.
    def test_add_component_invalid_cls_type(self):
        with self.assertRaises(TypeError):
            self._graph.add_component(int, "salut")

    # A logging level of the wrong type (a string here) is expected to
    # raise `TypeError`.
    def test_add_component_invalid_logging_level_type(self):
        class MySink(bt2._UserSinkComponent):
            def _user_consume(self):
                pass

        with self.assertRaises(TypeError):
            self._graph.add_component(MySink, "salut", logging_level="yo")

    # The integer logging level 12345 is expected to raise `ValueError`.
    def test_add_component_invalid_logging_level_value(self):
        class MySink(bt2._UserSinkComponent):
            def _user_consume(self):
                pass

        with self.assertRaises(ValueError):
            self._graph.add_component(MySink, "salut", logging_level=12345)

    # Parameters of the wrong type (an integer here) are expected to
    # raise `TypeError`.
    def test_add_component_invalid_params_type(self):
        class MySink(bt2._UserSinkComponent):
            def _user_consume(self):
                pass

        with self.assertRaises(TypeError):
            self._graph.add_component(MySink, "salut", params=12)

    # Parameters given as a Python `dict` reach the component's
    # __init__().
    def test_add_component_params_dict(self):
        params_obj = None

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                nonlocal params_obj
                params_obj = params

            def _user_consume(self):
                pass

        params = {"plage": 12312}
        self._graph.add_component(MySink, "salut", params=params)

        # Check equality and not identity because `add_component()` method
        # converts the Python `dict` to a `bt2.MapValue`.
        self.assertEqual(params, params_obj)

    # Parameters given as a `bt2.MapValue` reach the component's
    # __init__().
    def test_add_component_params_mapvalue(self):
        params_obj = None

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                nonlocal params_obj
                params_obj = params

            def _user_consume(self):
                pass

        params = bt2.MapValue({"beachclub": "121"})
        self._graph.add_component(MySink, "salut", params=params)

        self.assertEqual(params, params_obj)

    # The logging level passed to add_component() is the one which the
    # resulting component reports.
    def test_add_component_logging_level(self):
        class MySink(bt2._UserSinkComponent):
            def _user_consume(self):
                pass

        comp = self._graph.add_component(
            MySink, "salut", logging_level=bt2.LoggingLevel.DEBUG
        )
        self.assertEqual(comp.logging_level, bt2.LoggingLevel.DEBUG)

    # Connecting an output port to an input port marks both ports as
    # connected, and both report the returned connection (compared by
    # address).
    def test_connect_ports(self):
        class MySource(
            bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator
        ):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_consume(self):
                raise bt2.Stop

        src = self._graph.add_component(MySource, "src")
        sink = self._graph.add_component(MySink, "sink")

        conn = self._graph.connect_ports(
            src.output_ports["out"], sink.input_ports["in"]
        )
        self.assertTrue(src.output_ports["out"].is_connected)
        self.assertTrue(sink.input_ports["in"].is_connected)
        self.assertEqual(src.output_ports["out"].connection.addr, conn.addr)
        self.assertEqual(sink.input_ports["in"].connection.addr, conn.addr)

    # Passing the ports in the wrong order (input port first) is expected
    # to raise `TypeError`.
    def test_connect_ports_invalid_direction(self):
        class MySource(
            bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator
        ):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_consume(self):
                raise bt2.Stop

        src = self._graph.add_component(MySource, "src")
        sink = self._graph.add_component(MySink, "sink")

        with self.assertRaises(TypeError):
            self._graph.connect_ports(sink.input_ports["in"], src.output_ports["out"])

    # With no interrupter set, run() surfaces the iterator's `TypeError`
    # as `bt2._Error`; while an added interrupter is set, run() raises
    # `bt2.TryAgain` instead.
    def test_add_interrupter(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise TypeError

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_consume(self):
                next(self._msg_iter)

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_message_iterator(self._input_ports["in"])

        # add two interrupters, set one of them
        interrupter1 = bt2.Interrupter()
        interrupter2 = bt2.Interrupter()
        self._graph.add_interrupter(interrupter1)
        src = self._graph.add_component(MySource, "src")
        sink = self._graph.add_component(MySink, "sink")
        self._graph.connect_ports(src.output_ports["out"], sink.input_ports["in"])
        self._graph.add_interrupter(interrupter2)

        with self.assertRaises(bt2._Error):
            self._graph.run()

        interrupter2.set()

        with self.assertRaises(bt2.TryAgain):
            self._graph.run()

        interrupter2.reset()

        with self.assertRaises(bt2._Error):
            self._graph.run()

    # Test that Graph.run() raises bt2.TryAgain if the graph gets
    # interrupted during execution.
    def test_interrupt_while_running(self):
        class MyIter(_MyIter):
            def __next__(self):
                return self._create_stream_beginning_message(self._stream)

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_consume(self):
                # Pretend that somebody asynchronously interrupted the graph.
                graph.default_interrupter.set()
                return next(self._msg_iter)

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_message_iterator(self._input_ports["in"])

        graph = self._graph

        # NOTE: the component names are swapped with regard to the
        # variable names (the source is named "down" and the sink "up");
        # the test does not depend on those names.
        up = self._graph.add_component(MySource, "down")
        down = self._graph.add_component(MySink, "up")
        self._graph.connect_ports(up.output_ports["out"], down.input_ports["in"])

        with self.assertRaises(bt2.TryAgain):
            self._graph.run()

    # Run a graph to completion: the source emits nine messages (stream
    # beginning, packet beginning, five events, packet end, stream end)
    # and the sink checks the type of each one in order.
    def test_run(self):
        class MyIter(_MyIter):
            def __next__(self):
                if self._at == 9:
                    raise StopIteration

                if self._at == 0:
                    msg = self._create_stream_beginning_message(self._stream)
                elif self._at == 1:
                    msg = self._create_packet_beginning_message(self._packet)
                elif self._at == 7:
                    msg = self._create_packet_end_message(self._packet)
                elif self._at == 8:
                    msg = self._create_stream_end_message(self._stream)
                else:
                    msg = self._create_event_message(self._ec, self._packet)

                self._at += 1
                return msg

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._input_port = self._add_input_port("in")
                self._at = 0

            # The component is named `comp_self` so that `self` keeps
            # referring to the test case.
            def _user_consume(comp_self):
                msg = next(comp_self._msg_iter)

                if comp_self._at == 0:
                    self.assertIs(type(msg), bt2._StreamBeginningMessageConst)
                elif comp_self._at == 1:
                    self.assertIs(type(msg), bt2._PacketBeginningMessageConst)
                elif 2 <= comp_self._at <= 6:
                    self.assertIs(type(msg), bt2._EventMessageConst)
                    self.assertEqual(msg.event.cls.name, "salut")
                elif comp_self._at == 7:
                    self.assertIs(type(msg), bt2._PacketEndMessageConst)
                elif comp_self._at == 8:
                    self.assertIs(type(msg), bt2._StreamEndMessageConst)

                comp_self._at += 1

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_message_iterator(self._input_port)

        src = self._graph.add_component(MySource, "src")
        sink = self._graph.add_component(MySink, "sink")
        self._graph.connect_ports(src.output_ports["out"], sink.input_ports["in"])
        self._graph.run()

    # run_once() calls the sink's consuming method exactly once and
    # propagates the `bt2.TryAgain` which it raises.
    def test_run_once(self):
        class MyIter(_MyIter):
            pass

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._input_port = self._add_input_port("in")

            def _user_consume(comp_self):
                nonlocal run_count
                run_count += 1
                raise bt2.TryAgain

        run_count = 0
        src = self._graph.add_component(MySource, "src")
        sink = self._graph.add_component(MySink, "sink")
        self._graph.connect_ports(src.output_ports["out"], sink.input_ports["in"])

        with self.assertRaises(bt2.TryAgain):
            self._graph.run_once()

        self.assertEqual(run_count, 1)

    # run_once() propagates the `bt2.Stop` raised by the sink.
    def test_run_once_stops(self):
        class MyIter(_MyIter):
            pass

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._input_port = self._add_input_port("in")

            def _user_consume(comp_self):
                raise bt2.Stop

        src = self._graph.add_component(MySource, "src")
        sink = self._graph.add_component(MySink, "sink")
        self._graph.connect_ports(src.output_ports["out"], sink.input_ports["in"])

        with self.assertRaises(bt2.Stop):
            self._graph.run_once()

    # run() raises `bt2.TryAgain` when the sink raises it after having
    # consumed its third message.
    def test_run_again(self):
        class MyIter(_MyIter):
            def __next__(self):
                if self._at == 3:
                    raise bt2.TryAgain

                if self._at == 0:
                    msg = self._create_stream_beginning_message(self._stream)
                elif self._at == 1:
                    msg = self._create_packet_beginning_message(self._packet)
                elif self._at == 2:
                    msg = self._create_event_message(self._ec, self._packet)

                self._at += 1
                return msg

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._input_port = self._add_input_port("in")
                self._at = 0

            def _user_consume(comp_self):
                msg = next(comp_self._msg_iter)

                if comp_self._at == 0:
                    self.assertIs(type(msg), bt2._StreamBeginningMessageConst)
                elif comp_self._at == 1:
                    self.assertIs(type(msg), bt2._PacketBeginningMessageConst)
                elif comp_self._at == 2:
                    self.assertIs(type(msg), bt2._EventMessageConst)
                    raise bt2.TryAgain

                comp_self._at += 1

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_message_iterator(self._input_port)

        src = self._graph.add_component(MySource, "src")
        sink = self._graph.add_component(MySink, "sink")
        self._graph.connect_ports(src.output_ports["out"], sink.input_ports["in"])

        with self.assertRaises(bt2.TryAgain):
            self._graph.run()

    # run() raises `bt2._Error` when the sink raises an ordinary
    # exception, and the upstream iterator is not asked for another
    # message afterwards.
    def test_run_error(self):
        raised_in_sink = False

        class MyIter(_MyIter):
            def __next__(self):
                # If this gets called after the sink raised an exception, it is
                # an error.
                assert raised_in_sink is False

                if self._at == 0:
                    msg = self._create_stream_beginning_message(self._stream)
                elif self._at == 1:
                    msg = self._create_packet_beginning_message(self._packet)
                elif self._at in (2, 3):
                    msg = self._create_event_message(self._ec, self._packet)
                else:
                    raise bt2.TryAgain
                self._at += 1
                return msg

        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, config, params, obj):
                self._add_output_port("out")

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._input_port = self._add_input_port("in")
                self._at = 0

            def _user_consume(comp_self):
                nonlocal raised_in_sink

                msg = next(comp_self._msg_iter)

                if comp_self._at == 0:
                    self.assertIs(type(msg), bt2._StreamBeginningMessageConst)
                elif comp_self._at == 1:
                    self.assertIs(type(msg), bt2._PacketBeginningMessageConst)
                elif comp_self._at == 2:
                    self.assertIs(type(msg), bt2._EventMessageConst)
                elif comp_self._at == 3:
                    raised_in_sink = True
                    raise RuntimeError("error!")

                comp_self._at += 1

            def _user_graph_is_configured(self):
                self._msg_iter = self._create_message_iterator(self._input_port)

        src = self._graph.add_component(MySource, "src")
        sink = self._graph.add_component(MySink, "sink")
        self._graph.connect_ports(src.output_ports["out"], sink.input_ports["in"])

        with self.assertRaises(bt2._Error):
            self._graph.run()

    # The port-added listener is called once per added port, in order:
    # the two output ports of the source, the input port of the sink,
    # then the input port which the sink adds when its port gets
    # connected.
    def test_listeners(self):
        class MySource(
            bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator
        ):
            def __init__(self, config, params, obj):
                self._add_output_port("out")
                self._add_output_port("zero")

        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_consume(self):
                raise bt2.Stop

            def _user_port_connected(self, port, other_port):
                self._add_input_port("taste")

        def port_added_listener(component, port):
            calls.append((port_added_listener, component, port))

        calls = []
        self._graph.add_port_added_listener(port_added_listener)
        src = self._graph.add_component(MySource, "src")
        sink = self._graph.add_component(MySink, "sink")
        self._graph.connect_ports(src.output_ports["out"], sink.input_ports["in"])

        self.assertEqual(len(calls), 4)

        self.assertIs(calls[0][0], port_added_listener)
        self.assertEqual(calls[0][1].name, "src")
        self.assertEqual(calls[0][2].name, "out")

        self.assertIs(calls[1][0], port_added_listener)
        self.assertEqual(calls[1][1].name, "src")
        self.assertEqual(calls[1][2].name, "zero")

        self.assertIs(calls[2][0], port_added_listener)
        self.assertEqual(calls[2][1].name, "sink")
        self.assertEqual(calls[2][2].name, "in")

        self.assertIs(calls[3][0], port_added_listener)
        self.assertEqual(calls[3][1].name, "sink")
        self.assertEqual(calls[3][2].name, "taste")

    # Registering something which is not a listener function (an integer
    # here) is expected to raise `TypeError`.
    def test_invalid_listeners(self):
        with self.assertRaises(TypeError):
            self._graph.add_port_added_listener(1234)

    # An exception raised by a component's __init__() makes
    # add_component() raise `bt2._Error`.
    def test_raise_in_component_init(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                raise ValueError("oops!")

            def _user_consume(self):
                raise bt2.Stop

        graph = bt2.Graph()

        with self.assertRaises(bt2._Error):
            graph.add_component(MySink, "comp")

    # An exception raised by a port-added listener makes add_component()
    # raise `bt2._Error`.
    def test_raise_in_port_added_listener(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(self, config, params, obj):
                self._add_input_port("in")

            def _user_consume(self):
                raise bt2.Stop

        def port_added_listener(component, port):
            raise ValueError("oh noes!")

        graph = bt2.Graph()
        graph.add_port_added_listener(port_added_listener)

        with self.assertRaises(bt2._Error):
            graph.add_component(MySink, "comp")
626
627
# Run the tests of this module when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
This page took 0.044774 seconds and 4 git commands to generate.