8 class GraphTestCase(unittest
.TestCase
):
10 self
._graph
= bt2
.Graph()
15 def test_create_empty(self
):
18 def test_add_component_user_cls(self
):
19 class MySink(bt2
._UserSinkComponent
):
23 comp
= self
._graph
.add_component(MySink
, 'salut')
24 self
.assertEqual(comp
.name
, 'salut')
26 def test_add_component_gen_cls(self
):
27 class MySink(bt2
._UserSinkComponent
):
31 comp
= self
._graph
.add_component(MySink
, 'salut')
33 comp2
= self
._graph
.add_component(comp
.component_class
, 'salut2')
34 self
.assertEqual(comp2
.name
, 'salut2')
36 def test_add_component_params(self
):
39 class MySink(bt2
._UserSinkComponent
):
40 def __init__(self
, params
):
47 params
= {'hello': 23, 'path': '/path/to/stuff'}
48 comp
= self
._graph
.add_component(MySink
, 'salut', params
)
49 self
.assertEqual(params
, comp_params
)
52 def test_add_component_invalid_cls_type(self
):
53 with self
.assertRaises(TypeError):
54 self
._graph
.add_component(int, 'salut')
56 def test_connect_ports(self
):
57 class MyIter(bt2
._UserNotificationIterator
):
61 class MySource(bt2
._UserSourceComponent
,
62 notification_iterator_class
=MyIter
):
63 def __init__(self
, params
):
64 self
._add
_output
_port
('out')
66 class MySink(bt2
._UserSinkComponent
):
67 def __init__(self
, params
):
68 self
._add
_input
_port
('in')
73 src
= self
._graph
.add_component(MySource
, 'src')
74 sink
= self
._graph
.add_component(MySink
, 'sink')
75 conn
= self
._graph
.connect_ports(src
.output_ports
['out'],
76 sink
.input_ports
['in'])
77 self
.assertTrue(src
.output_ports
['out'].is_connected
)
78 self
.assertTrue(sink
.input_ports
['in'].is_connected
)
79 self
.assertEqual(src
.output_ports
['out'].connection
, conn
)
80 self
.assertEqual(sink
.input_ports
['in'].connection
, conn
)
82 def test_connect_ports_invalid_direction(self
):
83 class MyIter(bt2
._UserNotificationIterator
):
87 class MySource(bt2
._UserSourceComponent
,
88 notification_iterator_class
=MyIter
):
89 def __init__(self
, params
):
90 self
._add
_output
_port
('out')
92 class MySink(bt2
._UserSinkComponent
):
93 def __init__(self
, params
):
94 self
._add
_input
_port
('in')
99 src
= self
._graph
.add_component(MySource
, 'src')
100 sink
= self
._graph
.add_component(MySink
, 'sink')
102 with self
.assertRaises(TypeError):
103 conn
= self
._graph
.connect_ports(sink
.input_ports
['in'],
104 src
.output_ports
['out'])
106 def test_connect_ports_refused(self
):
107 class MyIter(bt2
._UserNotificationIterator
):
111 class MySource(bt2
._UserSourceComponent
,
112 notification_iterator_class
=MyIter
):
113 def __init__(self
, params
):
114 self
._add
_output
_port
('out')
116 class MySink(bt2
._UserSinkComponent
):
117 def __init__(self
, params
):
118 self
._add
_input
_port
('in')
123 def _accept_port_connection(self
, port
, other_port
):
126 src
= self
._graph
.add_component(MySource
, 'src')
127 sink
= self
._graph
.add_component(MySink
, 'sink')
129 with self
.assertRaises(bt2
.PortConnectionRefused
):
130 conn
= self
._graph
.connect_ports(src
.output_ports
['out'],
131 sink
.input_ports
['in'])
133 def test_connect_ports_canceled(self
):
134 class MyIter(bt2
._UserNotificationIterator
):
138 class MySource(bt2
._UserSourceComponent
,
139 notification_iterator_class
=MyIter
):
140 def __init__(self
, params
):
141 self
._add
_output
_port
('out')
143 class MySink(bt2
._UserSinkComponent
):
144 def __init__(self
, params
):
145 self
._add
_input
_port
('in')
150 src
= self
._graph
.add_component(MySource
, 'src')
151 sink
= self
._graph
.add_component(MySink
, 'sink')
154 with self
.assertRaises(bt2
.GraphCanceled
):
155 conn
= self
._graph
.connect_ports(src
.output_ports
['out'],
156 sink
.input_ports
['in'])
158 def test_cancel(self
):
159 self
.assertFalse(self
._graph
.is_canceled
)
161 self
.assertTrue(self
._graph
.is_canceled
)
164 class MyIter(bt2
._UserNotificationIterator
):
169 def _build_meta(self
):
170 self
._trace
= bt2
.Trace()
171 self
._sc
= bt2
.StreamClass()
172 self
._ec
= bt2
.EventClass('salut')
173 self
._my
_int
_ft
= bt2
.IntegerFieldType(32)
174 self
._ec
.payload_field_type
= bt2
.StructureFieldType()
175 self
._ec
.payload_field_type
+= collections
.OrderedDict([
176 ('my_int', self
._my
_int
_ft
),
178 self
._sc
.add_event_class(self
._ec
)
179 self
._trace
.add_stream_class(self
._sc
)
180 self
._stream
= self
._sc
()
181 self
._packet
= self
._stream
.create_packet()
183 def _create_event(self
, value
):
185 ev
.payload_field
['my_int'] = value
186 ev
.packet
= self
._packet
193 notif
= bt2
.EventNotification(self
._create
_event
(self
._at
* 3))
197 class MySource(bt2
._UserSourceComponent
,
198 notification_iterator_class
=MyIter
):
199 def __init__(self
, params
):
200 self
._add
_output
_port
('out')
202 class MySink(bt2
._UserSinkComponent
):
203 def __init__(self
, params
):
204 self
._add
_input
_port
('in')
207 def _consume(comp_self
):
208 notif
= next(comp_self
._notif
_iter
)
210 if comp_self
._at
== 0:
211 self
.assertIsInstance(notif
, bt2
.StreamBeginningNotification
)
212 elif comp_self
._at
== 1:
213 self
.assertIsInstance(notif
, bt2
.PacketBeginningNotification
)
214 elif comp_self
._at
>= 2 and comp_self
._at
<= 6:
215 self
.assertIsInstance(notif
, bt2
.EventNotification
)
216 self
.assertEqual(notif
.event
.event_class
.name
, 'salut')
217 field
= notif
.event
.payload_field
['my_int']
218 self
.assertEqual(field
, (comp_self
._at
- 2) * 3)
219 elif comp_self
._at
== 7:
220 self
.assertIsInstance(notif
, bt2
.PacketEndNotification
)
221 elif comp_self
._at
== 8:
222 self
.assertIsInstance(notif
, bt2
.StreamEndNotification
)
226 def _port_connected(self
, port
, other_port
):
227 self
._notif
_iter
= port
.connection
.create_notification_iterator()
229 src
= self
._graph
.add_component(MySource
, 'src')
230 sink
= self
._graph
.add_component(MySink
, 'sink')
231 conn
= self
._graph
.connect_ports(src
.output_ports
['out'],
232 sink
.input_ports
['in'])
235 def test_run_again(self
):
236 class MyIter(bt2
._UserNotificationIterator
):
241 def _build_meta(self
):
242 self
._trace
= bt2
.Trace()
243 self
._sc
= bt2
.StreamClass()
244 self
._ec
= bt2
.EventClass('salut')
245 self
._my
_int
_ft
= bt2
.IntegerFieldType(32)
246 self
._ec
.payload_field_type
= bt2
.StructureFieldType()
247 self
._ec
.payload_field_type
+= collections
.OrderedDict([
248 ('my_int', self
._my
_int
_ft
),
250 self
._sc
.add_event_class(self
._ec
)
251 self
._trace
.add_stream_class(self
._sc
)
252 self
._stream
= self
._sc
()
253 self
._packet
= self
._stream
.create_packet()
255 def _create_event(self
, value
):
257 ev
.payload_field
['my_int'] = value
258 ev
.packet
= self
._packet
265 notif
= bt2
.EventNotification(self
._create
_event
(self
._at
* 3))
269 class MySource(bt2
._UserSourceComponent
,
270 notification_iterator_class
=MyIter
):
271 def __init__(self
, params
):
272 self
._add
_output
_port
('out')
274 class MySink(bt2
._UserSinkComponent
):
275 def __init__(self
, params
):
276 self
._add
_input
_port
('in')
279 def _consume(comp_self
):
280 if comp_self
._at
== 0:
281 notif
= next(comp_self
._notif
_iter
)
282 self
.assertIsInstance(notif
, bt2
.EventNotification
)
283 elif comp_self
._at
== 1:
284 with self
.assertRaises(bt2
.TryAgain
):
285 notif
= next(comp_self
._notif
_iter
)
291 def _port_connected(self
, port
, other_port
):
292 types
= [bt2
.EventNotification
]
293 self
._notif
_iter
= port
.connection
.create_notification_iterator(types
)
295 src
= self
._graph
.add_component(MySource
, 'src')
296 sink
= self
._graph
.add_component(MySink
, 'sink')
297 conn
= self
._graph
.connect_ports(src
.output_ports
['out'],
298 sink
.input_ports
['in'])
300 with self
.assertRaises(bt2
.TryAgain
):
303 def test_run_no_sink(self
):
304 class MyIter(bt2
._UserNotificationIterator
):
307 class MySource(bt2
._UserSourceComponent
,
308 notification_iterator_class
=MyIter
):
309 def __init__(self
, params
):
310 self
._add
_output
_port
('out')
312 class MyFilter(bt2
._UserFilterComponent
,
313 notification_iterator_class
=MyIter
):
314 def __init__(self
, params
):
315 self
._add
_output
_port
('out')
316 self
._add
_input
_port
('in')
318 src
= self
._graph
.add_component(MySource
, 'src')
319 flt
= self
._graph
.add_component(MyFilter
, 'flt')
320 conn
= self
._graph
.connect_ports(src
.output_ports
['out'],
321 flt
.input_ports
['in'])
323 with self
.assertRaises(bt2
.NoSinkComponent
):
326 def test_run_error(self
):
327 class MyIter(bt2
._UserNotificationIterator
):
332 def _build_meta(self
):
333 self
._trace
= bt2
.Trace()
334 self
._sc
= bt2
.StreamClass()
335 self
._ec
= bt2
.EventClass('salut')
336 self
._my
_int
_ft
= bt2
.IntegerFieldType(32)
337 self
._ec
.payload_field_type
= bt2
.StructureFieldType()
338 self
._ec
.payload_field_type
+= collections
.OrderedDict([
339 ('my_int', self
._my
_int
_ft
),
341 self
._sc
.add_event_class(self
._ec
)
342 self
._trace
.add_stream_class(self
._sc
)
343 self
._stream
= self
._sc
()
344 self
._packet
= self
._stream
.create_packet()
346 def _create_event(self
, value
):
348 ev
.payload_field
['my_int'] = value
349 ev
.packet
= self
._packet
356 notif
= bt2
.EventNotification(self
._create
_event
(self
._at
* 3))
360 class MySource(bt2
._UserSourceComponent
,
361 notification_iterator_class
=MyIter
):
362 def __init__(self
, params
):
363 self
._add
_output
_port
('out')
365 class MySink(bt2
._UserSinkComponent
):
366 def __init__(self
, params
):
367 self
._add
_input
_port
('in')
370 def _consume(comp_self
):
371 if comp_self
._at
== 0:
372 notif
= next(comp_self
._notif
_iter
)
373 self
.assertIsInstance(notif
, bt2
.EventNotification
)
374 elif comp_self
._at
== 1:
375 raise RuntimeError('error!')
379 def _port_connected(self
, port
, other_port
):
380 types
= [bt2
.EventNotification
]
381 self
._notif
_iter
= port
.connection
.create_notification_iterator(types
)
383 src
= self
._graph
.add_component(MySource
, 'src')
384 sink
= self
._graph
.add_component(MySink
, 'sink')
385 conn
= self
._graph
.connect_ports(src
.output_ports
['out'],
386 sink
.input_ports
['in'])
388 with self
.assertRaises(bt2
.Error
):
391 def test_listeners(self
):
392 class MyIter(bt2
._UserNotificationIterator
):
396 class MySource(bt2
._UserSourceComponent
,
397 notification_iterator_class
=MyIter
):
398 def __init__(self
, params
):
399 self
._add
_output
_port
('out')
400 self
._add
_output
_port
('zero')
402 def _port_connected(self
, port
, other_port
):
403 self
._output
_ports
['zero'].remove_from_component()
405 class MySink(bt2
._UserSinkComponent
):
406 def __init__(self
, params
):
407 self
._add
_input
_port
('in')
412 def _port_connected(self
, port
, other_port
):
413 self
._add
_input
_port
('taste')
415 def _port_disconnected(self
, port
):
416 port
.remove_from_component()
418 def port_added_listener(port
):
420 calls
.append((port_added_listener
, port
))
422 def port_removed_listener(port
):
424 calls
.append((port_removed_listener
, port
))
426 def ports_connected_listener(upstream_port
, downstream_port
):
428 calls
.append((ports_connected_listener
, upstream_port
,
431 def ports_disconnected_listener(upstream_comp
, downstream_comp
,
432 upstream_port
, downstream_port
):
434 calls
.append((ports_disconnected_listener
, upstream_comp
,
435 downstream_comp
, upstream_port
, downstream_port
))
438 self
._graph
.add_listener(bt2
.GraphListenerType
.PORT_ADDED
,
440 self
._graph
.add_listener(bt2
.GraphListenerType
.PORT_REMOVED
,
441 port_removed_listener
)
442 self
._graph
.add_listener(bt2
.GraphListenerType
.PORTS_CONNECTED
,
443 ports_connected_listener
)
444 self
._graph
.add_listener(bt2
.GraphListenerType
.PORTS_DISCONNECTED
,
445 ports_disconnected_listener
)
446 src
= self
._graph
.add_component(MySource
, 'src')
447 sink
= self
._graph
.add_component(MySink
, 'sink')
448 self
._graph
.connect_ports(src
.output_ports
['out'],
449 sink
.input_ports
['in'])
450 sink
.input_ports
['in'].disconnect()
451 self
.assertIs(calls
[0][0], port_added_listener
)
452 self
.assertEqual(calls
[0][1].name
, 'out')
453 self
.assertIs(calls
[1][0], port_added_listener
)
454 self
.assertEqual(calls
[1][1].name
, 'zero')
455 self
.assertIs(calls
[2][0], port_added_listener
)
456 self
.assertEqual(calls
[2][1].name
, 'in')
457 self
.assertIs(calls
[3][0], port_removed_listener
)
458 self
.assertEqual(calls
[3][1].name
, 'zero')
459 self
.assertIs(calls
[4][0], port_added_listener
)
460 self
.assertEqual(calls
[4][1].name
, 'taste')
461 self
.assertIs(calls
[5][0], ports_connected_listener
)
462 self
.assertEqual(calls
[5][1].name
, 'out')
463 self
.assertEqual(calls
[5][2].name
, 'in')
464 self
.assertIs(calls
[6][0], port_removed_listener
)
465 self
.assertEqual(calls
[6][1].name
, 'in')
466 self
.assertIs(calls
[7][0], ports_disconnected_listener
)
467 self
.assertEqual(calls
[7][1].name
, 'src')
468 self
.assertEqual(calls
[7][2].name
, 'sink')
469 self
.assertEqual(calls
[7][3].name
, 'out')
470 self
.assertEqual(calls
[7][4].name
, 'in')