bt2: remove unused imports
[babeltrace.git] / tests / bindings / python / bt2 / test_message_iterator.py
1 #
2 # Copyright (C) 2019 EfficiOS Inc.
3 #
4 # This program is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU General Public License
6 # as published by the Free Software Foundation; only version 2
7 # of the License.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 #
18
19 import unittest
20 import bt2
21 import sys
22 from utils import TestOutputPortMessageIterator
23
24
25 class UserMessageIteratorTestCase(unittest.TestCase):
26 @staticmethod
27 def _create_graph(src_comp_cls, flt_comp_cls=None):
28 class MySink(bt2._UserSinkComponent):
29 def __init__(self, params, obj):
30 self._add_input_port('in')
31
32 def _user_consume(self):
33 next(self._msg_iter)
34
35 def _user_graph_is_configured(self):
36 self._msg_iter = self._create_input_port_message_iterator(
37 self._input_ports['in']
38 )
39
40 graph = bt2.Graph()
41 src_comp = graph.add_component(src_comp_cls, 'src')
42
43 if flt_comp_cls is not None:
44 flt_comp = graph.add_component(flt_comp_cls, 'flt')
45
46 sink_comp = graph.add_component(MySink, 'sink')
47
48 if flt_comp_cls is not None:
49 assert flt_comp is not None
50 graph.connect_ports(
51 src_comp.output_ports['out'], flt_comp.input_ports['in']
52 )
53 out_port = flt_comp.output_ports['out']
54 else:
55 out_port = src_comp.output_ports['out']
56
57 graph.connect_ports(out_port, sink_comp.input_ports['in'])
58 return graph
59
60 def test_init(self):
61 the_output_port_from_source = None
62 the_output_port_from_iter = None
63
64 class MyIter(bt2._UserMessageIterator):
65 def __init__(self, self_port_output):
66 nonlocal initialized
67 nonlocal the_output_port_from_iter
68 initialized = True
69 the_output_port_from_iter = self_port_output
70
71 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
72 def __init__(self, params, obj):
73 nonlocal the_output_port_from_source
74 the_output_port_from_source = self._add_output_port('out', 'user data')
75
76 initialized = False
77 graph = self._create_graph(MySource)
78 graph.run()
79 self.assertTrue(initialized)
80 self.assertEqual(
81 the_output_port_from_source.addr, the_output_port_from_iter.addr
82 )
83 self.assertEqual(the_output_port_from_iter.user_data, 'user data')
84
85 def test_create_from_message_iterator(self):
86 class MySourceIter(bt2._UserMessageIterator):
87 def __init__(self, self_port_output):
88 nonlocal src_iter_initialized
89 src_iter_initialized = True
90
91 class MySource(bt2._UserSourceComponent, message_iterator_class=MySourceIter):
92 def __init__(self, params, obj):
93 self._add_output_port('out')
94
95 class MyFilterIter(bt2._UserMessageIterator):
96 def __init__(self, self_port_output):
97 nonlocal flt_iter_initialized
98 flt_iter_initialized = True
99 self._up_iter = self._create_input_port_message_iterator(
100 self._component._input_ports['in']
101 )
102
103 def __next__(self):
104 return next(self._up_iter)
105
106 class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter):
107 def __init__(self, params, obj):
108 self._add_input_port('in')
109 self._add_output_port('out')
110
111 src_iter_initialized = False
112 flt_iter_initialized = False
113 graph = self._create_graph(MySource, MyFilter)
114 graph.run()
115 self.assertTrue(src_iter_initialized)
116 self.assertTrue(flt_iter_initialized)
117
118 def test_finalize(self):
119 class MyIter(bt2._UserMessageIterator):
120 def _user_finalize(self):
121 nonlocal finalized
122 finalized = True
123
124 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
125 def __init__(self, params, obj):
126 self._add_output_port('out')
127
128 finalized = False
129 graph = self._create_graph(MySource)
130 graph.run()
131 del graph
132 self.assertTrue(finalized)
133
134 def test_component(self):
135 class MyIter(bt2._UserMessageIterator):
136 def __init__(self, self_port_output):
137 nonlocal salut
138 salut = self._component._salut
139
140 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
141 def __init__(self, params, obj):
142 self._add_output_port('out')
143 self._salut = 23
144
145 salut = None
146 graph = self._create_graph(MySource)
147 graph.run()
148 self.assertEqual(salut, 23)
149
150 def test_addr(self):
151 class MyIter(bt2._UserMessageIterator):
152 def __init__(self, self_port_output):
153 nonlocal addr
154 addr = self.addr
155
156 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
157 def __init__(self, params, obj):
158 self._add_output_port('out')
159
160 addr = None
161 graph = self._create_graph(MySource)
162 graph.run()
163 self.assertIsNotNone(addr)
164 self.assertNotEqual(addr, 0)
165
166 # Test that messages returned by _UserMessageIterator.__next__ remain valid
167 # and can be re-used.
168 def test_reuse_message(self):
169 class MyIter(bt2._UserMessageIterator):
170 def __init__(self, port):
171 tc, sc, ec = port.user_data
172 trace = tc()
173 stream = trace.create_stream(sc)
174 packet = stream.create_packet()
175
176 # This message will be returned twice by __next__.
177 event_message = self._create_event_message(ec, packet)
178
179 self._msgs = [
180 self._create_stream_beginning_message(stream),
181 self._create_packet_beginning_message(packet),
182 event_message,
183 event_message,
184 ]
185
186 def __next__(self):
187 return self._msgs.pop(0)
188
189 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
190 def __init__(self, params, obj):
191 tc = self._create_trace_class()
192 sc = tc.create_stream_class(supports_packets=True)
193 ec = sc.create_event_class()
194 self._add_output_port('out', (tc, sc, ec))
195
196 graph = bt2.Graph()
197 src = graph.add_component(MySource, 'src')
198 it = TestOutputPortMessageIterator(graph, src.output_ports['out'])
199
200 # Skip beginning messages.
201 msg = next(it)
202 self.assertIsInstance(msg, bt2._StreamBeginningMessage)
203 msg = next(it)
204 self.assertIsInstance(msg, bt2._PacketBeginningMessage)
205
206 msg_ev1 = next(it)
207 msg_ev2 = next(it)
208
209 self.assertIsInstance(msg_ev1, bt2._EventMessage)
210 self.assertIsInstance(msg_ev2, bt2._EventMessage)
211 self.assertEqual(msg_ev1.addr, msg_ev2.addr)
212
213 @staticmethod
214 def _setup_seek_beginning_test(sink_cls):
215 # Use a source, a filter and an output port iterator. This allows us
216 # to test calling `seek_beginning` on both a _OutputPortMessageIterator
217 # and a _UserComponentInputPortMessageIterator, on top of checking that
218 # _UserMessageIterator._seek_beginning is properly called.
219
220 class MySourceIter(bt2._UserMessageIterator):
221 def __init__(self, port):
222 tc, sc, ec = port.user_data
223 trace = tc()
224 stream = trace.create_stream(sc)
225 packet = stream.create_packet()
226
227 self._msgs = [
228 self._create_stream_beginning_message(stream),
229 self._create_packet_beginning_message(packet),
230 self._create_event_message(ec, packet),
231 self._create_event_message(ec, packet),
232 self._create_packet_end_message(packet),
233 self._create_stream_end_message(stream),
234 ]
235 self._at = 0
236
237 def _user_seek_beginning(self):
238 self._at = 0
239
240 def __next__(self):
241 if self._at < len(self._msgs):
242 msg = self._msgs[self._at]
243 self._at += 1
244 return msg
245 else:
246 raise StopIteration
247
248 class MySource(bt2._UserSourceComponent, message_iterator_class=MySourceIter):
249 def __init__(self, params, obj):
250 tc = self._create_trace_class()
251 sc = tc.create_stream_class(supports_packets=True)
252 ec = sc.create_event_class()
253
254 self._add_output_port('out', (tc, sc, ec))
255
256 class MyFilterIter(bt2._UserMessageIterator):
257 def __init__(self, port):
258 input_port = port.user_data
259 self._upstream_iter = self._create_input_port_message_iterator(
260 input_port
261 )
262
263 def __next__(self):
264 return next(self._upstream_iter)
265
266 def _user_seek_beginning(self):
267 self._upstream_iter.seek_beginning()
268
269 @property
270 def _user_can_seek_beginning(self):
271 return self._upstream_iter.can_seek_beginning
272
273 class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter):
274 def __init__(self, params, obj):
275 input_port = self._add_input_port('in')
276 self._add_output_port('out', input_port)
277
278 graph = bt2.Graph()
279 src = graph.add_component(MySource, 'src')
280 flt = graph.add_component(MyFilter, 'flt')
281 sink = graph.add_component(sink_cls, 'sink')
282 graph.connect_ports(src.output_ports['out'], flt.input_ports['in'])
283 graph.connect_ports(flt.output_ports['out'], sink.input_ports['in'])
284 return MySourceIter, graph
285
286 def test_can_seek_beginning(self):
287 class MySink(bt2._UserSinkComponent):
288 def __init__(self, params, obj):
289 self._add_input_port('in')
290
291 def _user_graph_is_configured(self):
292 self._msg_iter = self._create_input_port_message_iterator(
293 self._input_ports['in']
294 )
295
296 def _user_consume(self):
297 nonlocal can_seek_beginning
298 can_seek_beginning = self._msg_iter.can_seek_beginning
299
300 MySourceIter, graph = self._setup_seek_beginning_test(MySink)
301
302 def _user_can_seek_beginning(self):
303 nonlocal input_port_iter_can_seek_beginning
304 return input_port_iter_can_seek_beginning
305
306 MySourceIter._user_can_seek_beginning = property(_user_can_seek_beginning)
307
308 input_port_iter_can_seek_beginning = True
309 can_seek_beginning = None
310 graph.run_once()
311 self.assertTrue(can_seek_beginning)
312
313 input_port_iter_can_seek_beginning = False
314 can_seek_beginning = None
315 graph.run_once()
316 self.assertFalse(can_seek_beginning)
317
318 # Once can_seek_beginning returns an error, verify that it raises when
319 # _can_seek_beginning has/returns the wrong type.
320
321 # Remove the _can_seek_beginning method, we now rely on the presence of
322 # a _seek_beginning method to know whether the iterator can seek to
323 # beginning or not.
324 del MySourceIter._user_can_seek_beginning
325 can_seek_beginning = None
326 graph.run_once()
327 self.assertTrue(can_seek_beginning)
328
329 del MySourceIter._user_seek_beginning
330 can_seek_beginning = None
331 graph.run_once()
332 self.assertFalse(can_seek_beginning)
333
334 def test_seek_beginning(self):
335 class MySink(bt2._UserSinkComponent):
336 def __init__(self, params, obj):
337 self._add_input_port('in')
338
339 def _user_graph_is_configured(self):
340 self._msg_iter = self._create_input_port_message_iterator(
341 self._input_ports['in']
342 )
343
344 def _user_consume(self):
345 nonlocal do_seek_beginning
346 nonlocal msg
347
348 if do_seek_beginning:
349 self._msg_iter.seek_beginning()
350 return
351
352 msg = next(self._msg_iter)
353
354 do_seek_beginning = False
355 msg = None
356 MySourceIter, graph = self._setup_seek_beginning_test(MySink)
357 graph.run_once()
358 self.assertIsInstance(msg, bt2._StreamBeginningMessage)
359 graph.run_once()
360 self.assertIsInstance(msg, bt2._PacketBeginningMessage)
361 do_seek_beginning = True
362 graph.run_once()
363 do_seek_beginning = False
364 graph.run_once()
365 self.assertIsInstance(msg, bt2._StreamBeginningMessage)
366
367 def test_seek_beginning_user_error(self):
368 class MySink(bt2._UserSinkComponent):
369 def __init__(self, params, obj):
370 self._add_input_port('in')
371
372 def _user_graph_is_configured(self):
373 self._msg_iter = self._create_input_port_message_iterator(
374 self._input_ports['in']
375 )
376
377 def _user_consume(self):
378 self._msg_iter.seek_beginning()
379
380 MySourceIter, graph = self._setup_seek_beginning_test(MySink)
381
382 def _user_seek_beginning_error(self):
383 raise ValueError('ouch')
384
385 MySourceIter._user_seek_beginning = _user_seek_beginning_error
386
387 with self.assertRaises(bt2._Error):
388 graph.run_once()
389
390 # Try consuming many times from an iterator that always returns TryAgain.
391 # This verifies that we are not missing an incref of Py_None, making the
392 # refcount of Py_None reach 0.
393 def test_try_again_many_times(self):
394 class MyIter(bt2._UserMessageIterator):
395 def __next__(self):
396 raise bt2.TryAgain
397
398 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
399 def __init__(self, params, obj):
400 self._add_output_port('out')
401
402 graph = bt2.Graph()
403 src = graph.add_component(MySource, 'src')
404 it = TestOutputPortMessageIterator(graph, src.output_ports['out'])
405
406 # Three times the initial ref count of `None` iterations should
407 # be enough to catch the bug even if there are small differences
408 # between configurations.
409 none_ref_count = sys.getrefcount(None) * 3
410
411 for i in range(none_ref_count):
412 with self.assertRaises(bt2.TryAgain):
413 next(it)
414
415
416 if __name__ == '__main__':
417 unittest.main()
This page took 0.039287 seconds and 5 git commands to generate.