bt2: remove BT CC entry from global HT in _UserComponentType.__del__()
[babeltrace.git] / tests / bindings / python / bt2 / test_graph.py
1 #
2 # Copyright (C) 2019 EfficiOS Inc.
3 #
4 # This program is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU General Public License
6 # as published by the Free Software Foundation; only version 2
7 # of the License.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 #
18
19 from bt2 import value
20 import collections
21 import unittest
22 import copy
23 import bt2
24
25
26 class _MyIter(bt2._UserMessageIterator):
27 def __init__(self, self_output_port):
28 self._build_meta()
29 self._at = 0
30
31 def _build_meta(self):
32 self._tc = self._component._create_trace_class()
33 self._t = self._tc()
34 self._sc = self._tc.create_stream_class(supports_packets=True)
35 self._ec = self._sc.create_event_class(name='salut')
36 self._my_int_ft = self._tc.create_signed_integer_field_class(32)
37 payload_ft = self._tc.create_structure_field_class()
38 payload_ft += [('my_int', self._my_int_ft)]
39 self._ec.payload_field_type = payload_ft
40 self._stream = self._t.create_stream(self._sc)
41 self._packet = self._stream.create_packet()
42
43 def _create_event(self, value):
44 ev = self._ec()
45 ev.payload_field['my_int'] = value
46 ev.packet = self._packet
47 return ev
48
49
50 class GraphTestCase(unittest.TestCase):
51 def setUp(self):
52 self._graph = bt2.Graph()
53
54 def tearDown(self):
55 del self._graph
56
57 def test_create_empty(self):
58 graph = bt2.Graph()
59
60 def test_add_component_user_cls(self):
61 class MySink(bt2._UserSinkComponent):
62 def _user_consume(self):
63 pass
64
65 comp = self._graph.add_component(MySink, 'salut')
66 self.assertEqual(comp.name, 'salut')
67
68 def test_add_component_gen_cls(self):
69 class MySink(bt2._UserSinkComponent):
70 def _user_consume(self):
71 pass
72
73 comp = self._graph.add_component(MySink, 'salut')
74 assert comp
75 comp2 = self._graph.add_component(comp.cls, 'salut2')
76 self.assertEqual(comp2.name, 'salut2')
77
78 def test_add_component_params(self):
79 comp_params = None
80
81 class MySink(bt2._UserSinkComponent):
82 def __init__(self, params):
83 nonlocal comp_params
84 comp_params = params
85
86 def _user_consume(self):
87 pass
88
89 params = {'hello': 23, 'path': '/path/to/stuff'}
90 comp = self._graph.add_component(MySink, 'salut', params)
91 self.assertEqual(params, comp_params)
92 del comp_params
93
94 def test_add_component_invalid_cls_type(self):
95 with self.assertRaises(TypeError):
96 self._graph.add_component(int, 'salut')
97
98 def test_add_component_invalid_logging_level_type(self):
99 class MySink(bt2._UserSinkComponent):
100 def _user_consume(self):
101 pass
102
103 with self.assertRaises(TypeError):
104 self._graph.add_component(MySink, 'salut', logging_level='yo')
105
106 def test_add_component_invalid_logging_level_value(self):
107 class MySink(bt2._UserSinkComponent):
108 def _user_consume(self):
109 pass
110
111 with self.assertRaises(ValueError):
112 self._graph.add_component(MySink, 'salut', logging_level=12345)
113
114 def test_add_component_logging_level(self):
115 class MySink(bt2._UserSinkComponent):
116 def _user_consume(self):
117 pass
118
119 comp = self._graph.add_component(
120 MySink, 'salut', logging_level=bt2.LoggingLevel.DEBUG
121 )
122 self.assertEqual(comp.logging_level, bt2.LoggingLevel.DEBUG)
123
124 def test_connect_ports(self):
125 class MyIter(bt2._UserMessageIterator):
126 def __next__(self):
127 raise bt2.Stop
128
129 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
130 def __init__(self, params):
131 self._add_output_port('out')
132
133 class MySink(bt2._UserSinkComponent):
134 def __init__(self, params):
135 self._add_input_port('in')
136
137 def _user_consume(self):
138 raise bt2.Stop
139
140 src = self._graph.add_component(MySource, 'src')
141 sink = self._graph.add_component(MySink, 'sink')
142
143 conn = self._graph.connect_ports(
144 src.output_ports['out'], sink.input_ports['in']
145 )
146 self.assertTrue(src.output_ports['out'].is_connected)
147 self.assertTrue(sink.input_ports['in'].is_connected)
148 self.assertEqual(src.output_ports['out'].connection.addr, conn.addr)
149 self.assertEqual(sink.input_ports['in'].connection.addr, conn.addr)
150
151 def test_connect_ports_invalid_direction(self):
152 class MyIter(bt2._UserMessageIterator):
153 def __next__(self):
154 raise bt2.Stop
155
156 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
157 def __init__(self, params):
158 self._add_output_port('out')
159
160 class MySink(bt2._UserSinkComponent):
161 def __init__(self, params):
162 self._add_input_port('in')
163
164 def _user_consume(self):
165 raise bt2.Stop
166
167 src = self._graph.add_component(MySource, 'src')
168 sink = self._graph.add_component(MySink, 'sink')
169
170 with self.assertRaises(TypeError):
171 conn = self._graph.connect_ports(
172 sink.input_ports['in'], src.output_ports['out']
173 )
174
175 def test_add_interrupter(self):
176 class MyIter(bt2._UserMessageIterator):
177 def __next__(self):
178 raise TypeError
179
180 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
181 def __init__(self, params):
182 self._add_output_port('out')
183
184 class MySink(bt2._UserSinkComponent):
185 def __init__(self, params):
186 self._add_input_port('in')
187
188 def _user_consume(self):
189 next(self._msg_iter)
190
191 def _user_graph_is_configured(self):
192 self._msg_iter = self._create_input_port_message_iterator(
193 self._input_ports['in']
194 )
195
196 # add two interrupters, set one of them
197 interrupter1 = bt2.Interrupter()
198 interrupter2 = bt2.Interrupter()
199 self._graph.add_interrupter(interrupter1)
200 src = self._graph.add_component(MySource, 'src')
201 sink = self._graph.add_component(MySink, 'sink')
202 self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
203 self._graph.add_interrupter(interrupter2)
204
205 with self.assertRaises(bt2._Error):
206 self._graph.run()
207
208 interrupter2.set()
209
210 with self.assertRaises(bt2.TryAgain):
211 self._graph.run()
212
213 interrupter2.reset()
214
215 with self.assertRaises(bt2._Error):
216 self._graph.run()
217
218 # Test that Graph.run() raises bt2.Interrupted if the graph gets
219 # interrupted during execution.
220 def test_interrupt_while_running(self):
221 class MyIter(_MyIter):
222 def __next__(self):
223 return self._create_stream_beginning_message(self._stream)
224
225 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
226 def __init__(self, params):
227 self._add_output_port('out')
228
229 class MySink(bt2._UserSinkComponent):
230 def __init__(self, params):
231 self._add_input_port('in')
232
233 def _user_consume(self):
234 # Pretend that somebody asynchronously interrupted the graph.
235 nonlocal graph
236 graph.interrupt()
237 return next(self._msg_iter)
238
239 def _user_graph_is_configured(self):
240 self._msg_iter = self._create_input_port_message_iterator(
241 self._input_ports['in']
242 )
243
244 graph = self._graph
245 up = self._graph.add_component(MySource, 'down')
246 down = self._graph.add_component(MySink, 'up')
247 self._graph.connect_ports(up.output_ports['out'], down.input_ports['in'])
248
249 with self.assertRaises(bt2.TryAgain):
250 self._graph.run()
251
252 def test_run(self):
253 class MyIter(_MyIter):
254 def __next__(self):
255 if self._at == 9:
256 raise StopIteration
257
258 if self._at == 0:
259 msg = self._create_stream_beginning_message(self._stream)
260 elif self._at == 1:
261 msg = self._create_packet_beginning_message(self._packet)
262 elif self._at == 7:
263 msg = self._create_packet_end_message(self._packet)
264 elif self._at == 8:
265 msg = self._create_stream_end_message(self._stream)
266 else:
267 msg = self._create_event_message(self._ec, self._packet)
268
269 self._at += 1
270 return msg
271
272 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
273 def __init__(self, params):
274 self._add_output_port('out')
275
276 class MySink(bt2._UserSinkComponent):
277 def __init__(self, params):
278 self._input_port = self._add_input_port('in')
279 self._at = 0
280
281 def _user_consume(comp_self):
282 msg = next(comp_self._msg_iter)
283
284 if comp_self._at == 0:
285 self.assertIsInstance(msg, bt2._StreamBeginningMessage)
286 elif comp_self._at == 1:
287 self.assertIsInstance(msg, bt2._PacketBeginningMessage)
288 elif comp_self._at >= 2 and comp_self._at <= 6:
289 self.assertIsInstance(msg, bt2._EventMessage)
290 self.assertEqual(msg.event.cls.name, 'salut')
291 elif comp_self._at == 7:
292 self.assertIsInstance(msg, bt2._PacketEndMessage)
293 elif comp_self._at == 8:
294 self.assertIsInstance(msg, bt2._StreamEndMessage)
295
296 comp_self._at += 1
297
298 def _user_graph_is_configured(self):
299 self._msg_iter = self._create_input_port_message_iterator(
300 self._input_port
301 )
302
303 src = self._graph.add_component(MySource, 'src')
304 sink = self._graph.add_component(MySink, 'sink')
305 conn = self._graph.connect_ports(
306 src.output_ports['out'], sink.input_ports['in']
307 )
308 self._graph.run()
309
310 def test_run_again(self):
311 class MyIter(_MyIter):
312 def __next__(self):
313 if self._at == 3:
314 raise bt2.TryAgain
315
316 if self._at == 0:
317 msg = self._create_stream_beginning_message(self._stream)
318 elif self._at == 1:
319 msg = self._create_packet_beginning_message(self._packet)
320 elif self._at == 2:
321 msg = self._create_event_message(self._ec, self._packet)
322
323 self._at += 1
324 return msg
325
326 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
327 def __init__(self, params):
328 self._add_output_port('out')
329
330 class MySink(bt2._UserSinkComponent):
331 def __init__(self, params):
332 self._input_port = self._add_input_port('in')
333 self._at = 0
334
335 def _user_consume(comp_self):
336 msg = next(comp_self._msg_iter)
337 if comp_self._at == 0:
338 self.assertIsInstance(msg, bt2._StreamBeginningMessage)
339 elif comp_self._at == 1:
340 self.assertIsInstance(msg, bt2._PacketBeginningMessage)
341 elif comp_self._at == 2:
342 self.assertIsInstance(msg, bt2._EventMessage)
343 raise bt2.TryAgain
344 else:
345 pass
346
347 comp_self._at += 1
348
349 def _user_graph_is_configured(self):
350 self._msg_iter = self._create_input_port_message_iterator(
351 self._input_port
352 )
353
354 src = self._graph.add_component(MySource, 'src')
355 sink = self._graph.add_component(MySink, 'sink')
356 conn = self._graph.connect_ports(
357 src.output_ports['out'], sink.input_ports['in']
358 )
359
360 with self.assertRaises(bt2.TryAgain):
361 self._graph.run()
362
363 def test_run_error(self):
364 raised_in_sink = False
365
366 class MyIter(_MyIter):
367 def __next__(self):
368 # If this gets called after the sink raised an exception, it is
369 # an error.
370 nonlocal raised_in_sink
371 assert raised_in_sink is False
372
373 if self._at == 0:
374 msg = self._create_stream_beginning_message(self._stream)
375 elif self._at == 1:
376 msg = self._create_packet_beginning_message(self._packet)
377 elif self._at == 2 or self._at == 3:
378 msg = self._create_event_message(self._ec, self._packet)
379 else:
380 raise bt2.TryAgain
381 self._at += 1
382 return msg
383
384 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
385 def __init__(self, params):
386 self._add_output_port('out')
387
388 class MySink(bt2._UserSinkComponent):
389 def __init__(self, params):
390 self._input_port = self._add_input_port('in')
391 self._at = 0
392
393 def _user_consume(comp_self):
394 msg = next(comp_self._msg_iter)
395 if comp_self._at == 0:
396 self.assertIsInstance(msg, bt2._StreamBeginningMessage)
397 elif comp_self._at == 1:
398 self.assertIsInstance(msg, bt2._PacketBeginningMessage)
399 elif comp_self._at == 2:
400 self.assertIsInstance(msg, bt2._EventMessage)
401 elif comp_self._at == 3:
402 nonlocal raised_in_sink
403 raised_in_sink = True
404 raise RuntimeError('error!')
405
406 comp_self._at += 1
407
408 def _user_graph_is_configured(self):
409 self._msg_iter = self._create_input_port_message_iterator(
410 self._input_port
411 )
412
413 src = self._graph.add_component(MySource, 'src')
414 sink = self._graph.add_component(MySink, 'sink')
415 conn = self._graph.connect_ports(
416 src.output_ports['out'], sink.input_ports['in']
417 )
418
419 with self.assertRaises(bt2._Error):
420 self._graph.run()
421
422 def test_listeners(self):
423 class MyIter(bt2._UserMessageIterator):
424 def __next__(self):
425 raise bt2.Stop
426
427 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
428 def __init__(self, params):
429 self._add_output_port('out')
430 self._add_output_port('zero')
431
432 class MySink(bt2._UserSinkComponent):
433 def __init__(self, params):
434 self._add_input_port('in')
435
436 def _user_consume(self):
437 raise bt2.Stop
438
439 def _user_port_connected(self, port, other_port):
440 self._add_input_port('taste')
441
442 def port_added_listener(component, port):
443 nonlocal calls
444 calls.append((port_added_listener, component, port))
445
446 def ports_connected_listener(
447 upstream_component, upstream_port, downstream_component, downstream_port
448 ):
449 nonlocal calls
450 calls.append(
451 (
452 ports_connected_listener,
453 upstream_component,
454 upstream_port,
455 downstream_component,
456 downstream_port,
457 )
458 )
459
460 calls = []
461 self._graph.add_port_added_listener(port_added_listener)
462 self._graph.add_ports_connected_listener(ports_connected_listener)
463 src = self._graph.add_component(MySource, 'src')
464 sink = self._graph.add_component(MySink, 'sink')
465 self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
466
467 self.assertEqual(len(calls), 5)
468
469 self.assertIs(calls[0][0], port_added_listener)
470 self.assertEqual(calls[0][1].name, 'src')
471 self.assertEqual(calls[0][2].name, 'out')
472
473 self.assertIs(calls[1][0], port_added_listener)
474 self.assertEqual(calls[1][1].name, 'src')
475 self.assertEqual(calls[1][2].name, 'zero')
476
477 self.assertIs(calls[2][0], port_added_listener)
478 self.assertEqual(calls[2][1].name, 'sink')
479 self.assertEqual(calls[2][2].name, 'in')
480
481 self.assertIs(calls[3][0], port_added_listener)
482 self.assertEqual(calls[3][1].name, 'sink')
483 self.assertEqual(calls[3][2].name, 'taste')
484
485 self.assertIs(calls[4][0], ports_connected_listener)
486 self.assertEqual(calls[4][1].name, 'src')
487 self.assertEqual(calls[4][2].name, 'out')
488 self.assertEqual(calls[4][3].name, 'sink')
489 self.assertEqual(calls[4][4].name, 'in')
490
491 def test_invalid_listeners(self):
492 class MyIter(bt2._UserMessageIterator):
493 def __next__(self):
494 raise bt2.Stop
495
496 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
497 def __init__(self, params):
498 self._add_output_port('out')
499 self._add_output_port('zero')
500
501 class MySink(bt2._UserSinkComponent):
502 def __init__(self, params):
503 self._add_input_port('in')
504
505 def _user_consume(self):
506 raise bt2.Stop
507
508 def _user_port_connected(self, port, other_port):
509 self._add_input_port('taste')
510
511 with self.assertRaises(TypeError):
512 self._graph.add_port_added_listener(1234)
513 with self.assertRaises(TypeError):
514 self._graph.add_ports_connected_listener(1234)
515
516 def test_raise_in_component_init(self):
517 class MySink(bt2._UserSinkComponent):
518 def __init__(self, params):
519 raise ValueError('oops!')
520
521 def _user_consume(self):
522 raise bt2.Stop
523
524 graph = bt2.Graph()
525
526 with self.assertRaises(bt2._Error):
527 graph.add_component(MySink, 'comp')
528
529 def test_raise_in_port_added_listener(self):
530 class MySink(bt2._UserSinkComponent):
531 def __init__(self, params):
532 self._add_input_port('in')
533
534 def _user_consume(self):
535 raise bt2.Stop
536
537 def port_added_listener(component, port):
538 raise ValueError('oh noes!')
539
540 graph = bt2.Graph()
541 graph.add_port_added_listener(port_added_listener)
542
543 with self.assertRaises(bt2._Error):
544 graph.add_component(MySink, 'comp')
545
546 def test_raise_in_ports_connected_listener(self):
547 class MyIter(bt2._UserMessageIterator):
548 def __next__(self):
549 raise bt2.Stop
550
551 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
552 def __init__(self, params):
553 self._add_output_port('out')
554
555 class MySink(bt2._UserSinkComponent):
556 def __init__(self, params):
557 self._add_input_port('in')
558
559 def _user_consume(self):
560 raise bt2.Stop
561
562 def ports_connected_listener(
563 upstream_component, upstream_port, downstream_component, downstream_port
564 ):
565 raise ValueError('oh noes!')
566
567 graph = bt2.Graph()
568 graph.add_ports_connected_listener(ports_connected_listener)
569 up = graph.add_component(MySource, 'down')
570 down = graph.add_component(MySink, 'up')
571
572 with self.assertRaises(bt2._Error):
573 graph.connect_ports(up.output_ports['out'], down.input_ports['in'])
This page took 0.042015 seconds and 5 git commands to generate.