ff4a81985b8a375362743e51f9fbbfa5c2336552
[babeltrace.git] / tests / bindings / python / bt2 / test_graph.py
1 from bt2 import values
2 import collections
3 import unittest
4 import copy
5 import bt2
6
7
class GraphTestCase(unittest.TestCase):
    """Tests for ``bt2.Graph``.

    Covers adding components, connecting ports, canceling, running the
    graph with user-defined source/sink components, and graph listeners.
    """

    def setUp(self):
        """Create a fresh graph for each test."""
        self._graph = bt2.Graph()

    def tearDown(self):
        """Drop the graph created by ``setUp()``."""
        del self._graph

    def test_create_empty(self):
        """A graph can be created without any argument."""
        graph = bt2.Graph()

    def test_add_component_user_cls(self):
        """Add a component from a user sink class and check its name."""
        class MySink(bt2._UserSinkComponent):
            def _consume(self):
                pass

        comp = self._graph.add_component(MySink, 'salut')
        self.assertEqual(comp.name, 'salut')

    def test_add_component_gen_cls(self):
        """Add a component from the ``component_class`` of another one."""
        class MySink(bt2._UserSinkComponent):
            def _consume(self):
                pass

        comp = self._graph.add_component(MySink, 'salut')
        # Use a unittest assertion rather than a bare ``assert``, which is
        # stripped when Python runs with -O.
        self.assertTrue(comp)
        comp2 = self._graph.add_component(comp.component_class, 'salut2')
        self.assertEqual(comp2.name, 'salut2')

    def test_add_component_params(self):
        """Parameters given to ``add_component()`` reach ``__init__()``."""
        comp_params = None

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                nonlocal comp_params
                comp_params = params

            def _consume(self):
                pass

        params = {'hello': 23, 'path': '/path/to/stuff'}
        comp = self._graph.add_component(MySink, 'salut', params)
        self.assertEqual(params, comp_params)
        # Release the captured parameters object before the test returns
        # (presumably so it does not outlive the graph -- TODO confirm).
        del comp_params

    def test_add_component_invalid_cls_type(self):
        """Adding a non-component class raises ``TypeError``."""
        with self.assertRaises(TypeError):
            self._graph.add_component(int, 'salut')

    def test_connect_ports(self):
        """Connecting an output port to an input port links both ports."""
        class MyIter(bt2._UserNotificationIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                raise bt2.Stop

        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')
        conn = self._graph.connect_ports(src.output_ports['out'],
                                         sink.input_ports['in'])
        self.assertTrue(src.output_ports['out'].is_connected)
        self.assertTrue(sink.input_ports['in'].is_connected)
        self.assertEqual(src.output_ports['out'].connection, conn)
        self.assertEqual(sink.input_ports['in'].connection, conn)

    def test_connect_ports_invalid_direction(self):
        """Passing the input port first raises ``TypeError``."""
        class MyIter(bt2._UserNotificationIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                raise bt2.Stop

        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')

        with self.assertRaises(TypeError):
            self._graph.connect_ports(sink.input_ports['in'],
                                      src.output_ports['out'])

    def test_connect_ports_refused(self):
        """A sink refusing the connection raises ``PortConnectionRefused``."""
        class MyIter(bt2._UserNotificationIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                raise bt2.Stop

            def _accept_port_connection(self, port, other_port):
                return False

        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')

        with self.assertRaises(bt2.PortConnectionRefused):
            self._graph.connect_ports(src.output_ports['out'],
                                      sink.input_ports['in'])

    def test_connect_ports_canceled(self):
        """Connecting ports on a canceled graph raises ``GraphCanceled``."""
        class MyIter(bt2._UserNotificationIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                raise bt2.Stop

        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')
        self._graph.cancel()

        with self.assertRaises(bt2.GraphCanceled):
            self._graph.connect_ports(src.output_ports['out'],
                                      sink.input_ports['in'])

    def test_connect_ports_cannot_consume_accept(self):
        """Running the graph from ``_accept_port_connection()`` fails.

        The sink captures the exception raised by ``graph.run()`` while
        it is asked to accept a connection; the test checks that it is
        exactly ``bt2.CannotConsumeGraph``.
        """
        class MyIter(bt2._UserNotificationIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                raise bt2.Stop

            def _accept_port_connection(self, port, other_port):
                nonlocal exc

                try:
                    self.graph.run()
                except Exception as e:
                    # Captured for inspection by the enclosing test.
                    exc = e

                return True

        exc = None
        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')
        self._graph.connect_ports(src.output_ports['out'],
                                  sink.input_ports['in'])
        self.assertIs(type(exc), bt2.CannotConsumeGraph)

    def test_connect_ports_cannot_consume_connected(self):
        """Running the graph from ``_port_connected()`` fails.

        The sink captures the exception raised by ``graph.run()`` while
        it is notified of a connection; the test checks that it is
        exactly ``bt2.CannotConsumeGraph``.
        """
        class MyIter(bt2._UserNotificationIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                raise bt2.Stop

            def _port_connected(self, port, other_port):
                nonlocal exc

                try:
                    self.graph.run()
                except Exception as e:
                    # Captured for inspection by the enclosing test.
                    exc = e

                return True

        exc = None
        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')
        self._graph.connect_ports(src.output_ports['out'],
                                  sink.input_ports['in'])
        self._graph.run()
        self.assertIs(type(exc), bt2.CannotConsumeGraph)

    def test_cancel(self):
        """``cancel()`` flips ``is_canceled`` from false to true."""
        self.assertFalse(self._graph.is_canceled)
        self._graph.cancel()
        self.assertTrue(self._graph.is_canceled)

    def test_run(self):
        """Run a source -> sink graph and check every notification.

        The source iterator emits five events (payload ``my_int`` set to
        0, 3, 6, 9, 12).  The sink expects, in order: stream beginning,
        packet beginning, the five events, packet end, and stream end.
        """
        class MyIter(bt2._UserNotificationIterator):
            def __init__(self):
                self._build_meta()
                self._at = 0

            def _build_meta(self):
                self._trace = bt2.Trace()
                self._sc = bt2.StreamClass()
                self._ec = bt2.EventClass('salut')
                self._my_int_ft = bt2.IntegerFieldType(32)
                self._ec.payload_field_type = bt2.StructureFieldType()
                self._ec.payload_field_type += collections.OrderedDict([
                    ('my_int', self._my_int_ft),
                ])
                self._sc.add_event_class(self._ec)
                self._trace.add_stream_class(self._sc)
                self._stream = self._sc()
                self._packet = self._stream.create_packet()

            def _create_event(self, value):
                ev = self._ec()
                ev.payload_field['my_int'] = value
                ev.packet = self._packet
                return ev

            def __next__(self):
                if self._at == 5:
                    raise bt2.Stop

                notif = bt2.EventNotification(self._create_event(self._at * 3))
                self._at += 1
                return notif

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')
                self._at = 0

            # ``comp_self`` is the component; ``self`` stays bound to the
            # enclosing test case so its assertion methods can be used.
            def _consume(comp_self):
                notif = next(comp_self._notif_iter)

                if comp_self._at == 0:
                    self.assertIsInstance(notif, bt2.StreamBeginningNotification)
                elif comp_self._at == 1:
                    self.assertIsInstance(notif, bt2.PacketBeginningNotification)
                elif 2 <= comp_self._at <= 6:
                    self.assertIsInstance(notif, bt2.EventNotification)
                    self.assertEqual(notif.event.event_class.name, 'salut')
                    field = notif.event.payload_field['my_int']
                    self.assertEqual(field, (comp_self._at - 2) * 3)
                elif comp_self._at == 7:
                    self.assertIsInstance(notif, bt2.PacketEndNotification)
                elif comp_self._at == 8:
                    self.assertIsInstance(notif, bt2.StreamEndNotification)

                comp_self._at += 1

            def _port_connected(self, port, other_port):
                self._notif_iter = port.connection.create_notification_iterator()

        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')
        conn = self._graph.connect_ports(src.output_ports['out'],
                                         sink.input_ports['in'])
        self._graph.run()

    def test_run_again(self):
        """``run()`` raises ``TryAgain`` when the sink raises it.

        The source iterator returns one event and then raises
        ``bt2.TryAgain``; the sink checks both and raises
        ``bt2.TryAgain`` itself on its second ``_consume()`` call.
        """
        class MyIter(bt2._UserNotificationIterator):
            def __init__(self):
                self._build_meta()
                self._at = 0

            def _build_meta(self):
                self._trace = bt2.Trace()
                self._sc = bt2.StreamClass()
                self._ec = bt2.EventClass('salut')
                self._my_int_ft = bt2.IntegerFieldType(32)
                self._ec.payload_field_type = bt2.StructureFieldType()
                self._ec.payload_field_type += collections.OrderedDict([
                    ('my_int', self._my_int_ft),
                ])
                self._sc.add_event_class(self._ec)
                self._trace.add_stream_class(self._sc)
                self._stream = self._sc()
                self._packet = self._stream.create_packet()

            def _create_event(self, value):
                ev = self._ec()
                ev.payload_field['my_int'] = value
                ev.packet = self._packet
                return ev

            def __next__(self):
                if self._at == 1:
                    raise bt2.TryAgain

                notif = bt2.EventNotification(self._create_event(self._at * 3))
                self._at += 1
                return notif

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')
                self._at = 0

            # ``comp_self`` is the component; ``self`` stays bound to the
            # enclosing test case so its assertion methods can be used.
            def _consume(comp_self):
                if comp_self._at == 0:
                    notif = next(comp_self._notif_iter)
                    self.assertIsInstance(notif, bt2.EventNotification)
                elif comp_self._at == 1:
                    with self.assertRaises(bt2.TryAgain):
                        notif = next(comp_self._notif_iter)

                    raise bt2.TryAgain

                comp_self._at += 1

            def _port_connected(self, port, other_port):
                types = [bt2.EventNotification]
                self._notif_iter = port.connection.create_notification_iterator(types)

        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')
        conn = self._graph.connect_ports(src.output_ports['out'],
                                         sink.input_ports['in'])

        with self.assertRaises(bt2.TryAgain):
            self._graph.run()

    def test_run_no_sink(self):
        """Running a graph without any sink raises ``NoSinkComponent``."""
        class MyIter(bt2._UserNotificationIterator):
            pass

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MyFilter(bt2._UserFilterComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')
                self._add_input_port('in')

        src = self._graph.add_component(MySource, 'src')
        flt = self._graph.add_component(MyFilter, 'flt')
        conn = self._graph.connect_ports(src.output_ports['out'],
                                         flt.input_ports['in'])

        with self.assertRaises(bt2.NoSinkComponent):
            self._graph.run()

    def test_run_error(self):
        """An arbitrary exception in ``_consume()`` surfaces as ``bt2.Error``.

        The sink consumes one event, then raises ``RuntimeError`` on its
        second ``_consume()`` call.
        """
        class MyIter(bt2._UserNotificationIterator):
            def __init__(self):
                self._build_meta()
                self._at = 0

            def _build_meta(self):
                self._trace = bt2.Trace()
                self._sc = bt2.StreamClass()
                self._ec = bt2.EventClass('salut')
                self._my_int_ft = bt2.IntegerFieldType(32)
                self._ec.payload_field_type = bt2.StructureFieldType()
                self._ec.payload_field_type += collections.OrderedDict([
                    ('my_int', self._my_int_ft),
                ])
                self._sc.add_event_class(self._ec)
                self._trace.add_stream_class(self._sc)
                self._stream = self._sc()
                self._packet = self._stream.create_packet()

            def _create_event(self, value):
                ev = self._ec()
                ev.payload_field['my_int'] = value
                ev.packet = self._packet
                return ev

            def __next__(self):
                if self._at == 1:
                    raise bt2.TryAgain

                notif = bt2.EventNotification(self._create_event(self._at * 3))
                self._at += 1
                return notif

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')
                self._at = 0

            # ``comp_self`` is the component; ``self`` stays bound to the
            # enclosing test case so its assertion methods can be used.
            def _consume(comp_self):
                if comp_self._at == 0:
                    notif = next(comp_self._notif_iter)
                    self.assertIsInstance(notif, bt2.EventNotification)
                elif comp_self._at == 1:
                    raise RuntimeError('error!')

                comp_self._at += 1

            def _port_connected(self, port, other_port):
                types = [bt2.EventNotification]
                self._notif_iter = port.connection.create_notification_iterator(types)

        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')
        conn = self._graph.connect_ports(src.output_ports['out'],
                                         sink.input_ports['in'])

        with self.assertRaises(bt2.Error):
            self._graph.run()

    def test_run_cannot_consume(self):
        """Running the graph from within ``_consume()`` fails.

        The sink captures the exception raised by the nested
        ``graph.run()`` call; the test checks that it is exactly
        ``bt2.CannotConsumeGraph``.
        """
        class MyIter(bt2._UserNotificationIterator):
            pass

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')
                self._at = 0

            def _consume(comp_self):
                nonlocal exc

                try:
                    comp_self.graph.run()
                except Exception as e:
                    # Captured for inspection by the enclosing test.
                    exc = e

                raise bt2.Stop

        exc = None
        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')
        conn = self._graph.connect_ports(src.output_ports['out'],
                                         sink.input_ports['in'])
        self._graph.run()
        self.assertIs(type(exc), bt2.CannotConsumeGraph)

    def test_listeners(self):
        """Graph listeners are called in the expected order.

        Registers one listener per ``bt2.GraphListenerType`` member used
        here, then adds, connects and disconnects components whose
        callbacks add and remove ports along the way.  Each listener
        records its call in ``calls``; the recorded sequence is checked
        entry by entry.
        """
        class MyIter(bt2._UserNotificationIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')
                self._add_output_port('zero')

            def _port_connected(self, port, other_port):
                self._output_ports['zero'].remove_from_component()

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                raise bt2.Stop

            def _port_connected(self, port, other_port):
                self._add_input_port('taste')

            def _port_disconnected(self, port):
                port.remove_from_component()

        # The listeners only mutate ``calls`` (never rebind it), so a
        # plain closure is enough.
        def port_added_listener(port):
            calls.append((port_added_listener, port))

        def port_removed_listener(port):
            calls.append((port_removed_listener, port))

        def ports_connected_listener(upstream_port, downstream_port):
            calls.append((ports_connected_listener, upstream_port,
                          downstream_port))

        def ports_disconnected_listener(upstream_comp, downstream_comp,
                                        upstream_port, downstream_port):
            calls.append((ports_disconnected_listener, upstream_comp,
                          downstream_comp, upstream_port, downstream_port))

        calls = []
        self._graph.add_listener(bt2.GraphListenerType.PORT_ADDED,
                                 port_added_listener)
        self._graph.add_listener(bt2.GraphListenerType.PORT_REMOVED,
                                 port_removed_listener)
        self._graph.add_listener(bt2.GraphListenerType.PORTS_CONNECTED,
                                 ports_connected_listener)
        self._graph.add_listener(bt2.GraphListenerType.PORTS_DISCONNECTED,
                                 ports_disconnected_listener)
        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')
        self._graph.connect_ports(src.output_ports['out'],
                                  sink.input_ports['in'])
        sink.input_ports['in'].disconnect()
        self.assertIs(calls[0][0], port_added_listener)
        self.assertEqual(calls[0][1].name, 'out')
        self.assertIs(calls[1][0], port_added_listener)
        self.assertEqual(calls[1][1].name, 'zero')
        self.assertIs(calls[2][0], port_added_listener)
        self.assertEqual(calls[2][1].name, 'in')
        self.assertIs(calls[3][0], port_removed_listener)
        self.assertEqual(calls[3][1].name, 'zero')
        self.assertIs(calls[4][0], port_added_listener)
        self.assertEqual(calls[4][1].name, 'taste')
        self.assertIs(calls[5][0], ports_connected_listener)
        self.assertEqual(calls[5][1].name, 'out')
        self.assertEqual(calls[5][2].name, 'in')
        self.assertIs(calls[6][0], port_removed_listener)
        self.assertEqual(calls[6][1].name, 'in')
        self.assertIs(calls[7][0], ports_disconnected_listener)
        self.assertEqual(calls[7][1].name, 'src')
        self.assertEqual(calls[7][2].name, 'sink')
        self.assertEqual(calls[7][3].name, 'out')
        self.assertEqual(calls[7][4].name, 'in')
        # Release the recorded ports/components before the test returns
        # (presumably so they do not outlive the graph -- TODO confirm).
        del calls
This page took 0.041026 seconds and 3 git commands to generate.