bt2: make `TraceCollectionMessageIterator` not use an output port msg iter
[babeltrace.git] / tests / bindings / python / bt2 / test_message_iterator.py
CommitLineData
32d2d479
MJ
1#
2# Copyright (C) 2019 EfficiOS Inc.
3#
4# This program is free software; you can redistribute it and/or
5# modify it under the terms of the GNU General Public License
6# as published by the Free Software Foundation; only version 2
7# of the License.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU General Public License for more details.
13#
14# You should have received a copy of the GNU General Public License
15# along with this program; if not, write to the Free Software
16# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17#
18
ae0bfae8 19from bt2 import value
0d0edb5e 20import collections
f6a5e476
PP
21import unittest
22import copy
23import bt2
24
25
fa4c33e3 26class UserMessageIteratorTestCase(unittest.TestCase):
f6a5e476 27 @staticmethod
692f1a01 28 def _create_graph(src_comp_cls, flt_comp_cls=None):
f6a5e476 29 class MySink(bt2._UserSinkComponent):
b20382e2 30 def __init__(self, params, obj):
f6a5e476
PP
31 self._add_input_port('in')
32
819d0ae7 33 def _user_consume(self):
fa4c33e3 34 next(self._msg_iter)
f6a5e476 35
819d0ae7 36 def _user_graph_is_configured(self):
692f1a01
PP
37 self._msg_iter = self._create_input_port_message_iterator(
38 self._input_ports['in']
39 )
f6a5e476
PP
40
41 graph = bt2.Graph()
42 src_comp = graph.add_component(src_comp_cls, 'src')
692f1a01
PP
43
44 if flt_comp_cls is not None:
45 flt_comp = graph.add_component(flt_comp_cls, 'flt')
46
f6a5e476 47 sink_comp = graph.add_component(MySink, 'sink')
692f1a01
PP
48
49 if flt_comp_cls is not None:
50 assert flt_comp is not None
51 graph.connect_ports(
52 src_comp.output_ports['out'], flt_comp.input_ports['in']
53 )
54 out_port = flt_comp.output_ports['out']
55 else:
56 out_port = src_comp.output_ports['out']
57
58 graph.connect_ports(out_port, sink_comp.input_ports['in'])
f6a5e476
PP
59 return graph
60
61 def test_init(self):
a4dcfa96
SM
62 the_output_port_from_source = None
63 the_output_port_from_iter = None
64
fa4c33e3 65 class MyIter(bt2._UserMessageIterator):
a4dcfa96 66 def __init__(self, self_port_output):
f6a5e476 67 nonlocal initialized
a4dcfa96 68 nonlocal the_output_port_from_iter
f6a5e476 69 initialized = True
a4dcfa96 70 the_output_port_from_iter = self_port_output
f6a5e476 71
61d96b89 72 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
b20382e2 73 def __init__(self, params, obj):
a4dcfa96 74 nonlocal the_output_port_from_source
03ec9ebd 75 the_output_port_from_source = self._add_output_port('out', 'user data')
f6a5e476
PP
76
77 initialized = False
78 graph = self._create_graph(MySource)
a4dcfa96 79 graph.run()
f6a5e476 80 self.assertTrue(initialized)
61d96b89
FD
81 self.assertEqual(
82 the_output_port_from_source.addr, the_output_port_from_iter.addr
83 )
03ec9ebd 84 self.assertEqual(the_output_port_from_iter.user_data, 'user data')
f6a5e476 85
692f1a01
PP
86 def test_create_from_message_iterator(self):
87 class MySourceIter(bt2._UserMessageIterator):
88 def __init__(self, self_port_output):
89 nonlocal src_iter_initialized
90 src_iter_initialized = True
91
92 class MySource(bt2._UserSourceComponent, message_iterator_class=MySourceIter):
b20382e2 93 def __init__(self, params, obj):
692f1a01
PP
94 self._add_output_port('out')
95
96 class MyFilterIter(bt2._UserMessageIterator):
97 def __init__(self, self_port_output):
98 nonlocal flt_iter_initialized
99 flt_iter_initialized = True
100 self._up_iter = self._create_input_port_message_iterator(
101 self._component._input_ports['in']
102 )
103
104 def __next__(self):
105 return next(self._up_iter)
106
107 class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter):
b20382e2 108 def __init__(self, params, obj):
692f1a01
PP
109 self._add_input_port('in')
110 self._add_output_port('out')
111
112 src_iter_initialized = False
113 flt_iter_initialized = False
114 graph = self._create_graph(MySource, MyFilter)
115 graph.run()
116 self.assertTrue(src_iter_initialized)
117 self.assertTrue(flt_iter_initialized)
118
f6a5e476 119 def test_finalize(self):
fa4c33e3 120 class MyIter(bt2._UserMessageIterator):
819d0ae7 121 def _user_finalize(self):
f6a5e476
PP
122 nonlocal finalized
123 finalized = True
124
61d96b89 125 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
b20382e2 126 def __init__(self, params, obj):
f6a5e476
PP
127 self._add_output_port('out')
128
129 finalized = False
130 graph = self._create_graph(MySource)
a4dcfa96 131 graph.run()
f6a5e476
PP
132 del graph
133 self.assertTrue(finalized)
134
135 def test_component(self):
fa4c33e3 136 class MyIter(bt2._UserMessageIterator):
a4dcfa96 137 def __init__(self, self_port_output):
f6a5e476
PP
138 nonlocal salut
139 salut = self._component._salut
140
61d96b89 141 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
b20382e2 142 def __init__(self, params, obj):
f6a5e476
PP
143 self._add_output_port('out')
144 self._salut = 23
145
146 salut = None
147 graph = self._create_graph(MySource)
a4dcfa96 148 graph.run()
f6a5e476
PP
149 self.assertEqual(salut, 23)
150
151 def test_addr(self):
fa4c33e3 152 class MyIter(bt2._UserMessageIterator):
a4dcfa96 153 def __init__(self, self_port_output):
f6a5e476
PP
154 nonlocal addr
155 addr = self.addr
156
61d96b89 157 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
b20382e2 158 def __init__(self, params, obj):
f6a5e476
PP
159 self._add_output_port('out')
160
161 addr = None
162 graph = self._create_graph(MySource)
a4dcfa96 163 graph.run()
f6a5e476
PP
164 self.assertIsNotNone(addr)
165 self.assertNotEqual(addr, 0)
166
4e853135
SM
167 # Test that messages returned by _UserMessageIterator.__next__ remain valid
168 # and can be re-used.
169 def test_reuse_message(self):
170 class MyIter(bt2._UserMessageIterator):
171 def __init__(self, port):
172 tc, sc, ec = port.user_data
173 trace = tc()
174 stream = trace.create_stream(sc)
175 packet = stream.create_packet()
176
177 # This message will be returned twice by __next__.
178 event_message = self._create_event_message(ec, packet)
179
180 self._msgs = [
181 self._create_stream_beginning_message(stream),
4e853135
SM
182 self._create_packet_beginning_message(packet),
183 event_message,
184 event_message,
185 ]
186
187 def __next__(self):
188 return self._msgs.pop(0)
189
190 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
b20382e2 191 def __init__(self, params, obj):
4e853135 192 tc = self._create_trace_class()
37a93d41 193 sc = tc.create_stream_class(supports_packets=True)
4e853135
SM
194 ec = sc.create_event_class()
195 self._add_output_port('out', (tc, sc, ec))
196
197 graph = bt2.Graph()
198 src = graph.add_component(MySource, 'src')
199 it = graph.create_output_port_message_iterator(src.output_ports['out'])
200
201 # Skip beginning messages.
b7cbc799 202 msg = next(it)
c946c9de 203 self.assertIsInstance(msg, bt2._StreamBeginningMessage)
b7cbc799 204 msg = next(it)
c946c9de 205 self.assertIsInstance(msg, bt2._PacketBeginningMessage)
4e853135
SM
206
207 msg_ev1 = next(it)
208 msg_ev2 = next(it)
209
c946c9de
PP
210 self.assertIsInstance(msg_ev1, bt2._EventMessage)
211 self.assertIsInstance(msg_ev2, bt2._EventMessage)
4e853135
SM
212 self.assertEqual(msg_ev1.addr, msg_ev2.addr)
213
39ddfa44
SM
214 @staticmethod
215 def _setup_seek_beginning_test():
216 # Use a source, a filter and an output port iterator. This allows us
217 # to test calling `seek_beginning` on both a _OutputPortMessageIterator
218 # and a _UserComponentInputPortMessageIterator, on top of checking that
219 # _UserMessageIterator._seek_beginning is properly called.
220
221 class MySourceIter(bt2._UserMessageIterator):
222 def __init__(self, port):
223 tc, sc, ec = port.user_data
224 trace = tc()
225 stream = trace.create_stream(sc)
226 packet = stream.create_packet()
227
228 self._msgs = [
229 self._create_stream_beginning_message(stream),
39ddfa44
SM
230 self._create_packet_beginning_message(packet),
231 self._create_event_message(ec, packet),
232 self._create_event_message(ec, packet),
233 self._create_packet_end_message(packet),
39ddfa44
SM
234 self._create_stream_end_message(stream),
235 ]
236 self._at = 0
237
819d0ae7 238 def _user_seek_beginning(self):
39ddfa44
SM
239 self._at = 0
240
241 def __next__(self):
242 if self._at < len(self._msgs):
243 msg = self._msgs[self._at]
244 self._at += 1
245 return msg
246 else:
247 raise StopIteration
248
61d96b89 249 class MySource(bt2._UserSourceComponent, message_iterator_class=MySourceIter):
b20382e2 250 def __init__(self, params, obj):
39ddfa44 251 tc = self._create_trace_class()
37a93d41 252 sc = tc.create_stream_class(supports_packets=True)
39ddfa44
SM
253 ec = sc.create_event_class()
254
255 self._add_output_port('out', (tc, sc, ec))
256
257 class MyFilterIter(bt2._UserMessageIterator):
258 def __init__(self, port):
259 input_port = port.user_data
692f1a01
PP
260 self._upstream_iter = self._create_input_port_message_iterator(
261 input_port
262 )
39ddfa44
SM
263
264 def __next__(self):
265 return next(self._upstream_iter)
266
819d0ae7 267 def _user_seek_beginning(self):
39ddfa44
SM
268 self._upstream_iter.seek_beginning()
269
270 @property
819d0ae7 271 def _user_can_seek_beginning(self):
39ddfa44
SM
272 return self._upstream_iter.can_seek_beginning
273
274 class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter):
b20382e2 275 def __init__(self, params, obj):
39ddfa44
SM
276 input_port = self._add_input_port('in')
277 self._add_output_port('out', input_port)
278
39ddfa44
SM
279 graph = bt2.Graph()
280 src = graph.add_component(MySource, 'src')
281 flt = graph.add_component(MyFilter, 'flt')
282 graph.connect_ports(src.output_ports['out'], flt.input_ports['in'])
283 it = graph.create_output_port_message_iterator(flt.output_ports['out'])
284
285 return it, MySourceIter
286
287 def test_can_seek_beginning(self):
288 it, MySourceIter = self._setup_seek_beginning_test()
289
819d0ae7 290 def _user_can_seek_beginning(self):
39ddfa44
SM
291 nonlocal can_seek_beginning
292 return can_seek_beginning
293
819d0ae7 294 MySourceIter._user_can_seek_beginning = property(_user_can_seek_beginning)
39ddfa44
SM
295
296 can_seek_beginning = True
297 self.assertTrue(it.can_seek_beginning)
298
299 can_seek_beginning = False
300 self.assertFalse(it.can_seek_beginning)
301
302 # Once can_seek_beginning returns an error, verify that it raises when
303 # _can_seek_beginning has/returns the wrong type.
304
305 # Remove the _can_seek_beginning method, we now rely on the presence of
306 # a _seek_beginning method to know whether the iterator can seek to
307 # beginning or not.
819d0ae7 308 del MySourceIter._user_can_seek_beginning
39ddfa44
SM
309 self.assertTrue(it.can_seek_beginning)
310
819d0ae7 311 del MySourceIter._user_seek_beginning
39ddfa44
SM
312 self.assertFalse(it.can_seek_beginning)
313
314 def test_seek_beginning(self):
315 it, MySourceIter = self._setup_seek_beginning_test()
316
317 msg = next(it)
c946c9de 318 self.assertIsInstance(msg, bt2._StreamBeginningMessage)
39ddfa44 319 msg = next(it)
c946c9de 320 self.assertIsInstance(msg, bt2._PacketBeginningMessage)
39ddfa44
SM
321
322 it.seek_beginning()
323
324 msg = next(it)
c946c9de 325 self.assertIsInstance(msg, bt2._StreamBeginningMessage)
39ddfa44
SM
326
327 # Verify that we can seek beginning after having reached the end.
328 #
329 # It currently does not work to seek an output port message iterator
330 # once it's ended, but we should eventually make it work and uncomment
331 # the following snippet.
332 #
333 # try:
334 # while True:
335 # next(it)
336 # except bt2.Stop:
337 # pass
338 #
339 # it.seek_beginning()
340 # msg = next(it)
c946c9de 341 # self.assertIsInstance(msg, bt2._StreamBeginningMessage)
39ddfa44
SM
342
343 def test_seek_beginning_user_error(self):
344 it, MySourceIter = self._setup_seek_beginning_test()
345
819d0ae7 346 def _user_seek_beginning_error(self):
61d96b89 347 raise ValueError('ouch')
39ddfa44 348
819d0ae7 349 MySourceIter._user_seek_beginning = _user_seek_beginning_error
39ddfa44 350
614743a5 351 with self.assertRaises(bt2._Error):
39ddfa44
SM
352 it.seek_beginning()
353
752f4edf
SM
354 # Try consuming many times from an iterator that always returns TryAgain.
355 # This verifies that we are not missing an incref of Py_None, making the
356 # refcount of Py_None reach 0.
357 def test_try_again_many_times(self):
358 class MyIter(bt2._UserMessageIterator):
359 def __next__(self):
360 raise bt2.TryAgain
361
362 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
b20382e2 363 def __init__(self, params, obj):
752f4edf
SM
364 self._add_output_port('out')
365
366 graph = bt2.Graph()
367 src = graph.add_component(MySource, 'src')
368 it = graph.create_output_port_message_iterator(src.output_ports['out'])
369
370 # The initial refcount of Py_None was in the 7000, so 100000 iterations
371 # should be enough to catch the bug even if there are small differences
372 # between configurations.
373 for i in range(100000):
374 with self.assertRaises(bt2.TryAgain):
375 next(it)
376
39ddfa44 377
fa4c33e3 378class OutputPortMessageIteratorTestCase(unittest.TestCase):
0d0edb5e 379 def test_component(self):
fa4c33e3 380 class MyIter(bt2._UserMessageIterator):
a4dcfa96 381 def __init__(self, self_port_output):
0d0edb5e
PP
382 self._at = 0
383
0d0edb5e 384 def __next__(self):
a4dcfa96 385 if self._at == 7:
0d0edb5e
PP
386 raise bt2.Stop
387
a4dcfa96
SM
388 if self._at == 0:
389 msg = self._create_stream_beginning_message(test_obj._stream)
390 elif self._at == 1:
391 msg = self._create_packet_beginning_message(test_obj._packet)
392 elif self._at == 5:
393 msg = self._create_packet_end_message(test_obj._packet)
394 elif self._at == 6:
395 msg = self._create_stream_end_message(test_obj._stream)
396 else:
61d96b89
FD
397 msg = self._create_event_message(
398 test_obj._event_class, test_obj._packet
399 )
a4dcfa96
SM
400 msg.event.payload_field['my_int'] = self._at * 3
401
0d0edb5e 402 self._at += 1
fa4c33e3 403 return msg
0d0edb5e 404
61d96b89 405 class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
b20382e2 406 def __init__(self, params, obj):
0d0edb5e
PP
407 self._add_output_port('out')
408
a4dcfa96 409 trace_class = self._create_trace_class()
37a93d41 410 stream_class = trace_class.create_stream_class(supports_packets=True)
a4dcfa96
SM
411
412 # Create payload field class
413 my_int_ft = trace_class.create_signed_integer_field_class(32)
414 payload_ft = trace_class.create_structure_field_class()
61d96b89 415 payload_ft += [('my_int', my_int_ft)]
a4dcfa96 416
61d96b89
FD
417 event_class = stream_class.create_event_class(
418 name='salut', payload_field_class=payload_ft
419 )
a4dcfa96
SM
420
421 trace = trace_class()
422 stream = trace.create_stream(stream_class)
423 packet = stream.create_packet()
424
425 test_obj._event_class = event_class
426 test_obj._stream = stream
427 test_obj._packet = packet
428
429 test_obj = self
0d0edb5e
PP
430 graph = bt2.Graph()
431 src = graph.add_component(MySource, 'src')
a4dcfa96 432 msg_iter = graph.create_output_port_message_iterator(src.output_ports['out'])
0d0edb5e 433
fa4c33e3 434 for at, msg in enumerate(msg_iter):
a4dcfa96 435 if at == 0:
c946c9de 436 self.assertIsInstance(msg, bt2._StreamBeginningMessage)
a4dcfa96 437 elif at == 1:
c946c9de 438 self.assertIsInstance(msg, bt2._PacketBeginningMessage)
a4dcfa96 439 elif at == 5:
c946c9de 440 self.assertIsInstance(msg, bt2._PacketEndMessage)
a4dcfa96 441 elif at == 6:
c946c9de 442 self.assertIsInstance(msg, bt2._StreamEndMessage)
a4dcfa96 443 else:
c946c9de 444 self.assertIsInstance(msg, bt2._EventMessage)
c88be1c8 445 self.assertEqual(msg.event.cls.name, 'salut')
a4dcfa96
SM
446 field = msg.event.payload_field['my_int']
447 self.assertEqual(field, at * 3)
39ddfa44 448
61d96b89 449
39ddfa44
SM
450if __name__ == '__main__':
451 unittest.main()
This page took 0.059603 seconds and 4 git commands to generate.