Commit | Line | Data |
---|---|---|
f6a5e476 PP |
1 | from bt2 import values |
2 | import collections | |
3 | import unittest | |
4 | import copy | |
5 | import bt2 | |
6 | ||
7 | ||
8 | class GraphTestCase(unittest.TestCase): | |
9 | def setUp(self): | |
10 | self._graph = bt2.Graph() | |
11 | ||
12 | def tearDown(self): | |
13 | del self._graph | |
14 | ||
15 | def test_create_empty(self): | |
16 | graph = bt2.Graph() | |
17 | ||
18 | def test_add_component_user_cls(self): | |
19 | class MySink(bt2._UserSinkComponent): | |
20 | def _consume(self): | |
21 | pass | |
22 | ||
23 | comp = self._graph.add_component(MySink, 'salut') | |
24 | self.assertEqual(comp.name, 'salut') | |
25 | ||
26 | def test_add_component_gen_cls(self): | |
27 | class MySink(bt2._UserSinkComponent): | |
28 | def _consume(self): | |
29 | pass | |
30 | ||
31 | comp = self._graph.add_component(MySink, 'salut') | |
32 | assert(comp) | |
33 | comp2 = self._graph.add_component(comp.component_class, 'salut2') | |
34 | self.assertEqual(comp2.name, 'salut2') | |
35 | ||
36 | def test_add_component_params(self): | |
37 | comp_params = None | |
38 | ||
39 | class MySink(bt2._UserSinkComponent): | |
40 | def __init__(self, params): | |
41 | nonlocal comp_params | |
42 | comp_params = params | |
43 | ||
44 | def _consume(self): | |
45 | pass | |
46 | ||
47 | params = {'hello': 23, 'path': '/path/to/stuff'} | |
48 | comp = self._graph.add_component(MySink, 'salut', params) | |
49 | self.assertEqual(params, comp_params) | |
50 | del comp_params | |
51 | ||
52 | def test_add_component_invalid_cls_type(self): | |
53 | with self.assertRaises(TypeError): | |
54 | self._graph.add_component(int, 'salut') | |
55 | ||
56 | def test_connect_ports(self): | |
57 | class MyIter(bt2._UserNotificationIterator): | |
58 | def __next__(self): | |
59 | raise bt2.Stop | |
60 | ||
61 | class MySource(bt2._UserSourceComponent, | |
62 | notification_iterator_class=MyIter): | |
63 | def __init__(self, params): | |
64 | self._add_output_port('out') | |
65 | ||
66 | class MySink(bt2._UserSinkComponent): | |
67 | def __init__(self, params): | |
68 | self._add_input_port('in') | |
69 | ||
70 | def _consume(self): | |
71 | raise bt2.Stop | |
72 | ||
73 | src = self._graph.add_component(MySource, 'src') | |
74 | sink = self._graph.add_component(MySink, 'sink') | |
75 | conn = self._graph.connect_ports(src.output_ports['out'], | |
76 | sink.input_ports['in']) | |
77 | self.assertTrue(src.output_ports['out'].is_connected) | |
78 | self.assertTrue(sink.input_ports['in'].is_connected) | |
79 | self.assertEqual(src.output_ports['out'].connection, conn) | |
80 | self.assertEqual(sink.input_ports['in'].connection, conn) | |
81 | ||
82 | def test_connect_ports_invalid_direction(self): | |
83 | class MyIter(bt2._UserNotificationIterator): | |
84 | def __next__(self): | |
85 | raise bt2.Stop | |
86 | ||
87 | class MySource(bt2._UserSourceComponent, | |
88 | notification_iterator_class=MyIter): | |
89 | def __init__(self, params): | |
90 | self._add_output_port('out') | |
91 | ||
92 | class MySink(bt2._UserSinkComponent): | |
93 | def __init__(self, params): | |
94 | self._add_input_port('in') | |
95 | ||
96 | def _consume(self): | |
97 | raise bt2.Stop | |
98 | ||
99 | src = self._graph.add_component(MySource, 'src') | |
100 | sink = self._graph.add_component(MySink, 'sink') | |
101 | ||
102 | with self.assertRaises(TypeError): | |
103 | conn = self._graph.connect_ports(sink.input_ports['in'], | |
104 | src.output_ports['out']) | |
105 | ||
106 | def test_connect_ports_refused(self): | |
107 | class MyIter(bt2._UserNotificationIterator): | |
108 | def __next__(self): | |
109 | raise bt2.Stop | |
110 | ||
111 | class MySource(bt2._UserSourceComponent, | |
112 | notification_iterator_class=MyIter): | |
113 | def __init__(self, params): | |
114 | self._add_output_port('out') | |
115 | ||
116 | class MySink(bt2._UserSinkComponent): | |
117 | def __init__(self, params): | |
118 | self._add_input_port('in') | |
119 | ||
120 | def _consume(self): | |
121 | raise bt2.Stop | |
122 | ||
123 | def _accept_port_connection(self, port, other_port): | |
124 | return False | |
125 | ||
126 | src = self._graph.add_component(MySource, 'src') | |
127 | sink = self._graph.add_component(MySink, 'sink') | |
128 | ||
129 | with self.assertRaises(bt2.PortConnectionRefused): | |
130 | conn = self._graph.connect_ports(src.output_ports['out'], | |
131 | sink.input_ports['in']) | |
132 | ||
133 | def test_connect_ports_canceled(self): | |
134 | class MyIter(bt2._UserNotificationIterator): | |
135 | def __next__(self): | |
136 | raise bt2.Stop | |
137 | ||
138 | class MySource(bt2._UserSourceComponent, | |
139 | notification_iterator_class=MyIter): | |
140 | def __init__(self, params): | |
141 | self._add_output_port('out') | |
142 | ||
143 | class MySink(bt2._UserSinkComponent): | |
144 | def __init__(self, params): | |
145 | self._add_input_port('in') | |
146 | ||
147 | def _consume(self): | |
148 | raise bt2.Stop | |
149 | ||
150 | src = self._graph.add_component(MySource, 'src') | |
151 | sink = self._graph.add_component(MySink, 'sink') | |
152 | self._graph.cancel() | |
153 | ||
154 | with self.assertRaises(bt2.GraphCanceled): | |
155 | conn = self._graph.connect_ports(src.output_ports['out'], | |
156 | sink.input_ports['in']) | |
157 | ||
9ef22b36 PP |
158 | def test_connect_ports_cannot_consume_accept(self): |
159 | class MyIter(bt2._UserNotificationIterator): | |
160 | def __next__(self): | |
161 | raise bt2.Stop | |
162 | ||
163 | class MySource(bt2._UserSourceComponent, | |
164 | notification_iterator_class=MyIter): | |
165 | def __init__(self, params): | |
166 | self._add_output_port('out') | |
167 | ||
168 | class MySink(bt2._UserSinkComponent): | |
169 | def __init__(self, params): | |
170 | self._add_input_port('in') | |
171 | ||
172 | def _consume(self): | |
173 | raise bt2.Stop | |
174 | ||
175 | def _accept_port_connection(self, port, other_port): | |
176 | nonlocal exc | |
177 | ||
178 | try: | |
179 | self.graph.run() | |
180 | except Exception as e: | |
181 | exc = e | |
182 | ||
183 | return True | |
184 | ||
185 | exc = None | |
186 | src = self._graph.add_component(MySource, 'src') | |
187 | sink = self._graph.add_component(MySink, 'sink') | |
188 | self._graph.connect_ports(src.output_ports['out'], | |
189 | sink.input_ports['in']) | |
190 | self.assertIs(type(exc), bt2.CannotConsumeGraph) | |
191 | ||
192 | def test_connect_ports_cannot_consume_connected(self): | |
193 | class MyIter(bt2._UserNotificationIterator): | |
194 | def __next__(self): | |
195 | raise bt2.Stop | |
196 | ||
197 | class MySource(bt2._UserSourceComponent, | |
198 | notification_iterator_class=MyIter): | |
199 | def __init__(self, params): | |
200 | self._add_output_port('out') | |
201 | ||
202 | class MySink(bt2._UserSinkComponent): | |
203 | def __init__(self, params): | |
204 | self._add_input_port('in') | |
205 | ||
206 | def _consume(self): | |
207 | raise bt2.Stop | |
208 | ||
209 | def _port_connected(self, port, other_port): | |
210 | nonlocal exc | |
211 | ||
212 | try: | |
213 | self.graph.run() | |
214 | except Exception as e: | |
215 | exc = e | |
216 | ||
217 | return True | |
218 | ||
219 | exc = None | |
220 | src = self._graph.add_component(MySource, 'src') | |
221 | sink = self._graph.add_component(MySink, 'sink') | |
222 | self._graph.connect_ports(src.output_ports['out'], | |
223 | sink.input_ports['in']) | |
224 | self._graph.run() | |
225 | self.assertIs(type(exc), bt2.CannotConsumeGraph) | |
226 | ||
f6a5e476 PP |
227 | def test_cancel(self): |
228 | self.assertFalse(self._graph.is_canceled) | |
229 | self._graph.cancel() | |
230 | self.assertTrue(self._graph.is_canceled) | |
231 | ||
232 | def test_run(self): | |
233 | class MyIter(bt2._UserNotificationIterator): | |
234 | def __init__(self): | |
235 | self._build_meta() | |
236 | self._at = 0 | |
237 | ||
238 | def _build_meta(self): | |
239 | self._trace = bt2.Trace() | |
240 | self._sc = bt2.StreamClass() | |
241 | self._ec = bt2.EventClass('salut') | |
242 | self._my_int_ft = bt2.IntegerFieldType(32) | |
243 | self._ec.payload_field_type = bt2.StructureFieldType() | |
244 | self._ec.payload_field_type += collections.OrderedDict([ | |
245 | ('my_int', self._my_int_ft), | |
246 | ]) | |
247 | self._sc.add_event_class(self._ec) | |
248 | self._trace.add_stream_class(self._sc) | |
249 | self._stream = self._sc() | |
250 | self._packet = self._stream.create_packet() | |
251 | ||
252 | def _create_event(self, value): | |
253 | ev = self._ec() | |
254 | ev.payload_field['my_int'] = value | |
255 | ev.packet = self._packet | |
256 | return ev | |
257 | ||
258 | def __next__(self): | |
259 | if self._at == 5: | |
260 | raise bt2.Stop | |
261 | ||
262 | notif = bt2.EventNotification(self._create_event(self._at * 3)) | |
263 | self._at += 1 | |
264 | return notif | |
265 | ||
266 | class MySource(bt2._UserSourceComponent, | |
267 | notification_iterator_class=MyIter): | |
268 | def __init__(self, params): | |
269 | self._add_output_port('out') | |
270 | ||
271 | class MySink(bt2._UserSinkComponent): | |
272 | def __init__(self, params): | |
273 | self._add_input_port('in') | |
274 | self._at = 0 | |
275 | ||
276 | def _consume(comp_self): | |
277 | notif = next(comp_self._notif_iter) | |
278 | ||
279 | if comp_self._at == 0: | |
280 | self.assertIsInstance(notif, bt2.StreamBeginningNotification) | |
281 | elif comp_self._at == 1: | |
282 | self.assertIsInstance(notif, bt2.PacketBeginningNotification) | |
283 | elif comp_self._at >= 2 and comp_self._at <= 6: | |
284 | self.assertIsInstance(notif, bt2.EventNotification) | |
285 | self.assertEqual(notif.event.event_class.name, 'salut') | |
286 | field = notif.event.payload_field['my_int'] | |
287 | self.assertEqual(field, (comp_self._at - 2) * 3) | |
288 | elif comp_self._at == 7: | |
289 | self.assertIsInstance(notif, bt2.PacketEndNotification) | |
290 | elif comp_self._at == 8: | |
291 | self.assertIsInstance(notif, bt2.StreamEndNotification) | |
292 | ||
293 | comp_self._at += 1 | |
294 | ||
295 | def _port_connected(self, port, other_port): | |
296 | self._notif_iter = port.connection.create_notification_iterator() | |
297 | ||
298 | src = self._graph.add_component(MySource, 'src') | |
299 | sink = self._graph.add_component(MySink, 'sink') | |
300 | conn = self._graph.connect_ports(src.output_ports['out'], | |
301 | sink.input_ports['in']) | |
302 | self._graph.run() | |
303 | ||
304 | def test_run_again(self): | |
305 | class MyIter(bt2._UserNotificationIterator): | |
306 | def __init__(self): | |
307 | self._build_meta() | |
308 | self._at = 0 | |
309 | ||
310 | def _build_meta(self): | |
311 | self._trace = bt2.Trace() | |
312 | self._sc = bt2.StreamClass() | |
313 | self._ec = bt2.EventClass('salut') | |
314 | self._my_int_ft = bt2.IntegerFieldType(32) | |
315 | self._ec.payload_field_type = bt2.StructureFieldType() | |
316 | self._ec.payload_field_type += collections.OrderedDict([ | |
317 | ('my_int', self._my_int_ft), | |
318 | ]) | |
319 | self._sc.add_event_class(self._ec) | |
320 | self._trace.add_stream_class(self._sc) | |
321 | self._stream = self._sc() | |
322 | self._packet = self._stream.create_packet() | |
323 | ||
324 | def _create_event(self, value): | |
325 | ev = self._ec() | |
326 | ev.payload_field['my_int'] = value | |
327 | ev.packet = self._packet | |
328 | return ev | |
329 | ||
330 | def __next__(self): | |
331 | if self._at == 1: | |
332 | raise bt2.TryAgain | |
333 | ||
334 | notif = bt2.EventNotification(self._create_event(self._at * 3)) | |
335 | self._at += 1 | |
336 | return notif | |
337 | ||
338 | class MySource(bt2._UserSourceComponent, | |
339 | notification_iterator_class=MyIter): | |
340 | def __init__(self, params): | |
341 | self._add_output_port('out') | |
342 | ||
343 | class MySink(bt2._UserSinkComponent): | |
344 | def __init__(self, params): | |
345 | self._add_input_port('in') | |
346 | self._at = 0 | |
347 | ||
348 | def _consume(comp_self): | |
349 | if comp_self._at == 0: | |
350 | notif = next(comp_self._notif_iter) | |
351 | self.assertIsInstance(notif, bt2.EventNotification) | |
352 | elif comp_self._at == 1: | |
353 | with self.assertRaises(bt2.TryAgain): | |
354 | notif = next(comp_self._notif_iter) | |
355 | ||
356 | raise bt2.TryAgain | |
357 | ||
358 | comp_self._at += 1 | |
359 | ||
360 | def _port_connected(self, port, other_port): | |
361 | types = [bt2.EventNotification] | |
362 | self._notif_iter = port.connection.create_notification_iterator(types) | |
363 | ||
364 | src = self._graph.add_component(MySource, 'src') | |
365 | sink = self._graph.add_component(MySink, 'sink') | |
366 | conn = self._graph.connect_ports(src.output_ports['out'], | |
367 | sink.input_ports['in']) | |
368 | ||
369 | with self.assertRaises(bt2.TryAgain): | |
370 | self._graph.run() | |
371 | ||
372 | def test_run_no_sink(self): | |
373 | class MyIter(bt2._UserNotificationIterator): | |
374 | pass | |
375 | ||
376 | class MySource(bt2._UserSourceComponent, | |
377 | notification_iterator_class=MyIter): | |
378 | def __init__(self, params): | |
379 | self._add_output_port('out') | |
380 | ||
381 | class MyFilter(bt2._UserFilterComponent, | |
382 | notification_iterator_class=MyIter): | |
383 | def __init__(self, params): | |
384 | self._add_output_port('out') | |
385 | self._add_input_port('in') | |
386 | ||
387 | src = self._graph.add_component(MySource, 'src') | |
388 | flt = self._graph.add_component(MyFilter, 'flt') | |
389 | conn = self._graph.connect_ports(src.output_ports['out'], | |
390 | flt.input_ports['in']) | |
391 | ||
392 | with self.assertRaises(bt2.NoSinkComponent): | |
393 | self._graph.run() | |
394 | ||
395 | def test_run_error(self): | |
396 | class MyIter(bt2._UserNotificationIterator): | |
397 | def __init__(self): | |
398 | self._build_meta() | |
399 | self._at = 0 | |
400 | ||
401 | def _build_meta(self): | |
402 | self._trace = bt2.Trace() | |
403 | self._sc = bt2.StreamClass() | |
404 | self._ec = bt2.EventClass('salut') | |
405 | self._my_int_ft = bt2.IntegerFieldType(32) | |
406 | self._ec.payload_field_type = bt2.StructureFieldType() | |
407 | self._ec.payload_field_type += collections.OrderedDict([ | |
408 | ('my_int', self._my_int_ft), | |
409 | ]) | |
410 | self._sc.add_event_class(self._ec) | |
411 | self._trace.add_stream_class(self._sc) | |
412 | self._stream = self._sc() | |
413 | self._packet = self._stream.create_packet() | |
414 | ||
415 | def _create_event(self, value): | |
416 | ev = self._ec() | |
417 | ev.payload_field['my_int'] = value | |
418 | ev.packet = self._packet | |
419 | return ev | |
420 | ||
421 | def __next__(self): | |
422 | if self._at == 1: | |
423 | raise bt2.TryAgain | |
424 | ||
425 | notif = bt2.EventNotification(self._create_event(self._at * 3)) | |
426 | self._at += 1 | |
427 | return notif | |
428 | ||
429 | class MySource(bt2._UserSourceComponent, | |
430 | notification_iterator_class=MyIter): | |
431 | def __init__(self, params): | |
432 | self._add_output_port('out') | |
433 | ||
434 | class MySink(bt2._UserSinkComponent): | |
435 | def __init__(self, params): | |
436 | self._add_input_port('in') | |
437 | self._at = 0 | |
438 | ||
439 | def _consume(comp_self): | |
440 | if comp_self._at == 0: | |
441 | notif = next(comp_self._notif_iter) | |
442 | self.assertIsInstance(notif, bt2.EventNotification) | |
443 | elif comp_self._at == 1: | |
444 | raise RuntimeError('error!') | |
445 | ||
446 | comp_self._at += 1 | |
447 | ||
448 | def _port_connected(self, port, other_port): | |
449 | types = [bt2.EventNotification] | |
450 | self._notif_iter = port.connection.create_notification_iterator(types) | |
451 | ||
452 | src = self._graph.add_component(MySource, 'src') | |
453 | sink = self._graph.add_component(MySink, 'sink') | |
454 | conn = self._graph.connect_ports(src.output_ports['out'], | |
455 | sink.input_ports['in']) | |
456 | ||
457 | with self.assertRaises(bt2.Error): | |
458 | self._graph.run() | |
459 | ||
9ef22b36 PP |
460 | def test_run_cannot_consume(self): |
461 | class MyIter(bt2._UserNotificationIterator): | |
462 | pass | |
463 | ||
464 | class MySource(bt2._UserSourceComponent, | |
465 | notification_iterator_class=MyIter): | |
466 | def __init__(self, params): | |
467 | self._add_output_port('out') | |
468 | ||
469 | class MySink(bt2._UserSinkComponent): | |
470 | def __init__(self, params): | |
471 | self._add_input_port('in') | |
472 | self._at = 0 | |
473 | ||
474 | def _consume(comp_self): | |
475 | nonlocal exc | |
476 | ||
477 | try: | |
478 | print('going in') | |
479 | comp_self.graph.run() | |
480 | print('going out') | |
481 | except Exception as e: | |
482 | exc = e | |
483 | ||
484 | raise bt2.Stop | |
485 | ||
486 | exc = None | |
487 | src = self._graph.add_component(MySource, 'src') | |
488 | sink = self._graph.add_component(MySink, 'sink') | |
489 | conn = self._graph.connect_ports(src.output_ports['out'], | |
490 | sink.input_ports['in']) | |
491 | self._graph.run() | |
492 | self.assertIs(type(exc), bt2.CannotConsumeGraph) | |
493 | ||
f6a5e476 PP |
494 | def test_listeners(self): |
495 | class MyIter(bt2._UserNotificationIterator): | |
496 | def __next__(self): | |
497 | raise bt2.Stop | |
498 | ||
499 | class MySource(bt2._UserSourceComponent, | |
500 | notification_iterator_class=MyIter): | |
501 | def __init__(self, params): | |
502 | self._add_output_port('out') | |
503 | self._add_output_port('zero') | |
504 | ||
505 | def _port_connected(self, port, other_port): | |
506 | self._output_ports['zero'].remove_from_component() | |
507 | ||
508 | class MySink(bt2._UserSinkComponent): | |
509 | def __init__(self, params): | |
510 | self._add_input_port('in') | |
511 | ||
512 | def _consume(self): | |
513 | raise bt2.Stop | |
514 | ||
515 | def _port_connected(self, port, other_port): | |
516 | self._add_input_port('taste') | |
517 | ||
518 | def _port_disconnected(self, port): | |
519 | port.remove_from_component() | |
520 | ||
521 | def port_added_listener(port): | |
522 | nonlocal calls | |
523 | calls.append((port_added_listener, port)) | |
524 | ||
525 | def port_removed_listener(port): | |
526 | nonlocal calls | |
527 | calls.append((port_removed_listener, port)) | |
528 | ||
529 | def ports_connected_listener(upstream_port, downstream_port): | |
530 | nonlocal calls | |
531 | calls.append((ports_connected_listener, upstream_port, | |
532 | downstream_port)) | |
533 | ||
534 | def ports_disconnected_listener(upstream_comp, downstream_comp, | |
535 | upstream_port, downstream_port): | |
536 | nonlocal calls | |
537 | calls.append((ports_disconnected_listener, upstream_comp, | |
538 | downstream_comp, upstream_port, downstream_port)) | |
539 | ||
540 | calls = [] | |
541 | self._graph.add_listener(bt2.GraphListenerType.PORT_ADDED, | |
542 | port_added_listener) | |
543 | self._graph.add_listener(bt2.GraphListenerType.PORT_REMOVED, | |
544 | port_removed_listener) | |
545 | self._graph.add_listener(bt2.GraphListenerType.PORTS_CONNECTED, | |
546 | ports_connected_listener) | |
547 | self._graph.add_listener(bt2.GraphListenerType.PORTS_DISCONNECTED, | |
548 | ports_disconnected_listener) | |
549 | src = self._graph.add_component(MySource, 'src') | |
550 | sink = self._graph.add_component(MySink, 'sink') | |
551 | self._graph.connect_ports(src.output_ports['out'], | |
552 | sink.input_ports['in']) | |
553 | sink.input_ports['in'].disconnect() | |
554 | self.assertIs(calls[0][0], port_added_listener) | |
555 | self.assertEqual(calls[0][1].name, 'out') | |
556 | self.assertIs(calls[1][0], port_added_listener) | |
557 | self.assertEqual(calls[1][1].name, 'zero') | |
558 | self.assertIs(calls[2][0], port_added_listener) | |
559 | self.assertEqual(calls[2][1].name, 'in') | |
560 | self.assertIs(calls[3][0], port_removed_listener) | |
561 | self.assertEqual(calls[3][1].name, 'zero') | |
562 | self.assertIs(calls[4][0], port_added_listener) | |
563 | self.assertEqual(calls[4][1].name, 'taste') | |
564 | self.assertIs(calls[5][0], ports_connected_listener) | |
565 | self.assertEqual(calls[5][1].name, 'out') | |
566 | self.assertEqual(calls[5][2].name, 'in') | |
567 | self.assertIs(calls[6][0], port_removed_listener) | |
568 | self.assertEqual(calls[6][1].name, 'in') | |
569 | self.assertIs(calls[7][0], ports_disconnected_listener) | |
570 | self.assertEqual(calls[7][1].name, 'src') | |
571 | self.assertEqual(calls[7][2].name, 'sink') | |
572 | self.assertEqual(calls[7][3].name, 'out') | |
573 | self.assertEqual(calls[7][4].name, 'in') | |
574 | del calls |