bt2: pass custom Python object to Python component's __init__()
[babeltrace.git] / tests / bindings / python / bt2 / test_graph.py
1 #
2 # Copyright (C) 2019 EfficiOS Inc.
3 #
4 # This program is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU General Public License
6 # as published by the Free Software Foundation; only version 2
7 # of the License.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 #
18
19 from bt2 import value
20 import collections
21 import unittest
22 import copy
23 import bt2
24
25
class _MyIter(bt2._UserMessageIterator):
    """Message iterator base class shared by the graph tests.

    On construction it creates a trace class, a trace, a stream class
    (created with `supports_packets=True`), an event class named
    'salut', a stream and a packet, and stores them as attributes. It
    also sets a message counter, `_at`, to 0.
    """

    def __init__(self, self_output_port):
        # Create the metadata objects first, then reset the counter.
        self._build_meta()
        self._at = 0

    def _build_meta(self):
        """Create the trace/stream/event objects and keep them on `self`."""
        # Each attribute is set in the same order as its object is created.
        self._tc = trace_cls = self._component._create_trace_class()
        self._t = trace = trace_cls()
        self._sc = stream_cls = trace_cls.create_stream_class(supports_packets=True)
        self._ec = event_cls = stream_cls.create_event_class(name='salut')
        self._my_int_ft = int_ft = trace_cls.create_signed_integer_field_class(32)

        # Payload structure holding a single member, 'my_int'.
        payload_ft = trace_cls.create_structure_field_class()
        payload_ft += [('my_int', int_ft)]
        event_cls.payload_field_type = payload_ft

        self._stream = stream = trace.create_stream(stream_cls)
        self._packet = stream.create_packet()

    def _create_event(self, value):
        """Return a new event whose 'my_int' payload member is `value`.

        The event is created by calling the stored event class and is
        attached to the stored packet.
        """
        event = self._ec()
        event.payload_field['my_int'] = value
        event.packet = self._packet
        return event
48
49
50 class GraphTestCase(unittest.TestCase):
51 def setUp(self):
52 self._graph = bt2.Graph()
53
54 def tearDown(self):
55 del self._graph
56
57 def test_create_empty(self):
58 graph = bt2.Graph()
59
60 def test_add_component_user_cls(self):
61 class MySink(bt2._UserSinkComponent):
62 def _user_consume(self):
63 pass
64
65 comp = self._graph.add_component(MySink, 'salut')
66 self.assertEqual(comp.name, 'salut')
67
68 def test_add_component_gen_cls(self):
69 class MySink(bt2._UserSinkComponent):
70 def _user_consume(self):
71 pass
72
73 comp = self._graph.add_component(MySink, 'salut')
74 assert comp
75 comp2 = self._graph.add_component(comp.cls, 'salut2')
76 self.assertEqual(comp2.name, 'salut2')
77
78 def test_add_component_params(self):
79 comp_params = None
80
81 class MySink(bt2._UserSinkComponent):
82 def __init__(self, params, obj):
83 nonlocal comp_params
84 comp_params = params
85
86 def _user_consume(self):
87 pass
88
89 params = {'hello': 23, 'path': '/path/to/stuff'}
90 comp = self._graph.add_component(MySink, 'salut', params)
91 self.assertEqual(params, comp_params)
92 del comp_params
93
94 def test_add_component_obj_python_comp_cls(self):
95 comp_obj = None
96
97 class MySink(bt2._UserSinkComponent):
98 def __init__(self, params, obj):
99 nonlocal comp_obj
100 comp_obj = obj
101
102 def _user_consume(self):
103 pass
104
105 obj = object()
106 comp = self._graph.add_component(MySink, 'salut', obj=obj)
107 self.assertIs(comp_obj, obj)
108 del comp_obj
109
110 def test_add_component_obj_none_python_comp_cls(self):
111 comp_obj = None
112
113 class MySink(bt2._UserSinkComponent):
114 def __init__(self, params, obj):
115 nonlocal comp_obj
116 comp_obj = obj
117
118 def _user_consume(self):
119 pass
120
121 comp = self._graph.add_component(MySink, 'salut')
122 self.assertIsNone(comp_obj)
123 del comp_obj
124
125 def test_add_component_obj_non_python_comp_cls(self):
126 comp_obj = None
127
128 plugin = bt2.find_plugin('text', find_in_user_dir=False, find_in_sys_dir=False)
129 assert plugin is not None
130 cc = plugin.source_component_classes['dmesg']
131 assert cc is not None
132
133 with self.assertRaises(ValueError):
134 comp = self._graph.add_component(cc, 'salut', obj=57)
135
136 def test_add_component_invalid_cls_type(self):
137 with self.assertRaises(TypeError):
138 self._graph.add_component(int, 'salut')
139
140 def test_add_component_invalid_logging_level_type(self):
141 class MySink(bt2._UserSinkComponent):
142 def _user_consume(self):
143 pass
144
145 with self.assertRaises(TypeError):
146 self._graph.add_component(MySink, 'salut', logging_level='yo')
147
148 def test_add_component_invalid_logging_level_value(self):
149 class MySink(bt2._UserSinkComponent):
150 def _user_consume(self):
151 pass
152
153 with self.assertRaises(ValueError):
154 self._graph.add_component(MySink, 'salut', logging_level=12345)
155
156 def test_add_component_logging_level(self):
157 class MySink(bt2._UserSinkComponent):
158 def _user_consume(self):
159 pass
160
161 comp = self._graph.add_component(
162 MySink, 'salut', logging_level=bt2.LoggingLevel.DEBUG
163 )
164 self.assertEqual(comp.logging_level, bt2.LoggingLevel.DEBUG)
165
166 def test_connect_ports(self):
167 class MyIter(bt2._UserMessageIterator):
168 def __next__(self):
169 raise bt2.Stop
170
171 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
172 def __init__(self, params, obj):
173 self._add_output_port('out')
174
175 class MySink(bt2._UserSinkComponent):
176 def __init__(self, params, obj):
177 self._add_input_port('in')
178
179 def _user_consume(self):
180 raise bt2.Stop
181
182 src = self._graph.add_component(MySource, 'src')
183 sink = self._graph.add_component(MySink, 'sink')
184
185 conn = self._graph.connect_ports(
186 src.output_ports['out'], sink.input_ports['in']
187 )
188 self.assertTrue(src.output_ports['out'].is_connected)
189 self.assertTrue(sink.input_ports['in'].is_connected)
190 self.assertEqual(src.output_ports['out'].connection.addr, conn.addr)
191 self.assertEqual(sink.input_ports['in'].connection.addr, conn.addr)
192
193 def test_connect_ports_invalid_direction(self):
194 class MyIter(bt2._UserMessageIterator):
195 def __next__(self):
196 raise bt2.Stop
197
198 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
199 def __init__(self, params, obj):
200 self._add_output_port('out')
201
202 class MySink(bt2._UserSinkComponent):
203 def __init__(self, params, obj):
204 self._add_input_port('in')
205
206 def _user_consume(self):
207 raise bt2.Stop
208
209 src = self._graph.add_component(MySource, 'src')
210 sink = self._graph.add_component(MySink, 'sink')
211
212 with self.assertRaises(TypeError):
213 conn = self._graph.connect_ports(
214 sink.input_ports['in'], src.output_ports['out']
215 )
216
217 def test_add_interrupter(self):
218 class MyIter(bt2._UserMessageIterator):
219 def __next__(self):
220 raise TypeError
221
222 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
223 def __init__(self, params, obj):
224 self._add_output_port('out')
225
226 class MySink(bt2._UserSinkComponent):
227 def __init__(self, params, obj):
228 self._add_input_port('in')
229
230 def _user_consume(self):
231 next(self._msg_iter)
232
233 def _user_graph_is_configured(self):
234 self._msg_iter = self._create_input_port_message_iterator(
235 self._input_ports['in']
236 )
237
238 # add two interrupters, set one of them
239 interrupter1 = bt2.Interrupter()
240 interrupter2 = bt2.Interrupter()
241 self._graph.add_interrupter(interrupter1)
242 src = self._graph.add_component(MySource, 'src')
243 sink = self._graph.add_component(MySink, 'sink')
244 self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
245 self._graph.add_interrupter(interrupter2)
246
247 with self.assertRaises(bt2._Error):
248 self._graph.run()
249
250 interrupter2.set()
251
252 with self.assertRaises(bt2.TryAgain):
253 self._graph.run()
254
255 interrupter2.reset()
256
257 with self.assertRaises(bt2._Error):
258 self._graph.run()
259
260 # Test that Graph.run() raises bt2.Interrupted if the graph gets
261 # interrupted during execution.
262 def test_interrupt_while_running(self):
263 class MyIter(_MyIter):
264 def __next__(self):
265 return self._create_stream_beginning_message(self._stream)
266
267 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
268 def __init__(self, params, obj):
269 self._add_output_port('out')
270
271 class MySink(bt2._UserSinkComponent):
272 def __init__(self, params, obj):
273 self._add_input_port('in')
274
275 def _user_consume(self):
276 # Pretend that somebody asynchronously interrupted the graph.
277 nonlocal graph
278 graph.interrupt()
279 return next(self._msg_iter)
280
281 def _user_graph_is_configured(self):
282 self._msg_iter = self._create_input_port_message_iterator(
283 self._input_ports['in']
284 )
285
286 graph = self._graph
287 up = self._graph.add_component(MySource, 'down')
288 down = self._graph.add_component(MySink, 'up')
289 self._graph.connect_ports(up.output_ports['out'], down.input_ports['in'])
290
291 with self.assertRaises(bt2.TryAgain):
292 self._graph.run()
293
294 def test_run(self):
295 class MyIter(_MyIter):
296 def __next__(self):
297 if self._at == 9:
298 raise StopIteration
299
300 if self._at == 0:
301 msg = self._create_stream_beginning_message(self._stream)
302 elif self._at == 1:
303 msg = self._create_packet_beginning_message(self._packet)
304 elif self._at == 7:
305 msg = self._create_packet_end_message(self._packet)
306 elif self._at == 8:
307 msg = self._create_stream_end_message(self._stream)
308 else:
309 msg = self._create_event_message(self._ec, self._packet)
310
311 self._at += 1
312 return msg
313
314 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
315 def __init__(self, params, obj):
316 self._add_output_port('out')
317
318 class MySink(bt2._UserSinkComponent):
319 def __init__(self, params, obj):
320 self._input_port = self._add_input_port('in')
321 self._at = 0
322
323 def _user_consume(comp_self):
324 msg = next(comp_self._msg_iter)
325
326 if comp_self._at == 0:
327 self.assertIsInstance(msg, bt2._StreamBeginningMessage)
328 elif comp_self._at == 1:
329 self.assertIsInstance(msg, bt2._PacketBeginningMessage)
330 elif comp_self._at >= 2 and comp_self._at <= 6:
331 self.assertIsInstance(msg, bt2._EventMessage)
332 self.assertEqual(msg.event.cls.name, 'salut')
333 elif comp_self._at == 7:
334 self.assertIsInstance(msg, bt2._PacketEndMessage)
335 elif comp_self._at == 8:
336 self.assertIsInstance(msg, bt2._StreamEndMessage)
337
338 comp_self._at += 1
339
340 def _user_graph_is_configured(self):
341 self._msg_iter = self._create_input_port_message_iterator(
342 self._input_port
343 )
344
345 src = self._graph.add_component(MySource, 'src')
346 sink = self._graph.add_component(MySink, 'sink')
347 conn = self._graph.connect_ports(
348 src.output_ports['out'], sink.input_ports['in']
349 )
350 self._graph.run()
351
352 def test_run_again(self):
353 class MyIter(_MyIter):
354 def __next__(self):
355 if self._at == 3:
356 raise bt2.TryAgain
357
358 if self._at == 0:
359 msg = self._create_stream_beginning_message(self._stream)
360 elif self._at == 1:
361 msg = self._create_packet_beginning_message(self._packet)
362 elif self._at == 2:
363 msg = self._create_event_message(self._ec, self._packet)
364
365 self._at += 1
366 return msg
367
368 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
369 def __init__(self, params, obj):
370 self._add_output_port('out')
371
372 class MySink(bt2._UserSinkComponent):
373 def __init__(self, params, obj):
374 self._input_port = self._add_input_port('in')
375 self._at = 0
376
377 def _user_consume(comp_self):
378 msg = next(comp_self._msg_iter)
379 if comp_self._at == 0:
380 self.assertIsInstance(msg, bt2._StreamBeginningMessage)
381 elif comp_self._at == 1:
382 self.assertIsInstance(msg, bt2._PacketBeginningMessage)
383 elif comp_self._at == 2:
384 self.assertIsInstance(msg, bt2._EventMessage)
385 raise bt2.TryAgain
386 else:
387 pass
388
389 comp_self._at += 1
390
391 def _user_graph_is_configured(self):
392 self._msg_iter = self._create_input_port_message_iterator(
393 self._input_port
394 )
395
396 src = self._graph.add_component(MySource, 'src')
397 sink = self._graph.add_component(MySink, 'sink')
398 conn = self._graph.connect_ports(
399 src.output_ports['out'], sink.input_ports['in']
400 )
401
402 with self.assertRaises(bt2.TryAgain):
403 self._graph.run()
404
405 def test_run_error(self):
406 raised_in_sink = False
407
408 class MyIter(_MyIter):
409 def __next__(self):
410 # If this gets called after the sink raised an exception, it is
411 # an error.
412 nonlocal raised_in_sink
413 assert raised_in_sink is False
414
415 if self._at == 0:
416 msg = self._create_stream_beginning_message(self._stream)
417 elif self._at == 1:
418 msg = self._create_packet_beginning_message(self._packet)
419 elif self._at == 2 or self._at == 3:
420 msg = self._create_event_message(self._ec, self._packet)
421 else:
422 raise bt2.TryAgain
423 self._at += 1
424 return msg
425
426 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
427 def __init__(self, params, obj):
428 self._add_output_port('out')
429
430 class MySink(bt2._UserSinkComponent):
431 def __init__(self, params, obj):
432 self._input_port = self._add_input_port('in')
433 self._at = 0
434
435 def _user_consume(comp_self):
436 msg = next(comp_self._msg_iter)
437 if comp_self._at == 0:
438 self.assertIsInstance(msg, bt2._StreamBeginningMessage)
439 elif comp_self._at == 1:
440 self.assertIsInstance(msg, bt2._PacketBeginningMessage)
441 elif comp_self._at == 2:
442 self.assertIsInstance(msg, bt2._EventMessage)
443 elif comp_self._at == 3:
444 nonlocal raised_in_sink
445 raised_in_sink = True
446 raise RuntimeError('error!')
447
448 comp_self._at += 1
449
450 def _user_graph_is_configured(self):
451 self._msg_iter = self._create_input_port_message_iterator(
452 self._input_port
453 )
454
455 src = self._graph.add_component(MySource, 'src')
456 sink = self._graph.add_component(MySink, 'sink')
457 conn = self._graph.connect_ports(
458 src.output_ports['out'], sink.input_ports['in']
459 )
460
461 with self.assertRaises(bt2._Error):
462 self._graph.run()
463
464 def test_listeners(self):
465 class MyIter(bt2._UserMessageIterator):
466 def __next__(self):
467 raise bt2.Stop
468
469 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
470 def __init__(self, params, obj):
471 self._add_output_port('out')
472 self._add_output_port('zero')
473
474 class MySink(bt2._UserSinkComponent):
475 def __init__(self, params, obj):
476 self._add_input_port('in')
477
478 def _user_consume(self):
479 raise bt2.Stop
480
481 def _user_port_connected(self, port, other_port):
482 self._add_input_port('taste')
483
484 def port_added_listener(component, port):
485 nonlocal calls
486 calls.append((port_added_listener, component, port))
487
488 def ports_connected_listener(
489 upstream_component, upstream_port, downstream_component, downstream_port
490 ):
491 nonlocal calls
492 calls.append(
493 (
494 ports_connected_listener,
495 upstream_component,
496 upstream_port,
497 downstream_component,
498 downstream_port,
499 )
500 )
501
502 calls = []
503 self._graph.add_port_added_listener(port_added_listener)
504 self._graph.add_ports_connected_listener(ports_connected_listener)
505 src = self._graph.add_component(MySource, 'src')
506 sink = self._graph.add_component(MySink, 'sink')
507 self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
508
509 self.assertEqual(len(calls), 5)
510
511 self.assertIs(calls[0][0], port_added_listener)
512 self.assertEqual(calls[0][1].name, 'src')
513 self.assertEqual(calls[0][2].name, 'out')
514
515 self.assertIs(calls[1][0], port_added_listener)
516 self.assertEqual(calls[1][1].name, 'src')
517 self.assertEqual(calls[1][2].name, 'zero')
518
519 self.assertIs(calls[2][0], port_added_listener)
520 self.assertEqual(calls[2][1].name, 'sink')
521 self.assertEqual(calls[2][2].name, 'in')
522
523 self.assertIs(calls[3][0], port_added_listener)
524 self.assertEqual(calls[3][1].name, 'sink')
525 self.assertEqual(calls[3][2].name, 'taste')
526
527 self.assertIs(calls[4][0], ports_connected_listener)
528 self.assertEqual(calls[4][1].name, 'src')
529 self.assertEqual(calls[4][2].name, 'out')
530 self.assertEqual(calls[4][3].name, 'sink')
531 self.assertEqual(calls[4][4].name, 'in')
532
533 def test_invalid_listeners(self):
534 class MyIter(bt2._UserMessageIterator):
535 def __next__(self):
536 raise bt2.Stop
537
538 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
539 def __init__(self, params, obj):
540 self._add_output_port('out')
541 self._add_output_port('zero')
542
543 class MySink(bt2._UserSinkComponent):
544 def __init__(self, params, obj):
545 self._add_input_port('in')
546
547 def _user_consume(self):
548 raise bt2.Stop
549
550 def _user_port_connected(self, port, other_port):
551 self._add_input_port('taste')
552
553 with self.assertRaises(TypeError):
554 self._graph.add_port_added_listener(1234)
555 with self.assertRaises(TypeError):
556 self._graph.add_ports_connected_listener(1234)
557
558 def test_raise_in_component_init(self):
559 class MySink(bt2._UserSinkComponent):
560 def __init__(self, params, obj):
561 raise ValueError('oops!')
562
563 def _user_consume(self):
564 raise bt2.Stop
565
566 graph = bt2.Graph()
567
568 with self.assertRaises(bt2._Error):
569 graph.add_component(MySink, 'comp')
570
571 def test_raise_in_port_added_listener(self):
572 class MySink(bt2._UserSinkComponent):
573 def __init__(self, params, obj):
574 self._add_input_port('in')
575
576 def _user_consume(self):
577 raise bt2.Stop
578
579 def port_added_listener(component, port):
580 raise ValueError('oh noes!')
581
582 graph = bt2.Graph()
583 graph.add_port_added_listener(port_added_listener)
584
585 with self.assertRaises(bt2._Error):
586 graph.add_component(MySink, 'comp')
587
588 def test_raise_in_ports_connected_listener(self):
589 class MyIter(bt2._UserMessageIterator):
590 def __next__(self):
591 raise bt2.Stop
592
593 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
594 def __init__(self, params, obj):
595 self._add_output_port('out')
596
597 class MySink(bt2._UserSinkComponent):
598 def __init__(self, params, obj):
599 self._add_input_port('in')
600
601 def _user_consume(self):
602 raise bt2.Stop
603
604 def ports_connected_listener(
605 upstream_component, upstream_port, downstream_component, downstream_port
606 ):
607 raise ValueError('oh noes!')
608
609 graph = bt2.Graph()
610 graph.add_ports_connected_listener(ports_connected_listener)
611 up = graph.add_component(MySource, 'down')
612 down = graph.add_component(MySink, 'up')
613
614 with self.assertRaises(bt2._Error):
615 graph.connect_ports(up.output_ports['out'], down.input_ports['in'])
This page took 0.06109 seconds and 4 git commands to generate.