Update Python bindings and tests to match the latest API
[babeltrace.git] / tests / bindings / python / bt2 / test_graph.py
CommitLineData
811644b8
PP
import collections
import copy
import unittest

import bt2
from bt2 import values


class GraphTestCase(unittest.TestCase):
    """Tests for ``bt2.Graph``: adding components, connecting ports,
    cancelling, running, and graph listeners.

    Every test gets a fresh graph in ``self._graph`` (see ``setUp()``).
    The user component classes are declared inside each test so that a
    test never depends on state left behind by another one.

    Per-test explanations are ``#`` comments rather than docstrings so
    that unittest's verbose output keeps showing the method names.
    """

    def setUp(self):
        # Fresh graph for each test.
        self._graph = bt2.Graph()

    def tearDown(self):
        # Drop the reference to the graph explicitly once the test is done.
        del self._graph

    def test_create_empty(self):
        # Building a graph without any component must simply work.
        self.assertIsNotNone(bt2.Graph())

    def test_add_component_user_cls(self):
        # A user-defined sink class can be instantiated in the graph and
        # the resulting component carries the requested name.
        class MySink(bt2._UserSinkComponent):
            def _consume(self):
                pass

        comp = self._graph.add_component(MySink, 'salut')
        self.assertEqual(comp.name, 'salut')

    def test_add_component_gen_cls(self):
        # The component class object exposed by an existing component
        # (`comp.component_class`) can itself be used to add a component.
        class MySink(bt2._UserSinkComponent):
            def _consume(self):
                pass

        comp = self._graph.add_component(MySink, 'salut')
        # assertTrue() instead of a bare `assert`: not stripped by
        # `python -O` and reported as a regular test failure.
        self.assertTrue(comp)
        comp2 = self._graph.add_component(comp.component_class, 'salut2')
        self.assertEqual(comp2.name, 'salut2')

    def test_add_component_params(self):
        # The parameters given to add_component() must reach the user
        # component's __init__().
        comp_params = None

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                # Capture what the component received for the check below.
                nonlocal comp_params
                comp_params = params

            def _consume(self):
                pass

        params = {'hello': 23, 'path': '/path/to/stuff'}
        comp = self._graph.add_component(MySink, 'salut', params)
        self.assertEqual(params, comp_params)
        # Release the captured parameters object before the test ends.
        del comp_params

    def test_add_component_invalid_cls_type(self):
        # Something that is not a component class is rejected.
        with self.assertRaises(TypeError):
            self._graph.add_component(int, 'salut')

    def test_connect_ports(self):
        # Connecting an output port to an input port marks both ports as
        # connected and both report the returned connection.
        class MyIter(bt2._UserNotificationIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                raise bt2.Stop

        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')
        conn = self._graph.connect_ports(src.output_ports['out'],
                                         sink.input_ports['in'])
        self.assertTrue(src.output_ports['out'].is_connected)
        self.assertTrue(sink.input_ports['in'].is_connected)
        self.assertEqual(src.output_ports['out'].connection, conn)
        self.assertEqual(sink.input_ports['in'].connection, conn)

    def test_connect_ports_invalid_direction(self):
        # Passing the ports in the wrong order (input first, output
        # second) is a TypeError.
        class MyIter(bt2._UserNotificationIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                raise bt2.Stop

        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')

        with self.assertRaises(TypeError):
            self._graph.connect_ports(sink.input_ports['in'],
                                      src.output_ports['out'])

    def test_connect_ports_refused(self):
        # A component whose _accept_port_connection() returns False makes
        # connect_ports() raise bt2.PortConnectionRefused.
        class MyIter(bt2._UserNotificationIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                raise bt2.Stop

            def _accept_port_connection(self, port, other_port):
                return False

        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')

        with self.assertRaises(bt2.PortConnectionRefused):
            self._graph.connect_ports(src.output_ports['out'],
                                      sink.input_ports['in'])

    def test_connect_ports_canceled(self):
        # Once the graph is canceled, connecting ports raises
        # bt2.GraphCanceled.
        class MyIter(bt2._UserNotificationIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                raise bt2.Stop

        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')
        self._graph.cancel()

        with self.assertRaises(bt2.GraphCanceled):
            self._graph.connect_ports(src.output_ports['out'],
                                      sink.input_ports['in'])

    def test_cancel(self):
        # `is_canceled` flips from False to True after cancel().
        self.assertFalse(self._graph.is_canceled)
        self._graph.cancel()
        self.assertTrue(self._graph.is_canceled)

    def test_run(self):
        # Run a source -> sink graph. The source iterator yields five
        # event notifications (payload `my_int` = 0, 3, 6, 9, 12); the
        # sink expects to see, in order: stream beginning, packet
        # beginning, the five events, packet end, stream end.
        #
        # NOTE(review): nothing checks how many notifications the sink
        # actually consumed, so a run that stops early would still pass;
        # confirm the expected count against the library before adding
        # such a check.
        class MyIter(bt2._UserNotificationIterator):
            def __init__(self):
                self._build_meta()
                self._at = 0

            def _build_meta(self):
                # Minimal trace hierarchy: one stream class holding one
                # event class ('salut') with a single 32-bit integer
                # payload field, plus one stream and one packet.
                self._trace = bt2.Trace()
                self._sc = bt2.StreamClass()
                self._ec = bt2.EventClass('salut')
                self._my_int_ft = bt2.IntegerFieldType(32)
                self._ec.payload_field_type = bt2.StructureFieldType()
                self._ec.payload_field_type += collections.OrderedDict([
                    ('my_int', self._my_int_ft),
                ])
                self._sc.add_event_class(self._ec)
                self._trace.add_stream_class(self._sc)
                self._stream = self._sc()
                self._packet = self._stream.create_packet()

            def _create_event(self, value):
                ev = self._ec()
                ev.payload_field['my_int'] = value
                ev.packet = self._packet
                return ev

            def __next__(self):
                # Five events, then end of iteration.
                if self._at == 5:
                    raise bt2.Stop

                notif = bt2.EventNotification(self._create_event(self._at * 3))
                self._at += 1
                return notif

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')
                self._at = 0

            # The first parameter is named `comp_self` so that `self`
            # keeps referring to the test case inside the method.
            def _consume(comp_self):
                notif = next(comp_self._notif_iter)

                if comp_self._at == 0:
                    self.assertIsInstance(notif, bt2.StreamBeginningNotification)
                elif comp_self._at == 1:
                    self.assertIsInstance(notif, bt2.PacketBeginningNotification)
                elif 2 <= comp_self._at <= 6:
                    self.assertIsInstance(notif, bt2.EventNotification)
                    self.assertEqual(notif.event.event_class.name, 'salut')
                    field = notif.event.payload_field['my_int']
                    self.assertEqual(field, (comp_self._at - 2) * 3)
                elif comp_self._at == 7:
                    self.assertIsInstance(notif, bt2.PacketEndNotification)
                elif comp_self._at == 8:
                    self.assertIsInstance(notif, bt2.StreamEndNotification)

                comp_self._at += 1

            def _port_connected(self, port, other_port):
                self._notif_iter = port.connection.create_notification_iterator()

        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')
        conn = self._graph.connect_ports(src.output_ports['out'],
                                         sink.input_ports['in'])
        self._graph.run()

    def test_run_again(self):
        # The source iterator yields one event and then raises
        # bt2.TryAgain. The sink re-raises bt2.TryAgain from _consume()
        # and Graph.run() is expected to raise it to the caller.
        class MyIter(bt2._UserNotificationIterator):
            def __init__(self):
                self._build_meta()
                self._at = 0

            def _build_meta(self):
                self._trace = bt2.Trace()
                self._sc = bt2.StreamClass()
                self._ec = bt2.EventClass('salut')
                self._my_int_ft = bt2.IntegerFieldType(32)
                self._ec.payload_field_type = bt2.StructureFieldType()
                self._ec.payload_field_type += collections.OrderedDict([
                    ('my_int', self._my_int_ft),
                ])
                self._sc.add_event_class(self._ec)
                self._trace.add_stream_class(self._sc)
                self._stream = self._sc()
                self._packet = self._stream.create_packet()

            def _create_event(self, value):
                ev = self._ec()
                ev.payload_field['my_int'] = value
                ev.packet = self._packet
                return ev

            def __next__(self):
                # One event, then "try again" on every later call.
                if self._at == 1:
                    raise bt2.TryAgain

                notif = bt2.EventNotification(self._create_event(self._at * 3))
                self._at += 1
                return notif

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')
                self._at = 0

            def _consume(comp_self):
                if comp_self._at == 0:
                    notif = next(comp_self._notif_iter)
                    self.assertIsInstance(notif, bt2.EventNotification)
                elif comp_self._at == 1:
                    # The upstream iterator must report "try again"...
                    with self.assertRaises(bt2.TryAgain):
                        next(comp_self._notif_iter)

                    # ...and the sink forwards that status to the graph.
                    raise bt2.TryAgain

                comp_self._at += 1

            def _port_connected(self, port, other_port):
                # Only subscribe to event notifications.
                types = [bt2.EventNotification]
                self._notif_iter = port.connection.create_notification_iterator(types)

        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')
        conn = self._graph.connect_ports(src.output_ports['out'],
                                         sink.input_ports['in'])

        with self.assertRaises(bt2.TryAgain):
            self._graph.run()

    def test_run_no_sink(self):
        # A graph made of a source and a filter only (no sink) cannot
        # run: Graph.run() raises bt2.NoSinkComponent.
        class MyIter(bt2._UserNotificationIterator):
            pass

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MyFilter(bt2._UserFilterComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')
                self._add_input_port('in')

        src = self._graph.add_component(MySource, 'src')
        flt = self._graph.add_component(MyFilter, 'flt')
        conn = self._graph.connect_ports(src.output_ports['out'],
                                         flt.input_ports['in'])

        with self.assertRaises(bt2.NoSinkComponent):
            self._graph.run()

    def test_run_error(self):
        # An arbitrary exception raised by the sink's _consume() is
        # expected to surface as bt2.Error from Graph.run().
        #
        # Set right before the deliberate RuntimeError is raised. Without
        # it, a failing assertion in the first _consume() call would
        # presumably also come out of run() as bt2.Error and let the test
        # pass for the wrong reason.
        reached_error = False

        class MyIter(bt2._UserNotificationIterator):
            def __init__(self):
                self._build_meta()
                self._at = 0

            def _build_meta(self):
                self._trace = bt2.Trace()
                self._sc = bt2.StreamClass()
                self._ec = bt2.EventClass('salut')
                self._my_int_ft = bt2.IntegerFieldType(32)
                self._ec.payload_field_type = bt2.StructureFieldType()
                self._ec.payload_field_type += collections.OrderedDict([
                    ('my_int', self._my_int_ft),
                ])
                self._sc.add_event_class(self._ec)
                self._trace.add_stream_class(self._sc)
                self._stream = self._sc()
                self._packet = self._stream.create_packet()

            def _create_event(self, value):
                ev = self._ec()
                ev.payload_field['my_int'] = value
                ev.packet = self._packet
                return ev

            def __next__(self):
                if self._at == 1:
                    raise bt2.TryAgain

                notif = bt2.EventNotification(self._create_event(self._at * 3))
                self._at += 1
                return notif

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')
                self._at = 0

            def _consume(comp_self):
                nonlocal reached_error

                if comp_self._at == 0:
                    notif = next(comp_self._notif_iter)
                    self.assertIsInstance(notif, bt2.EventNotification)
                elif comp_self._at == 1:
                    reached_error = True
                    raise RuntimeError('error!')

                comp_self._at += 1

            def _port_connected(self, port, other_port):
                types = [bt2.EventNotification]
                self._notif_iter = port.connection.create_notification_iterator(types)

        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')
        conn = self._graph.connect_ports(src.output_ports['out'],
                                         sink.input_ports['in'])

        with self.assertRaises(bt2.Error):
            self._graph.run()

        # The error must come from the second _consume() call.
        self.assertTrue(reached_error)

    def test_listeners(self):
        # Register one listener per graph event type, then add, connect
        # and disconnect components whose callbacks add/remove ports, and
        # check the exact order in which the listeners were called.
        class MyIter(bt2._UserNotificationIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')
                self._add_output_port('zero')

            def _port_connected(self, port, other_port):
                # Removing a port on connection triggers PORT_REMOVED.
                self._output_ports['zero'].remove_from_component()

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                raise bt2.Stop

            def _port_connected(self, port, other_port):
                # Adding a port on connection triggers PORT_ADDED.
                self._add_input_port('taste')

            def _port_disconnected(self, port):
                port.remove_from_component()

        # Each listener records itself and its arguments in `calls`. The
        # list is only mutated, never rebound, so no `nonlocal` is needed.
        def port_added_listener(port):
            calls.append((port_added_listener, port))

        def port_removed_listener(port):
            calls.append((port_removed_listener, port))

        def ports_connected_listener(upstream_port, downstream_port):
            calls.append((ports_connected_listener, upstream_port,
                          downstream_port))

        def ports_disconnected_listener(upstream_comp, downstream_comp,
                                        upstream_port, downstream_port):
            calls.append((ports_disconnected_listener, upstream_comp,
                          downstream_comp, upstream_port, downstream_port))

        calls = []
        self._graph.add_listener(bt2.GraphListenerType.PORT_ADDED,
                                 port_added_listener)
        self._graph.add_listener(bt2.GraphListenerType.PORT_REMOVED,
                                 port_removed_listener)
        self._graph.add_listener(bt2.GraphListenerType.PORTS_CONNECTED,
                                 ports_connected_listener)
        self._graph.add_listener(bt2.GraphListenerType.PORTS_DISCONNECTED,
                                 ports_disconnected_listener)
        src = self._graph.add_component(MySource, 'src')
        sink = self._graph.add_component(MySink, 'sink')
        self._graph.connect_ports(src.output_ports['out'],
                                  sink.input_ports['in'])
        sink.input_ports['in'].disconnect()

        # Ports added while the two components are created.
        self.assertIs(calls[0][0], port_added_listener)
        self.assertEqual(calls[0][1].name, 'out')
        self.assertIs(calls[1][0], port_added_listener)
        self.assertEqual(calls[1][1].name, 'zero')
        self.assertIs(calls[2][0], port_added_listener)
        self.assertEqual(calls[2][1].name, 'in')

        # Side effects of the _port_connected() callbacks, then the
        # connection itself.
        self.assertIs(calls[3][0], port_removed_listener)
        self.assertEqual(calls[3][1].name, 'zero')
        self.assertIs(calls[4][0], port_added_listener)
        self.assertEqual(calls[4][1].name, 'taste')
        self.assertIs(calls[5][0], ports_connected_listener)
        self.assertEqual(calls[5][1].name, 'out')
        self.assertEqual(calls[5][2].name, 'in')

        # Disconnection: the sink removes its port in
        # _port_disconnected(), then the disconnection is reported.
        self.assertIs(calls[6][0], port_removed_listener)
        self.assertEqual(calls[6][1].name, 'in')
        self.assertIs(calls[7][0], ports_disconnected_listener)
        self.assertEqual(calls[7][1].name, 'src')
        self.assertEqual(calls[7][2].name, 'sink')
        self.assertEqual(calls[7][3].name, 'out')
        self.assertEqual(calls[7][4].name, 'in')
        # Release the recorded port/component objects before the test ends.
        del calls
This page took 0.038718 seconds and 4 git commands to generate.