lib: remove output port message iterator
[babeltrace.git] / tests / bindings / python / bt2 / test_message.py
1 #
2 # Copyright (C) 2019 EfficiOS Inc.
3 #
4 # This program is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU General Public License
6 # as published by the Free Software Foundation; only version 2
7 # of the License.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 #
18
19 import collections
20 import unittest
21 import bt2
22 from utils import TestOutputPortMessageIterator
23
24
class AllMessagesTestCase(unittest.TestCase):
    """Check the messages created by a user message iterator.

    ``setUp()`` defines a source component class whose message iterator
    creates one message of each kind in a fixed order.  The tests add
    that source to a graph, pull its messages through
    ``TestOutputPortMessageIterator`` and check the type and the
    properties of each message, with and without a default clock class.
    """

    def setUp(self):
        """Define the source and iterator classes and create the graph.

        The component class is stored in ``self._src``, the iterator
        class in ``self._iter`` and a new ``bt2.Graph`` in
        ``self._graph``.  When a test instantiates the component, its
        initializer stores the trace, stream, packet, event class and
        clock class it created on this test case (``self._trace``,
        ``self._stream``, ``self._packet``, ``self._event_class`` and
        ``self._clock_class``).
        """
        # The nested classes below reach the running test case through
        # this closure variable: the component publishes the objects it
        # creates on it, and the iterator reads them back.
        test_obj = self

        # NOTE: the nested classes are documented with comments rather
        # than docstrings so that nothing is added to their `__doc__`.
        #
        # Message iterator creating a fixed sequence of messages, then
        # raising `bt2.Stop`.
        class MyIter(bt2._UserMessageIterator):
            def __init__(self, self_port_output):
                # Index of the next message to create.
                self._at = 0

                # The user data of the port is the parameter map given
                # to MySrc.__init__() (see `_add_output_port()` there).
                self._with_stream_msgs_clock_snapshots = self_port_output.user_data.get(
                    'with_stream_msgs_clock_snapshots', False
                )

            def __next__(self):
                if test_obj._clock_class:
                    # With a default clock class: eight messages, each
                    # clock snapshot value being the message's index.
                    if self._at == 0:
                        if self._with_stream_msgs_clock_snapshots:
                            msg = self._create_stream_beginning_message(
                                test_obj._stream, default_clock_snapshot=self._at
                            )
                        else:
                            msg = self._create_stream_beginning_message(
                                test_obj._stream
                            )
                    elif self._at == 1:
                        msg = self._create_packet_beginning_message(
                            test_obj._packet, self._at
                        )
                    elif self._at == 2:
                        msg = self._create_event_message(
                            test_obj._event_class, test_obj._packet, self._at
                        )
                    elif self._at == 3:
                        msg = self._create_message_iterator_inactivity_message(
                            test_obj._clock_class, self._at
                        )
                    elif self._at == 4:
                        msg = self._create_discarded_events_message(
                            test_obj._stream, 890, self._at, self._at
                        )
                    elif self._at == 5:
                        msg = self._create_packet_end_message(
                            test_obj._packet, self._at
                        )
                    elif self._at == 6:
                        msg = self._create_discarded_packets_message(
                            test_obj._stream, 678, self._at, self._at
                        )
                    elif self._at == 7:
                        if self._with_stream_msgs_clock_snapshots:
                            msg = self._create_stream_end_message(
                                test_obj._stream, default_clock_snapshot=self._at
                            )
                        else:
                            msg = self._create_stream_end_message(test_obj._stream)
                    elif self._at >= 8:
                        raise bt2.Stop
                else:
                    # Without a default clock class: seven messages (no
                    # message iterator inactivity message) and no clock
                    # snapshots.
                    if self._at == 0:
                        msg = self._create_stream_beginning_message(test_obj._stream)
                    elif self._at == 1:
                        msg = self._create_packet_beginning_message(test_obj._packet)
                    elif self._at == 2:
                        msg = self._create_event_message(
                            test_obj._event_class, test_obj._packet
                        )
                    elif self._at == 3:
                        msg = self._create_discarded_events_message(
                            test_obj._stream, 890
                        )
                    elif self._at == 4:
                        msg = self._create_packet_end_message(test_obj._packet)
                    elif self._at == 5:
                        msg = self._create_discarded_packets_message(
                            test_obj._stream, 678
                        )
                    elif self._at == 6:
                        msg = self._create_stream_end_message(test_obj._stream)
                    elif self._at >= 7:
                        raise bt2.Stop

                self._at += 1
                return msg

        # Source component with a single output port named `out`.
        class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, params, obj):
                # Pass the parameters as the port's user data so that
                # MyIter.__init__() can read them.
                self._add_output_port('out', params)

                with_cc = bool(params['with_cc'])
                tc = self._create_trace_class()
                if with_cc:
                    cc = self._create_clock_class()
                else:
                    cc = None

                # Every "has default clock snapshot" property follows
                # `with_cc`.
                sc = tc.create_stream_class(
                    default_clock_class=cc,
                    supports_packets=True,
                    packets_have_beginning_default_clock_snapshot=with_cc,
                    packets_have_end_default_clock_snapshot=with_cc,
                    supports_discarded_events=True,
                    discarded_events_have_default_clock_snapshots=with_cc,
                    supports_discarded_packets=True,
                    discarded_packets_have_default_clock_snapshots=with_cc,
                )

                # Create payload field class
                my_int_fc = tc.create_signed_integer_field_class(32)
                payload_fc = tc.create_structure_field_class()
                payload_fc += [('my_int', my_int_fc)]

                ec = sc.create_event_class(name='salut', payload_field_class=payload_fc)

                trace = tc()
                stream = trace.create_stream(sc)
                packet = stream.create_packet()

                # Publish the created objects on the test case.
                test_obj._trace = trace
                test_obj._stream = stream
                test_obj._packet = packet
                test_obj._event_class = ec
                test_obj._clock_class = cc

        self._graph = bt2.Graph()
        self._src = MySrc
        self._iter = MyIter

    def _create_msg_iter(self, params):
        """Add the source to the graph and return an iterator on it.

        Adds a ``self._src`` component named ``my_source``, initialized
        with ``params``, to ``self._graph`` and wraps its ``out`` port in
        a ``TestOutputPortMessageIterator``.  The component and the
        iterator are also stored in ``self._src_comp`` and
        ``self._msg_iter``.
        """
        self._src_comp = self._graph.add_component(self._src, 'my_source', params)
        self._msg_iter = TestOutputPortMessageIterator(
            self._graph, self._src_comp.output_ports['out']
        )
        return self._msg_iter

    def test_all_msg_with_cc(self):
        """Check the eight messages created with a default clock class."""
        msg_iter = self._create_msg_iter({'with_cc': True})
        msg_count = 0

        for i, msg in enumerate(msg_iter):
            msg_count += 1

            if i == 0:
                self.assertIsInstance(msg, bt2._StreamBeginningMessage)
                self.assertEqual(msg.stream.addr, self._stream.addr)
                self.assertIsInstance(
                    msg.default_clock_snapshot, bt2._UnknownClockSnapshot
                )
            elif i == 1:
                self.assertIsInstance(msg, bt2._PacketBeginningMessage)
                self.assertEqual(msg.packet.addr, self._packet.addr)
                self.assertEqual(msg.default_clock_snapshot.value, i)
            elif i == 2:
                self.assertIsInstance(msg, bt2._EventMessage)
                self.assertEqual(msg.event.cls.addr, self._event_class.addr)
                self.assertEqual(msg.default_clock_snapshot.value, i)
            elif i == 3:
                self.assertIsInstance(msg, bt2._MessageIteratorInactivityMessage)
                self.assertEqual(msg.default_clock_snapshot.value, i)
            elif i == 4:
                self.assertIsInstance(msg, bt2._DiscardedEventsMessage)
                self.assertEqual(msg.stream.addr, self._stream.addr)
                self.assertEqual(msg.count, 890)
                self.assertEqual(
                    msg.stream.cls.default_clock_class.addr, self._clock_class.addr
                )
                self.assertEqual(msg.beginning_default_clock_snapshot.value, i)
                self.assertEqual(msg.end_default_clock_snapshot.value, i)
            elif i == 5:
                self.assertIsInstance(msg, bt2._PacketEndMessage)
                self.assertEqual(msg.packet.addr, self._packet.addr)
                self.assertEqual(msg.default_clock_snapshot.value, i)
            elif i == 6:
                self.assertIsInstance(msg, bt2._DiscardedPacketsMessage)
                self.assertEqual(msg.stream.addr, self._stream.addr)
                self.assertEqual(msg.count, 678)
                self.assertEqual(
                    msg.stream.cls.default_clock_class.addr, self._clock_class.addr
                )
                self.assertEqual(msg.beginning_default_clock_snapshot.value, i)
                self.assertEqual(msg.end_default_clock_snapshot.value, i)
            elif i == 7:
                self.assertIsInstance(msg, bt2._StreamEndMessage)
                self.assertEqual(msg.stream.addr, self._stream.addr)
                self.assertIsInstance(
                    msg.default_clock_snapshot, bt2._UnknownClockSnapshot
                )
            else:
                self.fail('unexpected message at index {}: {!r}'.format(i, msg))

        # Without this check, a source which stops early (or creates no
        # message at all) would make the test pass silently.
        self.assertEqual(msg_count, 8)

    def test_all_msg_without_cc(self):
        """Check the seven messages created without a default clock class."""
        msg_iter = self._create_msg_iter({'with_cc': False})
        msg_count = 0

        for i, msg in enumerate(msg_iter):
            msg_count += 1

            if i == 0:
                self.assertIsInstance(msg, bt2._StreamBeginningMessage)
                self.assertEqual(msg.stream.addr, self._stream.addr)
                with self.assertRaisesRegex(
                    ValueError, 'stream class has no default clock class'
                ):
                    msg.default_clock_snapshot
            elif i == 1:
                self.assertIsInstance(msg, bt2._PacketBeginningMessage)
                self.assertEqual(msg.packet.addr, self._packet.addr)
            elif i == 2:
                self.assertIsInstance(msg, bt2._EventMessage)
                self.assertEqual(msg.event.cls.addr, self._event_class.addr)
                with self.assertRaisesRegex(
                    ValueError, 'stream class has no default clock class'
                ):
                    msg.default_clock_snapshot
            elif i == 3:
                self.assertIsInstance(msg, bt2._DiscardedEventsMessage)
                self.assertEqual(msg.stream.addr, self._stream.addr)
                self.assertEqual(msg.count, 890)
                self.assertIsNone(msg.stream.cls.default_clock_class)
                with self.assertRaisesRegex(
                    ValueError,
                    'such a message has no clock snapshots for this stream class',
                ):
                    msg.beginning_default_clock_snapshot
                with self.assertRaisesRegex(
                    ValueError,
                    'such a message has no clock snapshots for this stream class',
                ):
                    msg.end_default_clock_snapshot
            elif i == 4:
                self.assertIsInstance(msg, bt2._PacketEndMessage)
                self.assertEqual(msg.packet.addr, self._packet.addr)
            elif i == 5:
                self.assertIsInstance(msg, bt2._DiscardedPacketsMessage)
                self.assertEqual(msg.stream.addr, self._stream.addr)
                self.assertEqual(msg.count, 678)
                self.assertIsNone(msg.stream.cls.default_clock_class)
                with self.assertRaisesRegex(
                    ValueError,
                    'such a message has no clock snapshots for this stream class',
                ):
                    msg.beginning_default_clock_snapshot
                with self.assertRaisesRegex(
                    ValueError,
                    'such a message has no clock snapshots for this stream class',
                ):
                    msg.end_default_clock_snapshot
            elif i == 6:
                self.assertIsInstance(msg, bt2._StreamEndMessage)
                self.assertEqual(msg.stream.addr, self._stream.addr)
                with self.assertRaisesRegex(
                    ValueError, 'stream class has no default clock class'
                ):
                    msg.default_clock_snapshot
            else:
                self.fail('unexpected message at index {}: {!r}'.format(i, msg))

        # Without this check, a source which stops early (or creates no
        # message at all) would make the test pass silently.
        self.assertEqual(msg_count, 7)

    def test_msg_stream_with_clock_snapshots(self):
        """Check stream messages created with explicit clock snapshots."""
        params = {'with_cc': True, 'with_stream_msgs_clock_snapshots': True}
        msgs = list(self._create_msg_iter(params))

        # Report a short or long stream as a clear assertion failure
        # instead of an IndexError on `msgs[7]` below.
        self.assertEqual(len(msgs), 8)

        msg_stream_beg = msgs[0]
        self.assertIsInstance(msg_stream_beg, bt2._StreamBeginningMessage)
        self.assertEqual(msg_stream_beg.default_clock_snapshot.value, 0)

        msg_stream_end = msgs[7]
        self.assertIsInstance(msg_stream_end, bt2._StreamEndMessage)
        self.assertEqual(msg_stream_end.default_clock_snapshot.value, 7)
This page took 0.035538 seconds and 4 git commands to generate.