Commit | Line | Data |
---|---|---|
811644b8 PP |
1 | from bt2 import values |
2 | import collections | |
3 | import unittest | |
4 | import copy | |
5 | import bt2 | |
6 | ||
7 | ||
8 | class GraphTestCase(unittest.TestCase): | |
9 | def setUp(self): | |
10 | self._graph = bt2.Graph() | |
11 | ||
12 | def tearDown(self): | |
13 | del self._graph | |
14 | ||
15 | def test_create_empty(self): | |
16 | graph = bt2.Graph() | |
17 | ||
18 | def test_add_component_user_cls(self): | |
19 | class MySink(bt2._UserSinkComponent): | |
20 | def _consume(self): | |
21 | pass | |
22 | ||
23 | comp = self._graph.add_component(MySink, 'salut') | |
24 | self.assertEqual(comp.name, 'salut') | |
25 | ||
26 | def test_add_component_gen_cls(self): | |
27 | class MySink(bt2._UserSinkComponent): | |
28 | def _consume(self): | |
29 | pass | |
30 | ||
31 | comp = self._graph.add_component(MySink, 'salut') | |
32 | assert(comp) | |
33 | comp2 = self._graph.add_component(comp.component_class, 'salut2') | |
34 | self.assertEqual(comp2.name, 'salut2') | |
35 | ||
36 | def test_add_component_params(self): | |
37 | comp_params = None | |
38 | ||
39 | class MySink(bt2._UserSinkComponent): | |
40 | def __init__(self, params): | |
41 | nonlocal comp_params | |
42 | comp_params = params | |
43 | ||
44 | def _consume(self): | |
45 | pass | |
46 | ||
47 | params = {'hello': 23, 'path': '/path/to/stuff'} | |
48 | comp = self._graph.add_component(MySink, 'salut', params) | |
49 | self.assertEqual(params, comp_params) | |
50 | del comp_params | |
51 | ||
52 | def test_add_component_invalid_cls_type(self): | |
53 | with self.assertRaises(TypeError): | |
54 | self._graph.add_component(int, 'salut') | |
55 | ||
56 | def test_connect_ports(self): | |
57 | class MyIter(bt2._UserNotificationIterator): | |
58 | def __next__(self): | |
59 | raise bt2.Stop | |
60 | ||
61 | class MySource(bt2._UserSourceComponent, | |
62 | notification_iterator_class=MyIter): | |
63 | def __init__(self, params): | |
64 | self._add_output_port('out') | |
65 | ||
66 | class MySink(bt2._UserSinkComponent): | |
67 | def __init__(self, params): | |
68 | self._add_input_port('in') | |
69 | ||
70 | def _consume(self): | |
71 | raise bt2.Stop | |
72 | ||
73 | src = self._graph.add_component(MySource, 'src') | |
74 | sink = self._graph.add_component(MySink, 'sink') | |
75 | conn = self._graph.connect_ports(src.output_ports['out'], | |
76 | sink.input_ports['in']) | |
77 | self.assertTrue(src.output_ports['out'].is_connected) | |
78 | self.assertTrue(sink.input_ports['in'].is_connected) | |
79 | self.assertEqual(src.output_ports['out'].connection, conn) | |
80 | self.assertEqual(sink.input_ports['in'].connection, conn) | |
81 | ||
82 | def test_connect_ports_invalid_direction(self): | |
83 | class MyIter(bt2._UserNotificationIterator): | |
84 | def __next__(self): | |
85 | raise bt2.Stop | |
86 | ||
87 | class MySource(bt2._UserSourceComponent, | |
88 | notification_iterator_class=MyIter): | |
89 | def __init__(self, params): | |
90 | self._add_output_port('out') | |
91 | ||
92 | class MySink(bt2._UserSinkComponent): | |
93 | def __init__(self, params): | |
94 | self._add_input_port('in') | |
95 | ||
96 | def _consume(self): | |
97 | raise bt2.Stop | |
98 | ||
99 | src = self._graph.add_component(MySource, 'src') | |
100 | sink = self._graph.add_component(MySink, 'sink') | |
101 | ||
102 | with self.assertRaises(TypeError): | |
103 | conn = self._graph.connect_ports(sink.input_ports['in'], | |
104 | src.output_ports['out']) | |
105 | ||
106 | def test_connect_ports_refused(self): | |
107 | class MyIter(bt2._UserNotificationIterator): | |
108 | def __next__(self): | |
109 | raise bt2.Stop | |
110 | ||
111 | class MySource(bt2._UserSourceComponent, | |
112 | notification_iterator_class=MyIter): | |
113 | def __init__(self, params): | |
114 | self._add_output_port('out') | |
115 | ||
116 | class MySink(bt2._UserSinkComponent): | |
117 | def __init__(self, params): | |
118 | self._add_input_port('in') | |
119 | ||
120 | def _consume(self): | |
121 | raise bt2.Stop | |
122 | ||
123 | def _accept_port_connection(self, port, other_port): | |
124 | return False | |
125 | ||
126 | src = self._graph.add_component(MySource, 'src') | |
127 | sink = self._graph.add_component(MySink, 'sink') | |
128 | ||
129 | with self.assertRaises(bt2.PortConnectionRefused): | |
130 | conn = self._graph.connect_ports(src.output_ports['out'], | |
131 | sink.input_ports['in']) | |
132 | ||
133 | def test_connect_ports_canceled(self): | |
134 | class MyIter(bt2._UserNotificationIterator): | |
135 | def __next__(self): | |
136 | raise bt2.Stop | |
137 | ||
138 | class MySource(bt2._UserSourceComponent, | |
139 | notification_iterator_class=MyIter): | |
140 | def __init__(self, params): | |
141 | self._add_output_port('out') | |
142 | ||
143 | class MySink(bt2._UserSinkComponent): | |
144 | def __init__(self, params): | |
145 | self._add_input_port('in') | |
146 | ||
147 | def _consume(self): | |
148 | raise bt2.Stop | |
149 | ||
150 | src = self._graph.add_component(MySource, 'src') | |
151 | sink = self._graph.add_component(MySink, 'sink') | |
152 | self._graph.cancel() | |
153 | ||
154 | with self.assertRaises(bt2.GraphCanceled): | |
155 | conn = self._graph.connect_ports(src.output_ports['out'], | |
156 | sink.input_ports['in']) | |
157 | ||
158 | def test_cancel(self): | |
159 | self.assertFalse(self._graph.is_canceled) | |
160 | self._graph.cancel() | |
161 | self.assertTrue(self._graph.is_canceled) | |
162 | ||
163 | def test_run(self): | |
164 | class MyIter(bt2._UserNotificationIterator): | |
165 | def __init__(self): | |
166 | self._build_meta() | |
167 | self._at = 0 | |
168 | ||
169 | def _build_meta(self): | |
170 | self._trace = bt2.Trace() | |
171 | self._sc = bt2.StreamClass() | |
172 | self._ec = bt2.EventClass('salut') | |
173 | self._my_int_ft = bt2.IntegerFieldType(32) | |
174 | self._ec.payload_field_type = bt2.StructureFieldType() | |
175 | self._ec.payload_field_type += collections.OrderedDict([ | |
176 | ('my_int', self._my_int_ft), | |
177 | ]) | |
178 | self._sc.add_event_class(self._ec) | |
179 | self._trace.add_stream_class(self._sc) | |
180 | self._stream = self._sc() | |
181 | self._packet = self._stream.create_packet() | |
182 | ||
183 | def _create_event(self, value): | |
184 | ev = self._ec() | |
185 | ev.payload_field['my_int'] = value | |
186 | ev.packet = self._packet | |
187 | return ev | |
188 | ||
189 | def __next__(self): | |
190 | if self._at == 5: | |
191 | raise bt2.Stop | |
192 | ||
193 | notif = bt2.EventNotification(self._create_event(self._at * 3)) | |
194 | self._at += 1 | |
195 | return notif | |
196 | ||
197 | class MySource(bt2._UserSourceComponent, | |
198 | notification_iterator_class=MyIter): | |
199 | def __init__(self, params): | |
200 | self._add_output_port('out') | |
201 | ||
202 | class MySink(bt2._UserSinkComponent): | |
203 | def __init__(self, params): | |
204 | self._add_input_port('in') | |
205 | self._at = 0 | |
206 | ||
207 | def _consume(comp_self): | |
208 | notif = next(comp_self._notif_iter) | |
209 | ||
210 | if comp_self._at == 0: | |
211 | self.assertIsInstance(notif, bt2.StreamBeginningNotification) | |
212 | elif comp_self._at == 1: | |
213 | self.assertIsInstance(notif, bt2.PacketBeginningNotification) | |
214 | elif comp_self._at >= 2 and comp_self._at <= 6: | |
215 | self.assertIsInstance(notif, bt2.EventNotification) | |
216 | self.assertEqual(notif.event.event_class.name, 'salut') | |
217 | field = notif.event.payload_field['my_int'] | |
218 | self.assertEqual(field, (comp_self._at - 2) * 3) | |
219 | elif comp_self._at == 7: | |
220 | self.assertIsInstance(notif, bt2.PacketEndNotification) | |
221 | elif comp_self._at == 8: | |
222 | self.assertIsInstance(notif, bt2.StreamEndNotification) | |
223 | ||
224 | comp_self._at += 1 | |
225 | ||
226 | def _port_connected(self, port, other_port): | |
227 | self._notif_iter = port.connection.create_notification_iterator() | |
228 | ||
229 | src = self._graph.add_component(MySource, 'src') | |
230 | sink = self._graph.add_component(MySink, 'sink') | |
231 | conn = self._graph.connect_ports(src.output_ports['out'], | |
232 | sink.input_ports['in']) | |
233 | self._graph.run() | |
234 | ||
235 | def test_run_again(self): | |
236 | class MyIter(bt2._UserNotificationIterator): | |
237 | def __init__(self): | |
238 | self._build_meta() | |
239 | self._at = 0 | |
240 | ||
241 | def _build_meta(self): | |
242 | self._trace = bt2.Trace() | |
243 | self._sc = bt2.StreamClass() | |
244 | self._ec = bt2.EventClass('salut') | |
245 | self._my_int_ft = bt2.IntegerFieldType(32) | |
246 | self._ec.payload_field_type = bt2.StructureFieldType() | |
247 | self._ec.payload_field_type += collections.OrderedDict([ | |
248 | ('my_int', self._my_int_ft), | |
249 | ]) | |
250 | self._sc.add_event_class(self._ec) | |
251 | self._trace.add_stream_class(self._sc) | |
252 | self._stream = self._sc() | |
253 | self._packet = self._stream.create_packet() | |
254 | ||
255 | def _create_event(self, value): | |
256 | ev = self._ec() | |
257 | ev.payload_field['my_int'] = value | |
258 | ev.packet = self._packet | |
259 | return ev | |
260 | ||
261 | def __next__(self): | |
262 | if self._at == 1: | |
263 | raise bt2.TryAgain | |
264 | ||
265 | notif = bt2.EventNotification(self._create_event(self._at * 3)) | |
266 | self._at += 1 | |
267 | return notif | |
268 | ||
269 | class MySource(bt2._UserSourceComponent, | |
270 | notification_iterator_class=MyIter): | |
271 | def __init__(self, params): | |
272 | self._add_output_port('out') | |
273 | ||
274 | class MySink(bt2._UserSinkComponent): | |
275 | def __init__(self, params): | |
276 | self._add_input_port('in') | |
277 | self._at = 0 | |
278 | ||
279 | def _consume(comp_self): | |
280 | if comp_self._at == 0: | |
281 | notif = next(comp_self._notif_iter) | |
282 | self.assertIsInstance(notif, bt2.EventNotification) | |
283 | elif comp_self._at == 1: | |
284 | with self.assertRaises(bt2.TryAgain): | |
285 | notif = next(comp_self._notif_iter) | |
286 | ||
287 | raise bt2.TryAgain | |
288 | ||
289 | comp_self._at += 1 | |
290 | ||
291 | def _port_connected(self, port, other_port): | |
292 | types = [bt2.EventNotification] | |
293 | self._notif_iter = port.connection.create_notification_iterator(types) | |
294 | ||
295 | src = self._graph.add_component(MySource, 'src') | |
296 | sink = self._graph.add_component(MySink, 'sink') | |
297 | conn = self._graph.connect_ports(src.output_ports['out'], | |
298 | sink.input_ports['in']) | |
299 | ||
300 | with self.assertRaises(bt2.TryAgain): | |
301 | self._graph.run() | |
302 | ||
303 | def test_run_no_sink(self): | |
304 | class MyIter(bt2._UserNotificationIterator): | |
305 | pass | |
306 | ||
307 | class MySource(bt2._UserSourceComponent, | |
308 | notification_iterator_class=MyIter): | |
309 | def __init__(self, params): | |
310 | self._add_output_port('out') | |
311 | ||
312 | class MyFilter(bt2._UserFilterComponent, | |
313 | notification_iterator_class=MyIter): | |
314 | def __init__(self, params): | |
315 | self._add_output_port('out') | |
316 | self._add_input_port('in') | |
317 | ||
318 | src = self._graph.add_component(MySource, 'src') | |
319 | flt = self._graph.add_component(MyFilter, 'flt') | |
320 | conn = self._graph.connect_ports(src.output_ports['out'], | |
321 | flt.input_ports['in']) | |
322 | ||
323 | with self.assertRaises(bt2.NoSinkComponent): | |
324 | self._graph.run() | |
325 | ||
326 | def test_run_error(self): | |
327 | class MyIter(bt2._UserNotificationIterator): | |
328 | def __init__(self): | |
329 | self._build_meta() | |
330 | self._at = 0 | |
331 | ||
332 | def _build_meta(self): | |
333 | self._trace = bt2.Trace() | |
334 | self._sc = bt2.StreamClass() | |
335 | self._ec = bt2.EventClass('salut') | |
336 | self._my_int_ft = bt2.IntegerFieldType(32) | |
337 | self._ec.payload_field_type = bt2.StructureFieldType() | |
338 | self._ec.payload_field_type += collections.OrderedDict([ | |
339 | ('my_int', self._my_int_ft), | |
340 | ]) | |
341 | self._sc.add_event_class(self._ec) | |
342 | self._trace.add_stream_class(self._sc) | |
343 | self._stream = self._sc() | |
344 | self._packet = self._stream.create_packet() | |
345 | ||
346 | def _create_event(self, value): | |
347 | ev = self._ec() | |
348 | ev.payload_field['my_int'] = value | |
349 | ev.packet = self._packet | |
350 | return ev | |
351 | ||
352 | def __next__(self): | |
353 | if self._at == 1: | |
354 | raise bt2.TryAgain | |
355 | ||
356 | notif = bt2.EventNotification(self._create_event(self._at * 3)) | |
357 | self._at += 1 | |
358 | return notif | |
359 | ||
360 | class MySource(bt2._UserSourceComponent, | |
361 | notification_iterator_class=MyIter): | |
362 | def __init__(self, params): | |
363 | self._add_output_port('out') | |
364 | ||
365 | class MySink(bt2._UserSinkComponent): | |
366 | def __init__(self, params): | |
367 | self._add_input_port('in') | |
368 | self._at = 0 | |
369 | ||
370 | def _consume(comp_self): | |
371 | if comp_self._at == 0: | |
372 | notif = next(comp_self._notif_iter) | |
373 | self.assertIsInstance(notif, bt2.EventNotification) | |
374 | elif comp_self._at == 1: | |
375 | raise RuntimeError('error!') | |
376 | ||
377 | comp_self._at += 1 | |
378 | ||
379 | def _port_connected(self, port, other_port): | |
380 | types = [bt2.EventNotification] | |
381 | self._notif_iter = port.connection.create_notification_iterator(types) | |
382 | ||
383 | src = self._graph.add_component(MySource, 'src') | |
384 | sink = self._graph.add_component(MySink, 'sink') | |
385 | conn = self._graph.connect_ports(src.output_ports['out'], | |
386 | sink.input_ports['in']) | |
387 | ||
388 | with self.assertRaises(bt2.Error): | |
389 | self._graph.run() | |
390 | ||
391 | def test_listeners(self): | |
392 | class MyIter(bt2._UserNotificationIterator): | |
393 | def __next__(self): | |
394 | raise bt2.Stop | |
395 | ||
396 | class MySource(bt2._UserSourceComponent, | |
397 | notification_iterator_class=MyIter): | |
398 | def __init__(self, params): | |
399 | self._add_output_port('out') | |
400 | self._add_output_port('zero') | |
401 | ||
402 | def _port_connected(self, port, other_port): | |
403 | self._output_ports['zero'].remove_from_component() | |
404 | ||
405 | class MySink(bt2._UserSinkComponent): | |
406 | def __init__(self, params): | |
407 | self._add_input_port('in') | |
408 | ||
409 | def _consume(self): | |
410 | raise bt2.Stop | |
411 | ||
412 | def _port_connected(self, port, other_port): | |
413 | self._add_input_port('taste') | |
414 | ||
415 | def _port_disconnected(self, port): | |
416 | port.remove_from_component() | |
417 | ||
418 | def port_added_listener(port): | |
419 | nonlocal calls | |
420 | calls.append((port_added_listener, port)) | |
421 | ||
422 | def port_removed_listener(port): | |
423 | nonlocal calls | |
424 | calls.append((port_removed_listener, port)) | |
425 | ||
426 | def ports_connected_listener(upstream_port, downstream_port): | |
427 | nonlocal calls | |
428 | calls.append((ports_connected_listener, upstream_port, | |
429 | downstream_port)) | |
430 | ||
431 | def ports_disconnected_listener(upstream_comp, downstream_comp, | |
432 | upstream_port, downstream_port): | |
433 | nonlocal calls | |
434 | calls.append((ports_disconnected_listener, upstream_comp, | |
435 | downstream_comp, upstream_port, downstream_port)) | |
436 | ||
437 | calls = [] | |
438 | self._graph.add_listener(bt2.GraphListenerType.PORT_ADDED, | |
439 | port_added_listener) | |
440 | self._graph.add_listener(bt2.GraphListenerType.PORT_REMOVED, | |
441 | port_removed_listener) | |
442 | self._graph.add_listener(bt2.GraphListenerType.PORTS_CONNECTED, | |
443 | ports_connected_listener) | |
444 | self._graph.add_listener(bt2.GraphListenerType.PORTS_DISCONNECTED, | |
445 | ports_disconnected_listener) | |
446 | src = self._graph.add_component(MySource, 'src') | |
447 | sink = self._graph.add_component(MySink, 'sink') | |
448 | self._graph.connect_ports(src.output_ports['out'], | |
449 | sink.input_ports['in']) | |
450 | sink.input_ports['in'].disconnect() | |
451 | self.assertIs(calls[0][0], port_added_listener) | |
452 | self.assertEqual(calls[0][1].name, 'out') | |
453 | self.assertIs(calls[1][0], port_added_listener) | |
454 | self.assertEqual(calls[1][1].name, 'zero') | |
455 | self.assertIs(calls[2][0], port_added_listener) | |
456 | self.assertEqual(calls[2][1].name, 'in') | |
457 | self.assertIs(calls[3][0], port_removed_listener) | |
458 | self.assertEqual(calls[3][1].name, 'zero') | |
459 | self.assertIs(calls[4][0], port_added_listener) | |
460 | self.assertEqual(calls[4][1].name, 'taste') | |
461 | self.assertIs(calls[5][0], ports_connected_listener) | |
462 | self.assertEqual(calls[5][1].name, 'out') | |
463 | self.assertEqual(calls[5][2].name, 'in') | |
464 | self.assertIs(calls[6][0], port_removed_listener) | |
465 | self.assertEqual(calls[6][1].name, 'in') | |
466 | self.assertIs(calls[7][0], ports_disconnected_listener) | |
467 | self.assertEqual(calls[7][1].name, 'src') | |
468 | self.assertEqual(calls[7][2].name, 'sink') | |
469 | self.assertEqual(calls[7][3].name, 'out') | |
470 | self.assertEqual(calls[7][4].name, 'in') | |
471 | del calls |