8 class GraphTestCase(unittest
.TestCase
):
10 self
._graph
= bt2
.Graph()
15 def test_create_empty(self
):
18 def test_add_component_user_cls(self
):
19 class MySink(bt2
._UserSinkComponent
):
23 comp
= self
._graph
.add_component(MySink
, 'salut')
24 self
.assertEqual(comp
.name
, 'salut')
26 def test_add_component_gen_cls(self
):
27 class MySink(bt2
._UserSinkComponent
):
31 comp
= self
._graph
.add_component(MySink
, 'salut')
33 comp2
= self
._graph
.add_component(comp
.component_class
, 'salut2')
34 self
.assertEqual(comp2
.name
, 'salut2')
36 def test_add_component_params(self
):
39 class MySink(bt2
._UserSinkComponent
):
40 def __init__(self
, params
):
47 params
= {'hello': 23, 'path': '/path/to/stuff'}
48 comp
= self
._graph
.add_component(MySink
, 'salut', params
)
49 self
.assertEqual(params
, comp_params
)
52 def test_add_component_invalid_cls_type(self
):
53 with self
.assertRaises(TypeError):
54 self
._graph
.add_component(int, 'salut')
56 def test_connect_ports(self
):
57 class MyIter(bt2
._UserNotificationIterator
):
61 class MySource(bt2
._UserSourceComponent
,
62 notification_iterator_class
=MyIter
):
63 def __init__(self
, params
):
64 self
._add
_output
_port
('out')
66 class MySink(bt2
._UserSinkComponent
):
67 def __init__(self
, params
):
68 self
._add
_input
_port
('in')
73 src
= self
._graph
.add_component(MySource
, 'src')
74 sink
= self
._graph
.add_component(MySink
, 'sink')
75 conn
= self
._graph
.connect_ports(src
.output_ports
['out'],
76 sink
.input_ports
['in'])
77 self
.assertTrue(src
.output_ports
['out'].is_connected
)
78 self
.assertTrue(sink
.input_ports
['in'].is_connected
)
79 self
.assertEqual(src
.output_ports
['out'].connection
, conn
)
80 self
.assertEqual(sink
.input_ports
['in'].connection
, conn
)
82 def test_connect_ports_invalid_direction(self
):
83 class MyIter(bt2
._UserNotificationIterator
):
87 class MySource(bt2
._UserSourceComponent
,
88 notification_iterator_class
=MyIter
):
89 def __init__(self
, params
):
90 self
._add
_output
_port
('out')
92 class MySink(bt2
._UserSinkComponent
):
93 def __init__(self
, params
):
94 self
._add
_input
_port
('in')
99 src
= self
._graph
.add_component(MySource
, 'src')
100 sink
= self
._graph
.add_component(MySink
, 'sink')
102 with self
.assertRaises(TypeError):
103 conn
= self
._graph
.connect_ports(sink
.input_ports
['in'],
104 src
.output_ports
['out'])
106 def test_connect_ports_refused(self
):
107 class MyIter(bt2
._UserNotificationIterator
):
111 class MySource(bt2
._UserSourceComponent
,
112 notification_iterator_class
=MyIter
):
113 def __init__(self
, params
):
114 self
._add
_output
_port
('out')
116 class MySink(bt2
._UserSinkComponent
):
117 def __init__(self
, params
):
118 self
._add
_input
_port
('in')
123 def _accept_port_connection(self
, port
, other_port
):
126 src
= self
._graph
.add_component(MySource
, 'src')
127 sink
= self
._graph
.add_component(MySink
, 'sink')
129 with self
.assertRaises(bt2
.PortConnectionRefused
):
130 conn
= self
._graph
.connect_ports(src
.output_ports
['out'],
131 sink
.input_ports
['in'])
133 def test_connect_ports_canceled(self
):
134 class MyIter(bt2
._UserNotificationIterator
):
138 class MySource(bt2
._UserSourceComponent
,
139 notification_iterator_class
=MyIter
):
140 def __init__(self
, params
):
141 self
._add
_output
_port
('out')
143 class MySink(bt2
._UserSinkComponent
):
144 def __init__(self
, params
):
145 self
._add
_input
_port
('in')
150 src
= self
._graph
.add_component(MySource
, 'src')
151 sink
= self
._graph
.add_component(MySink
, 'sink')
154 with self
.assertRaises(bt2
.GraphCanceled
):
155 conn
= self
._graph
.connect_ports(src
.output_ports
['out'],
156 sink
.input_ports
['in'])
158 def test_connect_ports_cannot_consume_accept(self
):
159 class MyIter(bt2
._UserNotificationIterator
):
163 class MySource(bt2
._UserSourceComponent
,
164 notification_iterator_class
=MyIter
):
165 def __init__(self
, params
):
166 self
._add
_output
_port
('out')
168 class MySink(bt2
._UserSinkComponent
):
169 def __init__(self
, params
):
170 self
._add
_input
_port
('in')
175 def _accept_port_connection(self
, port
, other_port
):
180 except Exception as e
:
186 src
= self
._graph
.add_component(MySource
, 'src')
187 sink
= self
._graph
.add_component(MySink
, 'sink')
188 self
._graph
.connect_ports(src
.output_ports
['out'],
189 sink
.input_ports
['in'])
190 self
.assertIs(type(exc
), bt2
.CannotConsumeGraph
)
192 def test_connect_ports_cannot_consume_connected(self
):
193 class MyIter(bt2
._UserNotificationIterator
):
197 class MySource(bt2
._UserSourceComponent
,
198 notification_iterator_class
=MyIter
):
199 def __init__(self
, params
):
200 self
._add
_output
_port
('out')
202 class MySink(bt2
._UserSinkComponent
):
203 def __init__(self
, params
):
204 self
._add
_input
_port
('in')
209 def _port_connected(self
, port
, other_port
):
214 except Exception as e
:
220 src
= self
._graph
.add_component(MySource
, 'src')
221 sink
= self
._graph
.add_component(MySink
, 'sink')
222 self
._graph
.connect_ports(src
.output_ports
['out'],
223 sink
.input_ports
['in'])
225 self
.assertIs(type(exc
), bt2
.CannotConsumeGraph
)
227 def test_cancel(self
):
228 self
.assertFalse(self
._graph
.is_canceled
)
230 self
.assertTrue(self
._graph
.is_canceled
)
233 class MyIter(bt2
._UserNotificationIterator
):
238 def _build_meta(self
):
239 self
._trace
= bt2
.Trace()
240 self
._sc
= bt2
.StreamClass()
241 self
._ec
= bt2
.EventClass('salut')
242 self
._my
_int
_ft
= bt2
.IntegerFieldType(32)
243 self
._ec
.payload_field_type
= bt2
.StructureFieldType()
244 self
._ec
.payload_field_type
+= collections
.OrderedDict([
245 ('my_int', self
._my
_int
_ft
),
247 self
._sc
.add_event_class(self
._ec
)
248 self
._trace
.add_stream_class(self
._sc
)
249 self
._stream
= self
._sc
()
250 self
._packet
= self
._stream
.create_packet()
252 def _create_event(self
, value
):
254 ev
.payload_field
['my_int'] = value
255 ev
.packet
= self
._packet
262 notif
= bt2
.EventNotification(self
._create
_event
(self
._at
* 3))
266 class MySource(bt2
._UserSourceComponent
,
267 notification_iterator_class
=MyIter
):
268 def __init__(self
, params
):
269 self
._add
_output
_port
('out')
271 class MySink(bt2
._UserSinkComponent
):
272 def __init__(self
, params
):
273 self
._add
_input
_port
('in')
276 def _consume(comp_self
):
277 notif
= next(comp_self
._notif
_iter
)
279 if comp_self
._at
== 0:
280 self
.assertIsInstance(notif
, bt2
.StreamBeginningNotification
)
281 elif comp_self
._at
== 1:
282 self
.assertIsInstance(notif
, bt2
.PacketBeginningNotification
)
283 elif comp_self
._at
>= 2 and comp_self
._at
<= 6:
284 self
.assertIsInstance(notif
, bt2
.EventNotification
)
285 self
.assertEqual(notif
.event
.event_class
.name
, 'salut')
286 field
= notif
.event
.payload_field
['my_int']
287 self
.assertEqual(field
, (comp_self
._at
- 2) * 3)
288 elif comp_self
._at
== 7:
289 self
.assertIsInstance(notif
, bt2
.PacketEndNotification
)
290 elif comp_self
._at
== 8:
291 self
.assertIsInstance(notif
, bt2
.StreamEndNotification
)
295 def _port_connected(self
, port
, other_port
):
296 self
._notif
_iter
= port
.connection
.create_notification_iterator()
298 src
= self
._graph
.add_component(MySource
, 'src')
299 sink
= self
._graph
.add_component(MySink
, 'sink')
300 conn
= self
._graph
.connect_ports(src
.output_ports
['out'],
301 sink
.input_ports
['in'])
304 def test_run_again(self
):
305 class MyIter(bt2
._UserNotificationIterator
):
310 def _build_meta(self
):
311 self
._trace
= bt2
.Trace()
312 self
._sc
= bt2
.StreamClass()
313 self
._ec
= bt2
.EventClass('salut')
314 self
._my
_int
_ft
= bt2
.IntegerFieldType(32)
315 self
._ec
.payload_field_type
= bt2
.StructureFieldType()
316 self
._ec
.payload_field_type
+= collections
.OrderedDict([
317 ('my_int', self
._my
_int
_ft
),
319 self
._sc
.add_event_class(self
._ec
)
320 self
._trace
.add_stream_class(self
._sc
)
321 self
._stream
= self
._sc
()
322 self
._packet
= self
._stream
.create_packet()
324 def _create_event(self
, value
):
326 ev
.payload_field
['my_int'] = value
327 ev
.packet
= self
._packet
334 notif
= bt2
.EventNotification(self
._create
_event
(self
._at
* 3))
338 class MySource(bt2
._UserSourceComponent
,
339 notification_iterator_class
=MyIter
):
340 def __init__(self
, params
):
341 self
._add
_output
_port
('out')
343 class MySink(bt2
._UserSinkComponent
):
344 def __init__(self
, params
):
345 self
._add
_input
_port
('in')
348 def _consume(comp_self
):
349 if comp_self
._at
== 0:
350 notif
= next(comp_self
._notif
_iter
)
351 self
.assertIsInstance(notif
, bt2
.EventNotification
)
352 elif comp_self
._at
== 1:
353 with self
.assertRaises(bt2
.TryAgain
):
354 notif
= next(comp_self
._notif
_iter
)
360 def _port_connected(self
, port
, other_port
):
361 types
= [bt2
.EventNotification
]
362 self
._notif
_iter
= port
.connection
.create_notification_iterator(types
)
364 src
= self
._graph
.add_component(MySource
, 'src')
365 sink
= self
._graph
.add_component(MySink
, 'sink')
366 conn
= self
._graph
.connect_ports(src
.output_ports
['out'],
367 sink
.input_ports
['in'])
369 with self
.assertRaises(bt2
.TryAgain
):
372 def test_run_no_sink(self
):
373 class MyIter(bt2
._UserNotificationIterator
):
376 class MySource(bt2
._UserSourceComponent
,
377 notification_iterator_class
=MyIter
):
378 def __init__(self
, params
):
379 self
._add
_output
_port
('out')
381 class MyFilter(bt2
._UserFilterComponent
,
382 notification_iterator_class
=MyIter
):
383 def __init__(self
, params
):
384 self
._add
_output
_port
('out')
385 self
._add
_input
_port
('in')
387 src
= self
._graph
.add_component(MySource
, 'src')
388 flt
= self
._graph
.add_component(MyFilter
, 'flt')
389 conn
= self
._graph
.connect_ports(src
.output_ports
['out'],
390 flt
.input_ports
['in'])
392 with self
.assertRaises(bt2
.NoSinkComponent
):
395 def test_run_error(self
):
396 class MyIter(bt2
._UserNotificationIterator
):
401 def _build_meta(self
):
402 self
._trace
= bt2
.Trace()
403 self
._sc
= bt2
.StreamClass()
404 self
._ec
= bt2
.EventClass('salut')
405 self
._my
_int
_ft
= bt2
.IntegerFieldType(32)
406 self
._ec
.payload_field_type
= bt2
.StructureFieldType()
407 self
._ec
.payload_field_type
+= collections
.OrderedDict([
408 ('my_int', self
._my
_int
_ft
),
410 self
._sc
.add_event_class(self
._ec
)
411 self
._trace
.add_stream_class(self
._sc
)
412 self
._stream
= self
._sc
()
413 self
._packet
= self
._stream
.create_packet()
415 def _create_event(self
, value
):
417 ev
.payload_field
['my_int'] = value
418 ev
.packet
= self
._packet
425 notif
= bt2
.EventNotification(self
._create
_event
(self
._at
* 3))
429 class MySource(bt2
._UserSourceComponent
,
430 notification_iterator_class
=MyIter
):
431 def __init__(self
, params
):
432 self
._add
_output
_port
('out')
434 class MySink(bt2
._UserSinkComponent
):
435 def __init__(self
, params
):
436 self
._add
_input
_port
('in')
439 def _consume(comp_self
):
440 if comp_self
._at
== 0:
441 notif
= next(comp_self
._notif
_iter
)
442 self
.assertIsInstance(notif
, bt2
.EventNotification
)
443 elif comp_self
._at
== 1:
444 raise RuntimeError('error!')
448 def _port_connected(self
, port
, other_port
):
449 types
= [bt2
.EventNotification
]
450 self
._notif
_iter
= port
.connection
.create_notification_iterator(types
)
452 src
= self
._graph
.add_component(MySource
, 'src')
453 sink
= self
._graph
.add_component(MySink
, 'sink')
454 conn
= self
._graph
.connect_ports(src
.output_ports
['out'],
455 sink
.input_ports
['in'])
457 with self
.assertRaises(bt2
.Error
):
460 def test_run_cannot_consume(self
):
461 class MyIter(bt2
._UserNotificationIterator
):
464 class MySource(bt2
._UserSourceComponent
,
465 notification_iterator_class
=MyIter
):
466 def __init__(self
, params
):
467 self
._add
_output
_port
('out')
469 class MySink(bt2
._UserSinkComponent
):
470 def __init__(self
, params
):
471 self
._add
_input
_port
('in')
474 def _consume(comp_self
):
479 comp_self
.graph
.run()
481 except Exception as e
:
487 src
= self
._graph
.add_component(MySource
, 'src')
488 sink
= self
._graph
.add_component(MySink
, 'sink')
489 conn
= self
._graph
.connect_ports(src
.output_ports
['out'],
490 sink
.input_ports
['in'])
492 self
.assertIs(type(exc
), bt2
.CannotConsumeGraph
)
494 def test_listeners(self
):
495 class MyIter(bt2
._UserNotificationIterator
):
499 class MySource(bt2
._UserSourceComponent
,
500 notification_iterator_class
=MyIter
):
501 def __init__(self
, params
):
502 self
._add
_output
_port
('out')
503 self
._add
_output
_port
('zero')
505 def _port_connected(self
, port
, other_port
):
506 self
._output
_ports
['zero'].remove_from_component()
508 class MySink(bt2
._UserSinkComponent
):
509 def __init__(self
, params
):
510 self
._add
_input
_port
('in')
515 def _port_connected(self
, port
, other_port
):
516 self
._add
_input
_port
('taste')
518 def _port_disconnected(self
, port
):
519 port
.remove_from_component()
521 def port_added_listener(port
):
523 calls
.append((port_added_listener
, port
))
525 def port_removed_listener(port
):
527 calls
.append((port_removed_listener
, port
))
529 def ports_connected_listener(upstream_port
, downstream_port
):
531 calls
.append((ports_connected_listener
, upstream_port
,
534 def ports_disconnected_listener(upstream_comp
, downstream_comp
,
535 upstream_port
, downstream_port
):
537 calls
.append((ports_disconnected_listener
, upstream_comp
,
538 downstream_comp
, upstream_port
, downstream_port
))
541 self
._graph
.add_listener(bt2
.GraphListenerType
.PORT_ADDED
,
543 self
._graph
.add_listener(bt2
.GraphListenerType
.PORT_REMOVED
,
544 port_removed_listener
)
545 self
._graph
.add_listener(bt2
.GraphListenerType
.PORTS_CONNECTED
,
546 ports_connected_listener
)
547 self
._graph
.add_listener(bt2
.GraphListenerType
.PORTS_DISCONNECTED
,
548 ports_disconnected_listener
)
549 src
= self
._graph
.add_component(MySource
, 'src')
550 sink
= self
._graph
.add_component(MySink
, 'sink')
551 self
._graph
.connect_ports(src
.output_ports
['out'],
552 sink
.input_ports
['in'])
553 sink
.input_ports
['in'].disconnect()
554 self
.assertIs(calls
[0][0], port_added_listener
)
555 self
.assertEqual(calls
[0][1].name
, 'out')
556 self
.assertIs(calls
[1][0], port_added_listener
)
557 self
.assertEqual(calls
[1][1].name
, 'zero')
558 self
.assertIs(calls
[2][0], port_added_listener
)
559 self
.assertEqual(calls
[2][1].name
, 'in')
560 self
.assertIs(calls
[3][0], port_removed_listener
)
561 self
.assertEqual(calls
[3][1].name
, 'zero')
562 self
.assertIs(calls
[4][0], port_added_listener
)
563 self
.assertEqual(calls
[4][1].name
, 'taste')
564 self
.assertIs(calls
[5][0], ports_connected_listener
)
565 self
.assertEqual(calls
[5][1].name
, 'out')
566 self
.assertEqual(calls
[5][2].name
, 'in')
567 self
.assertIs(calls
[6][0], port_removed_listener
)
568 self
.assertEqual(calls
[6][1].name
, 'in')
569 self
.assertIs(calls
[7][0], ports_disconnected_listener
)
570 self
.assertEqual(calls
[7][1].name
, 'src')
571 self
.assertEqual(calls
[7][2].name
, 'sink')
572 self
.assertEqual(calls
[7][3].name
, 'out')
573 self
.assertEqual(calls
[7][4].name
, 'in')