cpp-common/bt2c/fmt.hpp: use `wise_enum::string_type` in `EnableIfIsWiseEnum` definition
[babeltrace.git] / tests / bindings / python / bt2 / test_graph.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # Copyright (C) 2019 EfficiOS Inc.
4 #
5
6 import unittest
7
8 import bt2
9
10
11 class _MyIter(bt2._UserMessageIterator):
12 def __init__(self, config, self_output_port):
13 self._build_meta()
14 self._at = 0
15
16 def _build_meta(self):
17 self._tc = self._component._create_trace_class()
18 self._t = self._tc()
19 self._sc = self._tc.create_stream_class(supports_packets=True)
20 self._ec = self._sc.create_event_class(name="salut")
21 self._my_int_ft = self._tc.create_signed_integer_field_class(32)
22 payload_ft = self._tc.create_structure_field_class()
23 payload_ft += [("my_int", self._my_int_ft)]
24 self._ec.payload_field_type = payload_ft
25 self._stream = self._t.create_stream(self._sc)
26 self._packet = self._stream.create_packet()
27
28 def _create_event(self, value):
29 ev = self._ec()
30 ev.payload_field["my_int"] = value
31 ev.packet = self._packet
32 return ev
33
34
35 class GraphTestCase(unittest.TestCase):
36 def setUp(self):
37 self._graph = bt2.Graph()
38
39 def tearDown(self):
40 del self._graph
41
42 def test_create_default(self):
43 bt2.Graph()
44
45 def test_create_known_mip_version(self):
46 bt2.Graph(0)
47
48 def test_create_invalid_mip_version_type(self):
49 with self.assertRaises(TypeError):
50 bt2.Graph("")
51
52 def test_create_unknown_mip_version(self):
53 with self.assertRaisesRegex(ValueError, "unknown MIP version"):
54 bt2.Graph(1)
55
56 def test_default_interrupter(self):
57 interrupter = self._graph.default_interrupter
58 self.assertIs(type(interrupter), bt2.Interrupter)
59
60 def test_add_component_user_cls(self):
61 class MySink(bt2._UserSinkComponent):
62 def _user_consume(self):
63 pass
64
65 comp = self._graph.add_component(MySink, "salut")
66 self.assertEqual(comp.name, "salut")
67
68 def test_add_component_gen_cls(self):
69 class MySink(bt2._UserSinkComponent):
70 def _user_consume(self):
71 pass
72
73 comp = self._graph.add_component(MySink, "salut")
74 assert comp
75 comp2 = self._graph.add_component(comp.cls, "salut2")
76 self.assertEqual(comp2.name, "salut2")
77
78 def test_add_component_params(self):
79 comp_params = None
80
81 class MySink(bt2._UserSinkComponent):
82 def __init__(self, config, params, obj):
83 nonlocal comp_params
84 comp_params = params
85
86 def _user_consume(self):
87 pass
88
89 params = {"hello": 23, "path": "/path/to/stuff"}
90 self._graph.add_component(MySink, "salut", params)
91 self.assertEqual(params, comp_params)
92 del comp_params
93
94 def test_add_component_obj_python_comp_cls(self):
95 comp_obj = None
96
97 class MySink(bt2._UserSinkComponent):
98 def __init__(self, config, params, obj):
99 nonlocal comp_obj
100 comp_obj = obj
101
102 def _user_consume(self):
103 pass
104
105 obj = object()
106 self._graph.add_component(MySink, "salut", obj=obj)
107 self.assertIs(comp_obj, obj)
108 del comp_obj
109
110 def test_add_component_obj_none_python_comp_cls(self):
111 comp_obj = None
112
113 class MySink(bt2._UserSinkComponent):
114 def __init__(self, config, params, obj):
115 nonlocal comp_obj
116 comp_obj = obj
117
118 def _user_consume(self):
119 pass
120
121 self._graph.add_component(MySink, "salut")
122 self.assertIsNone(comp_obj)
123 del comp_obj
124
125 def test_add_component_obj_non_python_comp_cls(self):
126 plugin = bt2.find_plugin("text", find_in_user_dir=False, find_in_sys_dir=False)
127 assert plugin is not None
128 cc = plugin.source_component_classes["dmesg"]
129 assert cc is not None
130
131 with self.assertRaises(ValueError):
132 self._graph.add_component(cc, "salut", obj=57)
133
134 def test_add_component_invalid_cls_type(self):
135 with self.assertRaises(TypeError):
136 self._graph.add_component(int, "salut")
137
138 def test_add_component_invalid_logging_level_type(self):
139 class MySink(bt2._UserSinkComponent):
140 def _user_consume(self):
141 pass
142
143 with self.assertRaises(TypeError):
144 self._graph.add_component(MySink, "salut", logging_level="yo")
145
146 def test_add_component_invalid_logging_level_value(self):
147 class MySink(bt2._UserSinkComponent):
148 def _user_consume(self):
149 pass
150
151 with self.assertRaises(ValueError):
152 self._graph.add_component(MySink, "salut", logging_level=12345)
153
154 def test_add_component_invalid_params_type(self):
155 class MySink(bt2._UserSinkComponent):
156 def _user_consume(self):
157 pass
158
159 with self.assertRaises(TypeError):
160 self._graph.add_component(MySink, "salut", params=12)
161
162 def test_add_component_params_dict(self):
163 params_obj = None
164
165 class MySink(bt2._UserSinkComponent):
166 def __init__(self, config, params, obj):
167 nonlocal params_obj
168 params_obj = params
169
170 def _user_consume(self):
171 pass
172
173 params = {"plage": 12312}
174 self._graph.add_component(MySink, "salut", params=params)
175
176 # Check equality and not identity because `add_component()` method
177 # converts the Python `dict` to a `bt2.MapValue`.
178 self.assertEqual(params, params_obj)
179
180 def test_add_component_params_mapvalue(self):
181 params_obj = None
182
183 class MySink(bt2._UserSinkComponent):
184 def __init__(self, config, params, obj):
185 nonlocal params_obj
186 params_obj = params
187
188 def _user_consume(self):
189 pass
190
191 params = bt2.MapValue({"beachclub": "121"})
192 self._graph.add_component(MySink, "salut", params=params)
193
194 self.assertEqual(params, params_obj)
195
196 def test_add_component_logging_level(self):
197 class MySink(bt2._UserSinkComponent):
198 def _user_consume(self):
199 pass
200
201 comp = self._graph.add_component(
202 MySink, "salut", logging_level=bt2.LoggingLevel.DEBUG
203 )
204 self.assertEqual(comp.logging_level, bt2.LoggingLevel.DEBUG)
205
206 def test_connect_ports(self):
207 class MySource(
208 bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator
209 ):
210 def __init__(self, config, params, obj):
211 self._add_output_port("out")
212
213 class MySink(bt2._UserSinkComponent):
214 def __init__(self, config, params, obj):
215 self._add_input_port("in")
216
217 def _user_consume(self):
218 raise bt2.Stop
219
220 src = self._graph.add_component(MySource, "src")
221 sink = self._graph.add_component(MySink, "sink")
222
223 conn = self._graph.connect_ports(
224 src.output_ports["out"], sink.input_ports["in"]
225 )
226 self.assertTrue(src.output_ports["out"].is_connected)
227 self.assertTrue(sink.input_ports["in"].is_connected)
228 self.assertEqual(src.output_ports["out"].connection.addr, conn.addr)
229 self.assertEqual(sink.input_ports["in"].connection.addr, conn.addr)
230
231 def test_connect_ports_invalid_direction(self):
232 class MySource(
233 bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator
234 ):
235 def __init__(self, config, params, obj):
236 self._add_output_port("out")
237
238 class MySink(bt2._UserSinkComponent):
239 def __init__(self, config, params, obj):
240 self._add_input_port("in")
241
242 def _user_consume(self):
243 raise bt2.Stop
244
245 src = self._graph.add_component(MySource, "src")
246 sink = self._graph.add_component(MySink, "sink")
247
248 with self.assertRaises(TypeError):
249 self._graph.connect_ports(sink.input_ports["in"], src.output_ports["out"])
250
251 def test_add_interrupter(self):
252 class MyIter(bt2._UserMessageIterator):
253 def __next__(self):
254 raise TypeError
255
256 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
257 def __init__(self, config, params, obj):
258 self._add_output_port("out")
259
260 class MySink(bt2._UserSinkComponent):
261 def __init__(self, config, params, obj):
262 self._add_input_port("in")
263
264 def _user_consume(self):
265 next(self._msg_iter)
266
267 def _user_graph_is_configured(self):
268 self._msg_iter = self._create_message_iterator(self._input_ports["in"])
269
270 # add two interrupters, set one of them
271 interrupter1 = bt2.Interrupter()
272 interrupter2 = bt2.Interrupter()
273 self._graph.add_interrupter(interrupter1)
274 src = self._graph.add_component(MySource, "src")
275 sink = self._graph.add_component(MySink, "sink")
276 self._graph.connect_ports(src.output_ports["out"], sink.input_ports["in"])
277 self._graph.add_interrupter(interrupter2)
278
279 with self.assertRaises(bt2._Error):
280 self._graph.run()
281
282 interrupter2.set()
283
284 with self.assertRaises(bt2.TryAgain):
285 self._graph.run()
286
287 interrupter2.reset()
288
289 with self.assertRaises(bt2._Error):
290 self._graph.run()
291
292 # Test that Graph.run() raises bt2.Interrupted if the graph gets
293 # interrupted during execution.
294 def test_interrupt_while_running(self):
295 class MyIter(_MyIter):
296 def __next__(self):
297 return self._create_stream_beginning_message(self._stream)
298
299 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
300 def __init__(self, config, params, obj):
301 self._add_output_port("out")
302
303 class MySink(bt2._UserSinkComponent):
304 def __init__(self, config, params, obj):
305 self._add_input_port("in")
306
307 def _user_consume(self):
308 # Pretend that somebody asynchronously interrupted the graph.
309 nonlocal graph
310 graph.default_interrupter.set()
311 return next(self._msg_iter)
312
313 def _user_graph_is_configured(self):
314 self._msg_iter = self._create_message_iterator(self._input_ports["in"])
315
316 graph = self._graph
317 up = self._graph.add_component(MySource, "down")
318 down = self._graph.add_component(MySink, "up")
319 self._graph.connect_ports(up.output_ports["out"], down.input_ports["in"])
320
321 with self.assertRaises(bt2.TryAgain):
322 self._graph.run()
323
324 def test_run(self):
325 class MyIter(_MyIter):
326 def __next__(self):
327 if self._at == 9:
328 raise StopIteration
329
330 if self._at == 0:
331 msg = self._create_stream_beginning_message(self._stream)
332 elif self._at == 1:
333 msg = self._create_packet_beginning_message(self._packet)
334 elif self._at == 7:
335 msg = self._create_packet_end_message(self._packet)
336 elif self._at == 8:
337 msg = self._create_stream_end_message(self._stream)
338 else:
339 msg = self._create_event_message(self._ec, self._packet)
340
341 self._at += 1
342 return msg
343
344 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
345 def __init__(self, config, params, obj):
346 self._add_output_port("out")
347
348 class MySink(bt2._UserSinkComponent):
349 def __init__(self, config, params, obj):
350 self._input_port = self._add_input_port("in")
351 self._at = 0
352
353 def _user_consume(comp_self):
354 msg = next(comp_self._msg_iter)
355
356 if comp_self._at == 0:
357 self.assertIs(type(msg), bt2._StreamBeginningMessageConst)
358 elif comp_self._at == 1:
359 self.assertIs(type(msg), bt2._PacketBeginningMessageConst)
360 elif comp_self._at >= 2 and comp_self._at <= 6:
361 self.assertIs(type(msg), bt2._EventMessageConst)
362 self.assertEqual(msg.event.cls.name, "salut")
363 elif comp_self._at == 7:
364 self.assertIs(type(msg), bt2._PacketEndMessageConst)
365 elif comp_self._at == 8:
366 self.assertIs(type(msg), bt2._StreamEndMessageConst)
367
368 comp_self._at += 1
369
370 def _user_graph_is_configured(self):
371 self._msg_iter = self._create_message_iterator(self._input_port)
372
373 src = self._graph.add_component(MySource, "src")
374 sink = self._graph.add_component(MySink, "sink")
375 self._graph.connect_ports(src.output_ports["out"], sink.input_ports["in"])
376 self._graph.run()
377
378 def test_run_once(self):
379 class MyIter(_MyIter):
380 pass
381
382 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
383 def __init__(self, config, params, obj):
384 self._add_output_port("out")
385
386 class MySink(bt2._UserSinkComponent):
387 def __init__(self, config, params, obj):
388 self._input_port = self._add_input_port("in")
389
390 def _user_consume(comp_self):
391 nonlocal run_count
392 run_count += 1
393 raise bt2.TryAgain
394
395 run_count = 0
396 src = self._graph.add_component(MySource, "src")
397 sink = self._graph.add_component(MySink, "sink")
398 self._graph.connect_ports(src.output_ports["out"], sink.input_ports["in"])
399
400 with self.assertRaises(bt2.TryAgain):
401 self._graph.run_once()
402
403 self.assertEqual(run_count, 1)
404
405 def test_run_once_stops(self):
406 class MyIter(_MyIter):
407 pass
408
409 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
410 def __init__(self, config, params, obj):
411 self._add_output_port("out")
412
413 class MySink(bt2._UserSinkComponent):
414 def __init__(self, config, params, obj):
415 self._input_port = self._add_input_port("in")
416
417 def _user_consume(comp_self):
418 raise bt2.Stop
419
420 src = self._graph.add_component(MySource, "src")
421 sink = self._graph.add_component(MySink, "sink")
422 self._graph.connect_ports(src.output_ports["out"], sink.input_ports["in"])
423
424 with self.assertRaises(bt2.Stop):
425 self._graph.run_once()
426
427 def test_run_again(self):
428 class MyIter(_MyIter):
429 def __next__(self):
430 if self._at == 3:
431 raise bt2.TryAgain
432
433 if self._at == 0:
434 msg = self._create_stream_beginning_message(self._stream)
435 elif self._at == 1:
436 msg = self._create_packet_beginning_message(self._packet)
437 elif self._at == 2:
438 msg = self._create_event_message(self._ec, self._packet)
439
440 self._at += 1
441 return msg
442
443 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
444 def __init__(self, config, params, obj):
445 self._add_output_port("out")
446
447 class MySink(bt2._UserSinkComponent):
448 def __init__(self, config, params, obj):
449 self._input_port = self._add_input_port("in")
450 self._at = 0
451
452 def _user_consume(comp_self):
453 msg = next(comp_self._msg_iter)
454 if comp_self._at == 0:
455 self.assertIs(type(msg), bt2._StreamBeginningMessageConst)
456 elif comp_self._at == 1:
457 self.assertIs(type(msg), bt2._PacketBeginningMessageConst)
458 elif comp_self._at == 2:
459 self.assertIs(type(msg), bt2._EventMessageConst)
460 raise bt2.TryAgain
461 else:
462 pass
463
464 comp_self._at += 1
465
466 def _user_graph_is_configured(self):
467 self._msg_iter = self._create_message_iterator(self._input_port)
468
469 src = self._graph.add_component(MySource, "src")
470 sink = self._graph.add_component(MySink, "sink")
471 self._graph.connect_ports(src.output_ports["out"], sink.input_ports["in"])
472
473 with self.assertRaises(bt2.TryAgain):
474 self._graph.run()
475
476 def test_run_error(self):
477 raised_in_sink = False
478
479 class MyIter(_MyIter):
480 def __next__(self):
481 # If this gets called after the sink raised an exception, it is
482 # an error.
483 nonlocal raised_in_sink
484 assert raised_in_sink is False
485
486 if self._at == 0:
487 msg = self._create_stream_beginning_message(self._stream)
488 elif self._at == 1:
489 msg = self._create_packet_beginning_message(self._packet)
490 elif self._at == 2 or self._at == 3:
491 msg = self._create_event_message(self._ec, self._packet)
492 else:
493 raise bt2.TryAgain
494 self._at += 1
495 return msg
496
497 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
498 def __init__(self, config, params, obj):
499 self._add_output_port("out")
500
501 class MySink(bt2._UserSinkComponent):
502 def __init__(self, config, params, obj):
503 self._input_port = self._add_input_port("in")
504 self._at = 0
505
506 def _user_consume(comp_self):
507 msg = next(comp_self._msg_iter)
508 if comp_self._at == 0:
509 self.assertIs(type(msg), bt2._StreamBeginningMessageConst)
510 elif comp_self._at == 1:
511 self.assertIs(type(msg), bt2._PacketBeginningMessageConst)
512 elif comp_self._at == 2:
513 self.assertIs(type(msg), bt2._EventMessageConst)
514 elif comp_self._at == 3:
515 nonlocal raised_in_sink
516 raised_in_sink = True
517 raise RuntimeError("error!")
518
519 comp_self._at += 1
520
521 def _user_graph_is_configured(self):
522 self._msg_iter = self._create_message_iterator(self._input_port)
523
524 src = self._graph.add_component(MySource, "src")
525 sink = self._graph.add_component(MySink, "sink")
526 self._graph.connect_ports(src.output_ports["out"], sink.input_ports["in"])
527
528 with self.assertRaises(bt2._Error):
529 self._graph.run()
530
531 def test_listeners(self):
532 class MySource(
533 bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator
534 ):
535 def __init__(self, config, params, obj):
536 self._add_output_port("out")
537 self._add_output_port("zero")
538
539 class MySink(bt2._UserSinkComponent):
540 def __init__(self, config, params, obj):
541 self._add_input_port("in")
542
543 def _user_consume(self):
544 raise bt2.Stop
545
546 def _user_port_connected(self, port, other_port):
547 self._add_input_port("taste")
548
549 def port_added_listener(component, port):
550 nonlocal calls
551 calls.append((port_added_listener, component, port))
552
553 calls = []
554 self._graph.add_port_added_listener(port_added_listener)
555 src = self._graph.add_component(MySource, "src")
556 sink = self._graph.add_component(MySink, "sink")
557 self._graph.connect_ports(src.output_ports["out"], sink.input_ports["in"])
558
559 self.assertEqual(len(calls), 4)
560
561 self.assertIs(calls[0][0], port_added_listener)
562 self.assertEqual(calls[0][1].name, "src")
563 self.assertEqual(calls[0][2].name, "out")
564
565 self.assertIs(calls[1][0], port_added_listener)
566 self.assertEqual(calls[1][1].name, "src")
567 self.assertEqual(calls[1][2].name, "zero")
568
569 self.assertIs(calls[2][0], port_added_listener)
570 self.assertEqual(calls[2][1].name, "sink")
571 self.assertEqual(calls[2][2].name, "in")
572
573 self.assertIs(calls[3][0], port_added_listener)
574 self.assertEqual(calls[3][1].name, "sink")
575 self.assertEqual(calls[3][2].name, "taste")
576
577 def test_invalid_listeners(self):
578 class MySource(
579 bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator
580 ):
581 def __init__(self, config, params, obj):
582 self._add_output_port("out")
583 self._add_output_port("zero")
584
585 class MySink(bt2._UserSinkComponent):
586 def __init__(self, config, params, obj):
587 self._add_input_port("in")
588
589 def _user_consume(self):
590 raise bt2.Stop
591
592 def _user_port_connected(self, port, other_port):
593 self._add_input_port("taste")
594
595 with self.assertRaises(TypeError):
596 self._graph.add_port_added_listener(1234)
597
598 def test_raise_in_component_init(self):
599 class MySink(bt2._UserSinkComponent):
600 def __init__(self, config, params, obj):
601 raise ValueError("oops!")
602
603 def _user_consume(self):
604 raise bt2.Stop
605
606 graph = bt2.Graph()
607
608 with self.assertRaises(bt2._Error):
609 graph.add_component(MySink, "comp")
610
611 def test_raise_in_port_added_listener(self):
612 class MySink(bt2._UserSinkComponent):
613 def __init__(self, config, params, obj):
614 self._add_input_port("in")
615
616 def _user_consume(self):
617 raise bt2.Stop
618
619 def port_added_listener(component, port):
620 raise ValueError("oh noes!")
621
622 graph = bt2.Graph()
623 graph.add_port_added_listener(port_added_listener)
624
625 with self.assertRaises(bt2._Error):
626 graph.add_component(MySink, "comp")
627
628
629 if __name__ == "__main__":
630 unittest.main()
This page took 0.042128 seconds and 4 git commands to generate.