da9c209ff68e44b76c7896df01321e127fdb0821
[babeltrace.git] / tests / bindings / python / bt2 / test_graph.py
#
# Copyright (C) 2019 EfficiOS Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; only version 2
# of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
18
19 from bt2 import value
20 import collections
21 import unittest
22 import copy
23 import bt2
24
25
class _MyIter(bt2._UserMessageIterator):
    # Base message iterator shared by the graph tests: on creation it
    # builds a trace class, a trace, a stream class, an event class, a
    # stream and a packet, and keeps a position counter (`_at`) which the
    # derived iterators use to decide which message to emit next.

    def __init__(self, self_output_port):
        # `self_output_port` is accepted but not used by this
        # initializer.
        self._build_meta()
        self._at = 0

    def _build_meta(self):
        # Creates all the metadata objects and stores them as
        # attributes. The creation order is the same as the attribute
        # order below.
        trace_cls = self._component._create_trace_class()
        self._tc = trace_cls
        self._t = trace_cls()

        stream_cls = trace_cls.create_stream_class(supports_packets=True)
        self._sc = stream_cls
        self._ec = stream_cls.create_event_class(name='salut')

        # Payload: a structure with a single signed integer member
        # named `my_int`.
        self._my_int_ft = trace_cls.create_signed_integer_field_class(32)
        payload_fc = trace_cls.create_structure_field_class()
        payload_fc += [('my_int', self._my_int_ft)]
        self._ec.payload_field_type = payload_fc

        stream = self._t.create_stream(stream_cls)
        self._stream = stream
        self._packet = stream.create_packet()

    def _create_event(self, value):
        # Instantiates the event class, sets its `my_int` payload member
        # to `value`, attaches the shared packet and returns the event.
        #
        # NOTE(review): nothing visible in this file calls this helper.
        event = self._ec()
        event.payload_field['my_int'] = value
        event.packet = self._packet
        return event
50
51
class GraphTestCase(unittest.TestCase):
    # Tests for bt2.Graph: adding components, connecting ports,
    # cancelling, running, and port/connection listeners.
    #
    # Most tests use the graph created by setUp() (`self._graph`); a few
    # create their own local graph.

    def setUp(self):
        self._graph = bt2.Graph()

    def tearDown(self):
        del self._graph

    # Test that a graph can be created without any argument.
    def test_create_empty(self):
        self.assertIsNotNone(bt2.Graph())

    # Test that a user sink component class can be added to the graph
    # and that the resulting component has the requested name.
    def test_add_component_user_cls(self):
        class MySink(bt2._UserSinkComponent):
            def _consume(self):
                pass

            def _graph_is_configured(self):
                pass

        comp = self._graph.add_component(MySink, 'salut')
        self.assertEqual(comp.name, 'salut')

    # Test that a component can be added using the class object exposed
    # by an existing component (`comp.cls`) rather than the user class.
    def test_add_component_gen_cls(self):
        class MySink(bt2._UserSinkComponent):
            def _consume(self):
                pass

            def _graph_is_configured(self):
                pass

        comp = self._graph.add_component(MySink, 'salut')

        # Use a unittest assertion rather than a bare `assert`, which is
        # stripped when Python runs with -O.
        self.assertTrue(comp)
        comp2 = self._graph.add_component(comp.cls, 'salut2')
        self.assertEqual(comp2.name, 'salut2')

    # Test that the parameters given to Graph.add_component() reach the
    # component's initializer.
    def test_add_component_params(self):
        comp_params = None

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                nonlocal comp_params
                comp_params = params

            def _consume(self):
                pass

            def _graph_is_configured(self):
                pass

        params = {'hello': 23, 'path': '/path/to/stuff'}
        self._graph.add_component(MySink, 'salut', params)
        self.assertEqual(params, comp_params)

        # Explicitly drop the captured parameters object.
        # NOTE(review): presumably so that it is released before the
        # graph is deleted in tearDown() -- confirm before removing.
        del comp_params

    # Test that adding something which is not a component class raises
    # TypeError.
    def test_add_component_invalid_cls_type(self):
        with self.assertRaises(TypeError):
            self._graph.add_component(int, 'salut')

    # Test that a logging level of the wrong type raises TypeError.
    def test_add_component_invalid_logging_level_type(self):
        class MySink(bt2._UserSinkComponent):
            def _consume(self):
                pass

            def _graph_is_configured(self):
                pass

        with self.assertRaises(TypeError):
            self._graph.add_component(MySink, 'salut', logging_level='yo')

    # Test that an integer which is not a valid logging level raises
    # ValueError.
    def test_add_component_invalid_logging_level_value(self):
        class MySink(bt2._UserSinkComponent):
            def _consume(self):
                pass

            def _graph_is_configured(self):
                pass

        with self.assertRaises(ValueError):
            self._graph.add_component(MySink, 'salut', logging_level=12345)

    # Test that the logging level given to Graph.add_component() is the
    # one reported by the created component.
    def test_add_component_logging_level(self):
        class MySink(bt2._UserSinkComponent):
            def _consume(self):
                pass

            def _graph_is_configured(self):
                pass

        comp = self._graph.add_component(MySink, 'salut',
                                         logging_level=bt2.LoggingLevel.DEBUG)
        self.assertEqual(comp.logging_level, bt2.LoggingLevel.DEBUG)

    # Test that connecting an output port to an input port marks both
    # ports as connected and that both report the returned connection.
    def test_connect_ports(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       message_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                raise bt2.Stop

            def _graph_is_configured(self):
                pass

        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')

        conn = self._graph.connect_ports(src.output_ports['out'],
                                         sink.input_ports['in'])
        self.assertTrue(src.output_ports['out'].is_connected)
        self.assertTrue(sink.input_ports['in'].is_connected)
        self.assertEqual(src.output_ports['out'].connection.addr, conn.addr)
        self.assertEqual(sink.input_ports['in'].connection.addr, conn.addr)

    # Test that passing the ports in the wrong order (input port first,
    # output port second) raises TypeError.
    def test_connect_ports_invalid_direction(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       message_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                raise bt2.Stop

            def _graph_is_configured(self):
                pass

        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')

        with self.assertRaises(TypeError):
            self._graph.connect_ports(sink.input_ports['in'],
                                      src.output_ports['out'])

    # Test that Graph.cancel() flips Graph.is_canceled.
    def test_cancel(self):
        self.assertFalse(self._graph.is_canceled)
        self._graph.cancel()
        self.assertTrue(self._graph.is_canceled)

    # Test that Graph.run() raises bt2.Canceled if the graph gets canceled
    # during execution.
    def test_cancel_while_running(self):
        class MyIter(_MyIter):
            def __next__(self):
                return self._create_stream_beginning_message(self._stream)

        class MySource(bt2._UserSourceComponent,
                       message_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                # Pretend that somebody asynchronously cancelled the graph.
                graph.cancel()

                return next(self._msg_iter)

            def _graph_is_configured(self):
                self._msg_iter = self._input_ports['in'].create_message_iterator()

        graph = bt2.Graph()
        src = graph.add_component(MySource, 'src')
        sink = graph.add_component(MySink, 'sink')
        graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])

        with self.assertRaises(bt2.Canceled):
            graph.run()

    # Test that Graph.run() makes the sink consume, in order, a stream
    # beginning, a packet beginning, five events, a packet end and a
    # stream end message, and then returns normally.
    def test_run(self):
        # Number of messages the sink received; checked after the run so
        # that the test cannot pass without any message having flowed.
        consumed_count = 0

        class MyIter(_MyIter):
            def __next__(self):
                if self._at == 9:
                    raise StopIteration

                if self._at == 0:
                    msg = self._create_stream_beginning_message(self._stream)
                elif self._at == 1:
                    msg = self._create_packet_beginning_message(self._packet)
                elif self._at == 7:
                    msg = self._create_packet_end_message(self._packet)
                elif self._at == 8:
                    msg = self._create_stream_end_message(self._stream)
                else:
                    msg = self._create_event_message(self._ec, self._packet)

                self._at += 1
                return msg

        class MySource(bt2._UserSourceComponent,
                       message_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._input_port = self._add_input_port('in')
                self._at = 0

            # The component is named `comp_self` so that `self` keeps
            # referring to the test case within this method.
            def _consume(comp_self):
                nonlocal consumed_count
                msg = next(comp_self._msg_iter)

                if comp_self._at == 0:
                    self.assertIsInstance(msg, bt2.message._StreamBeginningMessage)
                elif comp_self._at == 1:
                    self.assertIsInstance(msg, bt2.message._PacketBeginningMessage)
                elif 2 <= comp_self._at <= 6:
                    self.assertIsInstance(msg, bt2.message._EventMessage)
                    self.assertEqual(msg.event.cls.name, 'salut')
                elif comp_self._at == 7:
                    self.assertIsInstance(msg, bt2.message._PacketEndMessage)
                elif comp_self._at == 8:
                    self.assertIsInstance(msg, bt2.message._StreamEndMessage)

                comp_self._at += 1
                consumed_count += 1

            def _graph_is_configured(self):
                self._msg_iter = self._input_port.create_message_iterator()

        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')
        self._graph.connect_ports(src.output_ports['out'],
                                  sink.input_ports['in'])
        self._graph.run()
        self.assertEqual(consumed_count, 9)

    # Test that Graph.run() raises bt2.TryAgain when the sink raises
    # bt2.TryAgain from its consuming method.
    def test_run_again(self):
        class MyIter(_MyIter):
            def __next__(self):
                # `_at` is not incremented once it reaches 3, so every
                # later call raises bt2.TryAgain as well.
                if self._at == 3:
                    raise bt2.TryAgain

                if self._at == 0:
                    msg = self._create_stream_beginning_message(self._stream)
                elif self._at == 1:
                    msg = self._create_packet_beginning_message(self._packet)
                elif self._at == 2:
                    msg = self._create_event_message(self._ec, self._packet)

                self._at += 1
                return msg

        class MySource(bt2._UserSourceComponent,
                       message_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._input_port = self._add_input_port('in')
                self._at = 0

            def _consume(comp_self):
                msg = next(comp_self._msg_iter)

                if comp_self._at == 0:
                    self.assertIsInstance(msg, bt2.message._StreamBeginningMessage)
                elif comp_self._at == 1:
                    self.assertIsInstance(msg, bt2.message._PacketBeginningMessage)
                elif comp_self._at == 2:
                    self.assertIsInstance(msg, bt2.message._EventMessage)
                    raise bt2.TryAgain

                comp_self._at += 1

            def _graph_is_configured(self):
                self._msg_iter = self._input_port.create_message_iterator()

        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')
        self._graph.connect_ports(src.output_ports['out'],
                                  sink.input_ports['in'])

        with self.assertRaises(bt2.TryAgain):
            self._graph.run()

    # Test that Graph.run() raises bt2.Error when the sink raises an
    # arbitrary exception, and that the upstream message iterator is not
    # called again after that.
    def test_run_error(self):
        raised_in_sink = False

        # An exception raised from within the iterator could surface as
        # the very bt2.Error which this test expects, so the condition is
        # also recorded here and checked once Graph.run() has returned.
        iter_called_after_raise = False

        class MyIter(_MyIter):
            def __next__(self):
                # If this gets called after the sink raised an exception, it is
                # an error.
                nonlocal iter_called_after_raise

                if raised_in_sink:
                    iter_called_after_raise = True
                    raise AssertionError

                if self._at == 0:
                    msg = self._create_stream_beginning_message(self._stream)
                elif self._at == 1:
                    msg = self._create_packet_beginning_message(self._packet)
                elif self._at == 2 or self._at == 3:
                    msg = self._create_event_message(self._ec, self._packet)
                else:
                    raise bt2.TryAgain

                self._at += 1
                return msg

        class MySource(bt2._UserSourceComponent,
                       message_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._input_port = self._add_input_port('in')
                self._at = 0

            def _consume(comp_self):
                nonlocal raised_in_sink
                msg = next(comp_self._msg_iter)

                if comp_self._at == 0:
                    self.assertIsInstance(msg, bt2.message._StreamBeginningMessage)
                elif comp_self._at == 1:
                    self.assertIsInstance(msg, bt2.message._PacketBeginningMessage)
                elif comp_self._at == 2:
                    self.assertIsInstance(msg, bt2.message._EventMessage)
                elif comp_self._at == 3:
                    raised_in_sink = True
                    raise RuntimeError('error!')

                comp_self._at += 1

            def _graph_is_configured(self):
                self._msg_iter = self._input_port.create_message_iterator()

        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')
        self._graph.connect_ports(src.output_ports['out'],
                                  sink.input_ports['in'])

        with self.assertRaises(bt2.Error):
            self._graph.run()

        # Make sure the error really is the one raised by the sink.
        self.assertTrue(raised_in_sink)
        self.assertFalse(iter_called_after_raise)

    # Test that the "port added" and "ports connected" listeners are
    # called with the expected components and ports, in the expected
    # order.
    def test_listeners(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       message_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')
                self._add_output_port('zero')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                raise bt2.Stop

            def _graph_is_configured(self):
                pass

            def _port_connected(self, port, other_port):
                # Adding a port here triggers one more "port added"
                # listener call during the connection.
                self._add_input_port('taste')

        # Every listener call, recorded in call order.
        calls = []

        def port_added_listener(component, port):
            calls.append((port_added_listener, component, port))

        def ports_connected_listener(upstream_component, upstream_port,
                                     downstream_component, downstream_port):
            calls.append((ports_connected_listener,
                          upstream_component, upstream_port,
                          downstream_component, downstream_port))

        self._graph.add_port_added_listener(port_added_listener)
        self._graph.add_ports_connected_listener(ports_connected_listener)
        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')
        self._graph.connect_ports(src.output_ports['out'],
                                  sink.input_ports['in'])

        self.assertEqual(len(calls), 5)

        # The first four calls are "port added" calls.
        expected_added_ports = [
            ('src', 'out'),
            ('src', 'zero'),
            ('sink', 'in'),
            ('sink', 'taste'),
        ]

        for call, (comp_name, port_name) in zip(calls, expected_added_ports):
            self.assertIs(call[0], port_added_listener)
            self.assertEqual(call[1].name, comp_name)
            self.assertEqual(call[2].name, port_name)

        # The last call is the "ports connected" call.
        self.assertIs(calls[4][0], ports_connected_listener)
        self.assertEqual(calls[4][1].name, 'src')
        self.assertEqual(calls[4][2].name, 'out')
        self.assertEqual(calls[4][3].name, 'sink')
        self.assertEqual(calls[4][4].name, 'in')

    # Test that registering something which is not callable as a
    # listener raises TypeError.
    def test_invalid_listeners(self):
        with self.assertRaises(TypeError):
            self._graph.add_port_added_listener(1234)

        with self.assertRaises(TypeError):
            self._graph.add_ports_connected_listener(1234)

    # Test that an exception raised by a component's initializer makes
    # Graph.add_component() raise bt2.Error.
    def test_raise_in_component_init(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                raise ValueError('oops!')

            def _consume(self):
                raise bt2.Stop

            def _graph_is_configured(self):
                pass

        graph = bt2.Graph()

        with self.assertRaises(bt2.Error):
            graph.add_component(MySink, 'comp')

    # Test that an exception raised by a "port added" listener makes
    # Graph.add_component() raise bt2.Error.
    def test_raise_in_port_added_listener(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                raise bt2.Stop

            def _graph_is_configured(self):
                pass

        def port_added_listener(component, port):
            raise ValueError('oh noes!')

        graph = bt2.Graph()
        graph.add_port_added_listener(port_added_listener)

        with self.assertRaises(bt2.Error):
            graph.add_component(MySink, 'comp')

    # Test that an exception raised by a "ports connected" listener makes
    # Graph.connect_ports() raise bt2.Error.
    def test_raise_in_ports_connected_listener(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       message_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                raise bt2.Stop

            def _graph_is_configured(self):
                pass

        def ports_connected_listener(upstream_component, upstream_port,
                                     downstream_component, downstream_port):
            raise ValueError('oh noes!')

        graph = bt2.Graph()
        graph.add_ports_connected_listener(ports_connected_listener)
        src = graph.add_component(MySource, 'src')
        sink = graph.add_component(MySink, 'sink')

        with self.assertRaises(bt2.Error):
            graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
This page took 0.040677 seconds and 3 git commands to generate.