bt2: Adapt test_graph.py and make it pass
[babeltrace.git] / tests / bindings / python / bt2 / test_graph.py
1 from bt2 import value
2 import collections
3 import unittest
4 import copy
5 import bt2
6
7
8 class _MyIter(bt2._UserMessageIterator):
9 def __init__(self):
10 self._build_meta()
11 self._at = 0
12
13 def _build_meta(self):
14 self._tc = self._component._create_trace_class()
15 self._t = self._tc()
16 self._sc = self._tc.create_stream_class()
17 self._ec = self._sc.create_event_class(name='salut')
18 self._my_int_ft = self._tc.create_signed_integer_field_class(32)
19 payload_ft = self._tc.create_structure_field_class()
20 payload_ft += collections.OrderedDict([
21 ('my_int', self._my_int_ft),
22 ])
23 self._ec.payload_field_type = payload_ft
24 self._stream = self._t.create_stream(self._sc)
25 self._packet = self._stream.create_packet()
26
27 def _create_event(self, value):
28 ev = self._ec()
29 ev.payload_field['my_int'] = value
30 ev.packet = self._packet
31 return ev
32
33
34 class GraphTestCase(unittest.TestCase):
35 def setUp(self):
36 self._graph = bt2.Graph()
37
38 def tearDown(self):
39 del self._graph
40
41 def test_create_empty(self):
42 graph = bt2.Graph()
43
44 def test_add_component_user_cls(self):
45 class MySink(bt2._UserSinkComponent):
46 def _consume(self):
47 pass
48
49 comp = self._graph.add_component(MySink, 'salut')
50 self.assertEqual(comp.name, 'salut')
51
52 def test_add_component_gen_cls(self):
53 class MySink(bt2._UserSinkComponent):
54 def _consume(self):
55 pass
56
57 comp = self._graph.add_component(MySink, 'salut')
58 assert comp
59 comp2 = self._graph.add_component(comp.component_class, 'salut2')
60 self.assertEqual(comp2.name, 'salut2')
61
62 def test_add_component_params(self):
63 comp_params = None
64
65 class MySink(bt2._UserSinkComponent):
66 def __init__(self, params):
67 nonlocal comp_params
68 comp_params = params
69
70 def _consume(self):
71 pass
72
73 params = {'hello': 23, 'path': '/path/to/stuff'}
74 comp = self._graph.add_component(MySink, 'salut', params)
75 self.assertEqual(params, comp_params)
76 del comp_params
77
78 def test_add_component_invalid_cls_type(self):
79 with self.assertRaises(TypeError):
80 self._graph.add_component(int, 'salut')
81
82 def test_connect_ports(self):
83 class MyIter(bt2._UserMessageIterator):
84 def __next__(self):
85 raise bt2.Stop
86
87 class MySource(bt2._UserSourceComponent,
88 message_iterator_class=MyIter):
89 def __init__(self, params):
90 self._add_output_port('out')
91
92 class MySink(bt2._UserSinkComponent):
93 def __init__(self, params):
94 self._add_input_port('in')
95
96 def _consume(self):
97 raise bt2.Stop
98
99 src = self._graph.add_component(MySource, 'src')
100 sink = self._graph.add_component(MySink, 'sink')
101
102 conn = self._graph.connect_ports(src.output_ports['out'],
103 sink.input_ports['in'])
104 self.assertTrue(src.output_ports['out'].is_connected)
105 self.assertTrue(sink.input_ports['in'].is_connected)
106 self.assertEqual(src.output_ports['out'].connection._ptr, conn._ptr)
107 self.assertEqual(sink.input_ports['in'].connection._ptr, conn._ptr)
108
109 def test_connect_ports_invalid_direction(self):
110 class MyIter(bt2._UserMessageIterator):
111 def __next__(self):
112 raise bt2.Stop
113
114 class MySource(bt2._UserSourceComponent,
115 message_iterator_class=MyIter):
116 def __init__(self, params):
117 self._add_output_port('out')
118
119 class MySink(bt2._UserSinkComponent):
120 def __init__(self, params):
121 self._add_input_port('in')
122
123 def _consume(self):
124 raise bt2.Stop
125
126 src = self._graph.add_component(MySource, 'src')
127 sink = self._graph.add_component(MySink, 'sink')
128
129 with self.assertRaises(TypeError):
130 conn = self._graph.connect_ports(sink.input_ports['in'],
131 src.output_ports['out'])
132
133 def test_connect_ports_refused(self):
134 class MyIter(bt2._UserMessageIterator):
135 def __next__(self):
136 raise bt2.Stop
137
138 class MySource(bt2._UserSourceComponent,
139 message_iterator_class=MyIter):
140 def __init__(self, params):
141 self._add_output_port('out')
142
143 class MySink(bt2._UserSinkComponent):
144 def __init__(self, params):
145 self._add_input_port('in')
146
147 def _consume(self):
148 raise bt2.Stop
149
150 def _accept_port_connection(self, port, other_port):
151 return False
152
153 src = self._graph.add_component(MySource, 'src')
154 sink = self._graph.add_component(MySink, 'sink')
155
156 with self.assertRaises(bt2.PortConnectionRefused):
157 conn = self._graph.connect_ports(src.output_ports['out'],
158 sink.input_ports['in'])
159
160 def test_cancel(self):
161 self.assertFalse(self._graph.is_canceled)
162 self._graph.cancel()
163 self.assertTrue(self._graph.is_canceled)
164
165 # Test that Graph.run() raises bt2.GraphCanceled if the graph gets canceled
166 # during execution.
167 def test_cancel_while_running(self):
168 class MyIter(_MyIter):
169 def __next__(self):
170 return self._create_stream_beginning_message(self._stream)
171
172 class MySource(bt2._UserSourceComponent,
173 message_iterator_class=MyIter):
174 def __init__(self, params):
175 self._add_output_port('out')
176
177 class MySink(bt2._UserSinkComponent):
178 def __init__(self, params):
179 self._add_input_port('in')
180
181 def _consume(self):
182 # Pretend that somebody asynchronously cancelled the graph.
183 nonlocal graph
184 graph.cancel()
185
186 return next(self._msg_iter)
187
188 def _graph_is_configured(self):
189 self._msg_iter = self._input_ports['in'].create_message_iterator()
190
191 graph = bt2.Graph()
192 up = graph.add_component(MySource, 'down')
193 down = graph.add_component(MySink, 'up')
194 graph.connect_ports(up.output_ports['out'], down.input_ports['in'])
195 with self.assertRaises(bt2.GraphCanceled):
196 graph.run()
197
198 def test_run(self):
199 class MyIter(_MyIter):
200 def __next__(self):
201 if self._at == 9:
202 raise StopIteration
203
204 if self._at == 0:
205 msg = self._create_stream_beginning_message(self._stream)
206 elif self._at == 1:
207 msg = self._create_packet_beginning_message(self._packet)
208 elif self._at == 7:
209 msg = self._create_packet_end_message(self._packet)
210 elif self._at == 8:
211 msg = self._create_stream_end_message(self._stream)
212 else:
213 msg = self._create_event_message(self._ec, self._packet)
214
215 self._at += 1
216 return msg
217
218 class MySource(bt2._UserSourceComponent,
219 message_iterator_class=MyIter):
220 def __init__(self, params):
221 self._add_output_port('out')
222
223 class MySink(bt2._UserSinkComponent):
224 def __init__(self, params):
225 self._input_port = self._add_input_port('in')
226 self._at = 0
227
228 def _consume(comp_self):
229 msg = next(comp_self._msg_iter)
230
231 if comp_self._at == 0:
232 self.assertIsInstance(msg, bt2.message._StreamBeginningMessage)
233 elif comp_self._at == 1:
234 self.assertIsInstance(msg, bt2.message._PacketBeginningMessage)
235 elif comp_self._at >= 2 and comp_self._at <= 6:
236 self.assertIsInstance(msg, bt2.message._EventMessage)
237 self.assertEqual(msg.event.event_class.name, 'salut')
238 elif comp_self._at == 7:
239 self.assertIsInstance(msg, bt2.message._PacketEndMessage)
240 elif comp_self._at == 8:
241 self.assertIsInstance(msg, bt2.message._StreamEndMessage)
242
243 comp_self._at += 1
244
245 def _graph_is_configured(self):
246 self._msg_iter = self._input_port.create_message_iterator()
247
248 src = self._graph.add_component(MySource, 'src')
249 sink = self._graph.add_component(MySink, 'sink')
250 conn = self._graph.connect_ports(src.output_ports['out'],
251 sink.input_ports['in'])
252 self._graph.run()
253
254 def test_run_again(self):
255 class MyIter(_MyIter):
256 def __next__(self):
257 if self._at == 3:
258 raise bt2.TryAgain
259
260 if self._at == 0:
261 msg = self._create_stream_beginning_message(self._stream)
262 elif self._at == 1:
263 msg = self._create_packet_beginning_message(self._packet)
264 elif self._at == 2:
265 msg = self._create_event_message(self._ec, self._packet)
266
267 self._at += 1
268 return msg
269
270 class MySource(bt2._UserSourceComponent,
271 message_iterator_class=MyIter):
272 def __init__(self, params):
273 self._add_output_port('out')
274
275 class MySink(bt2._UserSinkComponent):
276 def __init__(self, params):
277 self._input_port = self._add_input_port('in')
278 self._at = 0
279
280 def _consume(comp_self):
281 msg = next(comp_self._msg_iter)
282 if comp_self._at == 0:
283 self.assertIsInstance(msg, bt2.message._StreamBeginningMessage)
284 elif comp_self._at == 1:
285 self.assertIsInstance(msg, bt2.message._PacketBeginningMessage)
286 elif comp_self._at == 2:
287 self.assertIsInstance(msg, bt2.message._EventMessage)
288 raise bt2.TryAgain
289 else:
290 pass
291
292 comp_self._at += 1
293
294 def _graph_is_configured(self):
295 self._msg_iter = self._input_port.create_message_iterator()
296
297 src = self._graph.add_component(MySource, 'src')
298 sink = self._graph.add_component(MySink, 'sink')
299 conn = self._graph.connect_ports(src.output_ports['out'],
300 sink.input_ports['in'])
301
302 with self.assertRaises(bt2.TryAgain):
303 self._graph.run()
304
305 def test_run_error(self):
306 raised_in_sink = False
307
308 class MyIter(_MyIter):
309 def __next__(self):
310 # If this gets called after the sink raised an exception, it is
311 # an error.
312 nonlocal raised_in_sink
313 assert raised_in_sink is False
314
315 if self._at == 0:
316 msg = self._create_stream_beginning_message(self._stream)
317 elif self._at == 1:
318 msg = self._create_packet_beginning_message(self._packet)
319 elif self._at == 2 or self._at == 3:
320 msg = self._create_event_message(self._ec, self._packet)
321 else:
322 raise bt2.TryAgain
323 self._at += 1
324 return msg
325
326 class MySource(bt2._UserSourceComponent,
327 message_iterator_class=MyIter):
328 def __init__(self, params):
329 self._add_output_port('out')
330
331 class MySink(bt2._UserSinkComponent):
332 def __init__(self, params):
333 self._input_port = self._add_input_port('in')
334 self._at = 0
335
336 def _consume(comp_self):
337 msg = next(comp_self._msg_iter)
338 if comp_self._at == 0:
339 self.assertIsInstance(msg, bt2.message._StreamBeginningMessage)
340 elif comp_self._at == 1:
341 self.assertIsInstance(msg, bt2.message._PacketBeginningMessage)
342 elif comp_self._at == 2:
343 self.assertIsInstance(msg, bt2.message._EventMessage)
344 elif comp_self._at == 3:
345 nonlocal raised_in_sink
346 raised_in_sink = True
347 raise RuntimeError('error!')
348
349 comp_self._at += 1
350
351 def _graph_is_configured(self):
352 self._msg_iter = self._input_port.create_message_iterator()
353
354 src = self._graph.add_component(MySource, 'src')
355 sink = self._graph.add_component(MySink, 'sink')
356 conn = self._graph.connect_ports(src.output_ports['out'],
357 sink.input_ports['in'])
358
359 with self.assertRaises(bt2.Error):
360 self._graph.run()
361
362 def test_listeners(self):
363 class MyIter(bt2._UserMessageIterator):
364 def __next__(self):
365 raise bt2.Stop
366
367 class MySource(bt2._UserSourceComponent,
368 message_iterator_class=MyIter):
369 def __init__(self, params):
370 self._add_output_port('out')
371 self._add_output_port('zero')
372
373 class MySink(bt2._UserSinkComponent):
374 def __init__(self, params):
375 self._add_input_port('in')
376
377 def _consume(self):
378 raise bt2.Stop
379
380 def _port_connected(self, port, other_port):
381 self._add_input_port('taste')
382
383 def port_added_listener(component, port):
384 nonlocal calls
385 calls.append((port_added_listener, component, port))
386
387 def ports_connected_listener(upstream_component, upstream_port,
388 downstream_component, downstream_port):
389 nonlocal calls
390 calls.append((ports_connected_listener,
391 upstream_component, upstream_port,
392 downstream_component, downstream_port))
393
394 calls = []
395 self._graph.add_port_added_listener(port_added_listener)
396 self._graph.add_ports_connected_listener(ports_connected_listener)
397 src = self._graph.add_component(MySource, 'src')
398 sink = self._graph.add_component(MySink, 'sink')
399 self._graph.connect_ports(src.output_ports['out'],
400 sink.input_ports['in'])
401
402 self.assertEqual(len(calls), 5)
403
404 self.assertIs(calls[0][0], port_added_listener)
405 self.assertEqual(calls[0][1].name, 'src')
406 self.assertEqual(calls[0][2].name, 'out')
407
408 self.assertIs(calls[1][0], port_added_listener)
409 self.assertEqual(calls[1][1].name, 'src')
410 self.assertEqual(calls[1][2].name, 'zero')
411
412 self.assertIs(calls[2][0], port_added_listener)
413 self.assertEqual(calls[2][1].name, 'sink')
414 self.assertEqual(calls[2][2].name, 'in')
415
416 self.assertIs(calls[3][0], port_added_listener)
417 self.assertEqual(calls[3][1].name, 'sink')
418 self.assertEqual(calls[3][2].name, 'taste')
419
420 self.assertIs(calls[4][0], ports_connected_listener)
421 self.assertEqual(calls[4][1].name, 'src')
422 self.assertEqual(calls[4][2].name, 'out')
423 self.assertEqual(calls[4][3].name, 'sink')
424 self.assertEqual(calls[4][4].name, 'in')
425
426 def test_invalid_listeners(self):
427 class MyIter(bt2._UserMessageIterator):
428 def __next__(self):
429 raise bt2.Stop
430
431 class MySource(bt2._UserSourceComponent,
432 message_iterator_class=MyIter):
433 def __init__(self, params):
434 self._add_output_port('out')
435 self._add_output_port('zero')
436
437 class MySink(bt2._UserSinkComponent):
438 def __init__(self, params):
439 self._add_input_port('in')
440
441 def _consume(self):
442 raise bt2.Stop
443
444 def _port_connected(self, port, other_port):
445 self._add_input_port('taste')
446
447 with self.assertRaises(TypeError):
448 self._graph.add_port_added_listener(1234)
449 with self.assertRaises(TypeError):
450 self._graph.add_ports_connected_listener(1234)
451
452 def test_raise_in_component_init(self):
453 class MySink(bt2._UserSinkComponent):
454 def __init__(self, params):
455 raise ValueError('oops!')
456
457 def _consume(self):
458 raise bt2.Stop
459
460 graph = bt2.Graph()
461
462 with self.assertRaises(bt2.Error):
463 graph.add_component(MySink, 'comp')
464
465 def test_raise_in_port_added_listener(self):
466 class MySink(bt2._UserSinkComponent):
467 def __init__(self, params):
468 self._add_input_port('in')
469
470 def _consume(self):
471 raise bt2.Stop
472
473 def port_added_listener(component, port):
474 raise ValueError('oh noes!')
475
476 graph = bt2.Graph()
477 graph.add_port_added_listener(port_added_listener)
478
479 with self.assertRaises(bt2.Error):
480 graph.add_component(MySink, 'comp')
481
482 def test_raise_in_ports_connected_listener(self):
483 class MyIter(bt2._UserMessageIterator):
484 def __next__(self):
485 raise bt2.Stop
486
487 class MySource(bt2._UserSourceComponent,
488 message_iterator_class=MyIter):
489 def __init__(self, params):
490 self._add_output_port('out')
491
492 class MySink(bt2._UserSinkComponent):
493 def __init__(self, params):
494 self._add_input_port('in')
495
496 def _consume(self):
497 raise bt2.Stop
498
499 def ports_connected_listener(upstream_port, downstream_port):
500 raise ValueError('oh noes!')
501
502 graph = bt2.Graph()
503 graph.add_ports_connected_listener(ports_connected_listener)
504 up = graph.add_component(MySource, 'down')
505 down = graph.add_component(MySink, 'up')
506
507 with self.assertRaises(bt2.Error):
508 graph.connect_ports(up.output_ports['out'], down.input_ports['in'])
This page took 0.0403 seconds and 5 git commands to generate.