85b6bbde32618de167b3828278c27c37da0471a6
[babeltrace.git] / tests / bindings / python / bt2 / test_graph.py
1 from bt2 import value
2 import collections
3 import unittest
4 import copy
5 import bt2
6
7
8 class _MyIter(bt2._UserMessageIterator):
9 def __init__(self, self_output_port):
10 self._build_meta()
11 self._at = 0
12
13 def _build_meta(self):
14 self._tc = self._component._create_trace_class()
15 self._t = self._tc()
16 self._sc = self._tc.create_stream_class()
17 self._ec = self._sc.create_event_class(name='salut')
18 self._my_int_ft = self._tc.create_signed_integer_field_class(32)
19 payload_ft = self._tc.create_structure_field_class()
20 payload_ft += collections.OrderedDict([
21 ('my_int', self._my_int_ft),
22 ])
23 self._ec.payload_field_type = payload_ft
24 self._stream = self._t.create_stream(self._sc)
25 self._packet = self._stream.create_packet()
26
27 def _create_event(self, value):
28 ev = self._ec()
29 ev.payload_field['my_int'] = value
30 ev.packet = self._packet
31 return ev
32
33
34 class GraphTestCase(unittest.TestCase):
35 def setUp(self):
36 self._graph = bt2.Graph()
37
38 def tearDown(self):
39 del self._graph
40
41 def test_create_empty(self):
42 graph = bt2.Graph()
43
44 def test_add_component_user_cls(self):
45 class MySink(bt2._UserSinkComponent):
46 def _consume(self):
47 pass
48
49 comp = self._graph.add_component(MySink, 'salut')
50 self.assertEqual(comp.name, 'salut')
51
52 def test_add_component_gen_cls(self):
53 class MySink(bt2._UserSinkComponent):
54 def _consume(self):
55 pass
56
57 comp = self._graph.add_component(MySink, 'salut')
58 assert comp
59 comp2 = self._graph.add_component(comp.cls, 'salut2')
60 self.assertEqual(comp2.name, 'salut2')
61
62 def test_add_component_params(self):
63 comp_params = None
64
65 class MySink(bt2._UserSinkComponent):
66 def __init__(self, params):
67 nonlocal comp_params
68 comp_params = params
69
70 def _consume(self):
71 pass
72
73 params = {'hello': 23, 'path': '/path/to/stuff'}
74 comp = self._graph.add_component(MySink, 'salut', params)
75 self.assertEqual(params, comp_params)
76 del comp_params
77
78 def test_add_component_invalid_cls_type(self):
79 with self.assertRaises(TypeError):
80 self._graph.add_component(int, 'salut')
81
82 def test_add_component_invalid_logging_level_type(self):
83 class MySink(bt2._UserSinkComponent):
84 def _consume(self):
85 pass
86
87 with self.assertRaises(TypeError):
88 self._graph.add_component(MySink, 'salut', logging_level='yo')
89
90 def test_add_component_invalid_logging_level_value(self):
91 class MySink(bt2._UserSinkComponent):
92 def _consume(self):
93 pass
94
95 with self.assertRaises(ValueError):
96 self._graph.add_component(MySink, 'salut', logging_level=12345)
97
98 def test_add_component_logging_level(self):
99 class MySink(bt2._UserSinkComponent):
100 def _consume(self):
101 pass
102
103 comp = self._graph.add_component(MySink, 'salut',
104 logging_level=bt2.LoggingLevel.DEBUG)
105 self.assertEqual(comp.logging_level, bt2.LoggingLevel.DEBUG)
106
107 def test_connect_ports(self):
108 class MyIter(bt2._UserMessageIterator):
109 def __next__(self):
110 raise bt2.Stop
111
112 class MySource(bt2._UserSourceComponent,
113 message_iterator_class=MyIter):
114 def __init__(self, params):
115 self._add_output_port('out')
116
117 class MySink(bt2._UserSinkComponent):
118 def __init__(self, params):
119 self._add_input_port('in')
120
121 def _consume(self):
122 raise bt2.Stop
123
124 src = self._graph.add_component(MySource, 'src')
125 sink = self._graph.add_component(MySink, 'sink')
126
127 conn = self._graph.connect_ports(src.output_ports['out'],
128 sink.input_ports['in'])
129 self.assertTrue(src.output_ports['out'].is_connected)
130 self.assertTrue(sink.input_ports['in'].is_connected)
131 self.assertEqual(src.output_ports['out'].connection._ptr, conn._ptr)
132 self.assertEqual(sink.input_ports['in'].connection._ptr, conn._ptr)
133
134 def test_connect_ports_invalid_direction(self):
135 class MyIter(bt2._UserMessageIterator):
136 def __next__(self):
137 raise bt2.Stop
138
139 class MySource(bt2._UserSourceComponent,
140 message_iterator_class=MyIter):
141 def __init__(self, params):
142 self._add_output_port('out')
143
144 class MySink(bt2._UserSinkComponent):
145 def __init__(self, params):
146 self._add_input_port('in')
147
148 def _consume(self):
149 raise bt2.Stop
150
151 src = self._graph.add_component(MySource, 'src')
152 sink = self._graph.add_component(MySink, 'sink')
153
154 with self.assertRaises(TypeError):
155 conn = self._graph.connect_ports(sink.input_ports['in'],
156 src.output_ports['out'])
157
158 def test_connect_ports_refused(self):
159 class MyIter(bt2._UserMessageIterator):
160 def __next__(self):
161 raise bt2.Stop
162
163 class MySource(bt2._UserSourceComponent,
164 message_iterator_class=MyIter):
165 def __init__(self, params):
166 self._add_output_port('out')
167
168 class MySink(bt2._UserSinkComponent):
169 def __init__(self, params):
170 self._add_input_port('in')
171
172 def _consume(self):
173 raise bt2.Stop
174
175 def _accept_port_connection(self, port, other_port):
176 return False
177
178 src = self._graph.add_component(MySource, 'src')
179 sink = self._graph.add_component(MySink, 'sink')
180
181 with self.assertRaises(bt2.PortConnectionRefused):
182 conn = self._graph.connect_ports(src.output_ports['out'],
183 sink.input_ports['in'])
184
185 def test_cancel(self):
186 self.assertFalse(self._graph.is_canceled)
187 self._graph.cancel()
188 self.assertTrue(self._graph.is_canceled)
189
190 # Test that Graph.run() raises bt2.GraphCanceled if the graph gets canceled
191 # during execution.
192 def test_cancel_while_running(self):
193 class MyIter(_MyIter):
194 def __next__(self):
195 return self._create_stream_beginning_message(self._stream)
196
197 class MySource(bt2._UserSourceComponent,
198 message_iterator_class=MyIter):
199 def __init__(self, params):
200 self._add_output_port('out')
201
202 class MySink(bt2._UserSinkComponent):
203 def __init__(self, params):
204 self._add_input_port('in')
205
206 def _consume(self):
207 # Pretend that somebody asynchronously cancelled the graph.
208 nonlocal graph
209 graph.cancel()
210
211 return next(self._msg_iter)
212
213 def _graph_is_configured(self):
214 self._msg_iter = self._input_ports['in'].create_message_iterator()
215
216 graph = bt2.Graph()
217 up = graph.add_component(MySource, 'down')
218 down = graph.add_component(MySink, 'up')
219 graph.connect_ports(up.output_ports['out'], down.input_ports['in'])
220 with self.assertRaises(bt2.GraphCanceled):
221 graph.run()
222
223 def test_run(self):
224 class MyIter(_MyIter):
225 def __next__(self):
226 if self._at == 9:
227 raise StopIteration
228
229 if self._at == 0:
230 msg = self._create_stream_beginning_message(self._stream)
231 elif self._at == 1:
232 msg = self._create_packet_beginning_message(self._packet)
233 elif self._at == 7:
234 msg = self._create_packet_end_message(self._packet)
235 elif self._at == 8:
236 msg = self._create_stream_end_message(self._stream)
237 else:
238 msg = self._create_event_message(self._ec, self._packet)
239
240 self._at += 1
241 return msg
242
243 class MySource(bt2._UserSourceComponent,
244 message_iterator_class=MyIter):
245 def __init__(self, params):
246 self._add_output_port('out')
247
248 class MySink(bt2._UserSinkComponent):
249 def __init__(self, params):
250 self._input_port = self._add_input_port('in')
251 self._at = 0
252
253 def _consume(comp_self):
254 msg = next(comp_self._msg_iter)
255
256 if comp_self._at == 0:
257 self.assertIsInstance(msg, bt2.message._StreamBeginningMessage)
258 elif comp_self._at == 1:
259 self.assertIsInstance(msg, bt2.message._PacketBeginningMessage)
260 elif comp_self._at >= 2 and comp_self._at <= 6:
261 self.assertIsInstance(msg, bt2.message._EventMessage)
262 self.assertEqual(msg.event.cls.name, 'salut')
263 elif comp_self._at == 7:
264 self.assertIsInstance(msg, bt2.message._PacketEndMessage)
265 elif comp_self._at == 8:
266 self.assertIsInstance(msg, bt2.message._StreamEndMessage)
267
268 comp_self._at += 1
269
270 def _graph_is_configured(self):
271 self._msg_iter = self._input_port.create_message_iterator()
272
273 src = self._graph.add_component(MySource, 'src')
274 sink = self._graph.add_component(MySink, 'sink')
275 conn = self._graph.connect_ports(src.output_ports['out'],
276 sink.input_ports['in'])
277 self._graph.run()
278
279 def test_run_again(self):
280 class MyIter(_MyIter):
281 def __next__(self):
282 if self._at == 3:
283 raise bt2.TryAgain
284
285 if self._at == 0:
286 msg = self._create_stream_beginning_message(self._stream)
287 elif self._at == 1:
288 msg = self._create_packet_beginning_message(self._packet)
289 elif self._at == 2:
290 msg = self._create_event_message(self._ec, self._packet)
291
292 self._at += 1
293 return msg
294
295 class MySource(bt2._UserSourceComponent,
296 message_iterator_class=MyIter):
297 def __init__(self, params):
298 self._add_output_port('out')
299
300 class MySink(bt2._UserSinkComponent):
301 def __init__(self, params):
302 self._input_port = self._add_input_port('in')
303 self._at = 0
304
305 def _consume(comp_self):
306 msg = next(comp_self._msg_iter)
307 if comp_self._at == 0:
308 self.assertIsInstance(msg, bt2.message._StreamBeginningMessage)
309 elif comp_self._at == 1:
310 self.assertIsInstance(msg, bt2.message._PacketBeginningMessage)
311 elif comp_self._at == 2:
312 self.assertIsInstance(msg, bt2.message._EventMessage)
313 raise bt2.TryAgain
314 else:
315 pass
316
317 comp_self._at += 1
318
319 def _graph_is_configured(self):
320 self._msg_iter = self._input_port.create_message_iterator()
321
322 src = self._graph.add_component(MySource, 'src')
323 sink = self._graph.add_component(MySink, 'sink')
324 conn = self._graph.connect_ports(src.output_ports['out'],
325 sink.input_ports['in'])
326
327 with self.assertRaises(bt2.TryAgain):
328 self._graph.run()
329
330 def test_run_error(self):
331 raised_in_sink = False
332
333 class MyIter(_MyIter):
334 def __next__(self):
335 # If this gets called after the sink raised an exception, it is
336 # an error.
337 nonlocal raised_in_sink
338 assert raised_in_sink is False
339
340 if self._at == 0:
341 msg = self._create_stream_beginning_message(self._stream)
342 elif self._at == 1:
343 msg = self._create_packet_beginning_message(self._packet)
344 elif self._at == 2 or self._at == 3:
345 msg = self._create_event_message(self._ec, self._packet)
346 else:
347 raise bt2.TryAgain
348 self._at += 1
349 return msg
350
351 class MySource(bt2._UserSourceComponent,
352 message_iterator_class=MyIter):
353 def __init__(self, params):
354 self._add_output_port('out')
355
356 class MySink(bt2._UserSinkComponent):
357 def __init__(self, params):
358 self._input_port = self._add_input_port('in')
359 self._at = 0
360
361 def _consume(comp_self):
362 msg = next(comp_self._msg_iter)
363 if comp_self._at == 0:
364 self.assertIsInstance(msg, bt2.message._StreamBeginningMessage)
365 elif comp_self._at == 1:
366 self.assertIsInstance(msg, bt2.message._PacketBeginningMessage)
367 elif comp_self._at == 2:
368 self.assertIsInstance(msg, bt2.message._EventMessage)
369 elif comp_self._at == 3:
370 nonlocal raised_in_sink
371 raised_in_sink = True
372 raise RuntimeError('error!')
373
374 comp_self._at += 1
375
376 def _graph_is_configured(self):
377 self._msg_iter = self._input_port.create_message_iterator()
378
379 src = self._graph.add_component(MySource, 'src')
380 sink = self._graph.add_component(MySink, 'sink')
381 conn = self._graph.connect_ports(src.output_ports['out'],
382 sink.input_ports['in'])
383
384 with self.assertRaises(bt2.Error):
385 self._graph.run()
386
387 def test_listeners(self):
388 class MyIter(bt2._UserMessageIterator):
389 def __next__(self):
390 raise bt2.Stop
391
392 class MySource(bt2._UserSourceComponent,
393 message_iterator_class=MyIter):
394 def __init__(self, params):
395 self._add_output_port('out')
396 self._add_output_port('zero')
397
398 class MySink(bt2._UserSinkComponent):
399 def __init__(self, params):
400 self._add_input_port('in')
401
402 def _consume(self):
403 raise bt2.Stop
404
405 def _port_connected(self, port, other_port):
406 self._add_input_port('taste')
407
408 def port_added_listener(component, port):
409 nonlocal calls
410 calls.append((port_added_listener, component, port))
411
412 def ports_connected_listener(upstream_component, upstream_port,
413 downstream_component, downstream_port):
414 nonlocal calls
415 calls.append((ports_connected_listener,
416 upstream_component, upstream_port,
417 downstream_component, downstream_port))
418
419 calls = []
420 self._graph.add_port_added_listener(port_added_listener)
421 self._graph.add_ports_connected_listener(ports_connected_listener)
422 src = self._graph.add_component(MySource, 'src')
423 sink = self._graph.add_component(MySink, 'sink')
424 self._graph.connect_ports(src.output_ports['out'],
425 sink.input_ports['in'])
426
427 self.assertEqual(len(calls), 5)
428
429 self.assertIs(calls[0][0], port_added_listener)
430 self.assertEqual(calls[0][1].name, 'src')
431 self.assertEqual(calls[0][2].name, 'out')
432
433 self.assertIs(calls[1][0], port_added_listener)
434 self.assertEqual(calls[1][1].name, 'src')
435 self.assertEqual(calls[1][2].name, 'zero')
436
437 self.assertIs(calls[2][0], port_added_listener)
438 self.assertEqual(calls[2][1].name, 'sink')
439 self.assertEqual(calls[2][2].name, 'in')
440
441 self.assertIs(calls[3][0], port_added_listener)
442 self.assertEqual(calls[3][1].name, 'sink')
443 self.assertEqual(calls[3][2].name, 'taste')
444
445 self.assertIs(calls[4][0], ports_connected_listener)
446 self.assertEqual(calls[4][1].name, 'src')
447 self.assertEqual(calls[4][2].name, 'out')
448 self.assertEqual(calls[4][3].name, 'sink')
449 self.assertEqual(calls[4][4].name, 'in')
450
451 def test_invalid_listeners(self):
452 class MyIter(bt2._UserMessageIterator):
453 def __next__(self):
454 raise bt2.Stop
455
456 class MySource(bt2._UserSourceComponent,
457 message_iterator_class=MyIter):
458 def __init__(self, params):
459 self._add_output_port('out')
460 self._add_output_port('zero')
461
462 class MySink(bt2._UserSinkComponent):
463 def __init__(self, params):
464 self._add_input_port('in')
465
466 def _consume(self):
467 raise bt2.Stop
468
469 def _port_connected(self, port, other_port):
470 self._add_input_port('taste')
471
472 with self.assertRaises(TypeError):
473 self._graph.add_port_added_listener(1234)
474 with self.assertRaises(TypeError):
475 self._graph.add_ports_connected_listener(1234)
476
477 def test_raise_in_component_init(self):
478 class MySink(bt2._UserSinkComponent):
479 def __init__(self, params):
480 raise ValueError('oops!')
481
482 def _consume(self):
483 raise bt2.Stop
484
485 graph = bt2.Graph()
486
487 with self.assertRaises(bt2.Error):
488 graph.add_component(MySink, 'comp')
489
490 def test_raise_in_port_added_listener(self):
491 class MySink(bt2._UserSinkComponent):
492 def __init__(self, params):
493 self._add_input_port('in')
494
495 def _consume(self):
496 raise bt2.Stop
497
498 def port_added_listener(component, port):
499 raise ValueError('oh noes!')
500
501 graph = bt2.Graph()
502 graph.add_port_added_listener(port_added_listener)
503
504 with self.assertRaises(bt2.Error):
505 graph.add_component(MySink, 'comp')
506
507 def test_raise_in_ports_connected_listener(self):
508 class MyIter(bt2._UserMessageIterator):
509 def __next__(self):
510 raise bt2.Stop
511
512 class MySource(bt2._UserSourceComponent,
513 message_iterator_class=MyIter):
514 def __init__(self, params):
515 self._add_output_port('out')
516
517 class MySink(bt2._UserSinkComponent):
518 def __init__(self, params):
519 self._add_input_port('in')
520
521 def _consume(self):
522 raise bt2.Stop
523
524 def ports_connected_listener(upstream_component, upstream_port,
525 downstream_component, downstream_port):
526 raise ValueError('oh noes!')
527
528 graph = bt2.Graph()
529 graph.add_ports_connected_listener(ports_connected_listener)
530 up = graph.add_component(MySource, 'down')
531 down = graph.add_component(MySink, 'up')
532
533 with self.assertRaises(bt2.Error):
534 graph.connect_ports(up.output_ports['out'], down.input_ports['in'])
This page took 0.056309 seconds and 3 git commands to generate.