lib: remove stream activity messages
[babeltrace.git] / tests / bindings / python / bt2 / test_message.py
#
# Copyright (C) 2019 EfficiOS Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; only version 2
# of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
18
19 import collections
20 import unittest
21 import bt2
22
23
class AllMessagesTestCase(unittest.TestCase):
    """Check the messages created by a Python user message iterator.

    ``setUp()`` defines a source component class whose message iterator
    emits a fixed sequence of messages.  Each test adds that source to a
    fresh graph with its own parameters, consumes the messages through
    an output port message iterator, and checks the type and properties
    of every message as well as the total number of messages.
    """

    def setUp(self):
        """Define the source/iterator classes and create an empty graph.

        The component itself is not added to the graph here: each test
        does it with the parameters it needs.
        """
        # The nested classes below reach the test case through this
        # closure variable: the source publishes the objects it creates
        # on it, and the iterator reads them back when creating messages.
        test_obj = self

        class MyIter(bt2._UserMessageIterator):
            def __init__(self, self_port_output):
                # Index of the next message to create.
                self._at = 0

                # The port's user data is the `params` object which
                # MySrc passes to _add_output_port().
                self._with_stream_msgs_clock_snapshots = self_port_output.user_data.get(
                    'with_stream_msgs_clock_snapshots', False
                )

            def __next__(self):
                if test_obj._clock_class:
                    # With a clock class: eight messages, each clock
                    # snapshot value being the index of its message.
                    if self._at == 0:
                        if self._with_stream_msgs_clock_snapshots:
                            msg = self._create_stream_beginning_message(
                                test_obj._stream, default_clock_snapshot=self._at
                            )
                        else:
                            msg = self._create_stream_beginning_message(test_obj._stream)
                    elif self._at == 1:
                        msg = self._create_packet_beginning_message(test_obj._packet, self._at)
                    elif self._at == 2:
                        msg = self._create_event_message(
                            test_obj._event_class, test_obj._packet, self._at
                        )
                    elif self._at == 3:
                        msg = self._create_message_iterator_inactivity_message(
                            test_obj._clock_class, self._at
                        )
                    elif self._at == 4:
                        msg = self._create_discarded_events_message(
                            test_obj._stream, 890, self._at, self._at
                        )
                    elif self._at == 5:
                        msg = self._create_packet_end_message(test_obj._packet, self._at)
                    elif self._at == 6:
                        msg = self._create_discarded_packets_message(
                            test_obj._stream, 678, self._at, self._at
                        )
                    elif self._at == 7:
                        if self._with_stream_msgs_clock_snapshots:
                            msg = self._create_stream_end_message(
                                test_obj._stream, default_clock_snapshot=self._at
                            )
                        else:
                            msg = self._create_stream_end_message(test_obj._stream)
                    elif self._at >= 8:
                        raise bt2.Stop
                else:
                    # Without a clock class: seven messages (there is no
                    # message iterator inactivity message) and no clock
                    # snapshot is passed to any creation method.
                    if self._at == 0:
                        msg = self._create_stream_beginning_message(test_obj._stream)
                    elif self._at == 1:
                        msg = self._create_packet_beginning_message(test_obj._packet)
                    elif self._at == 2:
                        msg = self._create_event_message(
                            test_obj._event_class, test_obj._packet
                        )
                    elif self._at == 3:
                        msg = self._create_discarded_events_message(test_obj._stream, 890)
                    elif self._at == 4:
                        msg = self._create_packet_end_message(test_obj._packet)
                    elif self._at == 5:
                        msg = self._create_discarded_packets_message(test_obj._stream, 678)
                    elif self._at == 6:
                        msg = self._create_stream_end_message(test_obj._stream)
                    elif self._at >= 7:
                        raise bt2.Stop

                self._at += 1
                return msg

        class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
            def __init__(self, params):
                # `params` doubles as the port's user data so that the
                # message iterator can read the test's options.
                self._add_output_port('out', params)

                with_cc = bool(params['with_cc'])
                tc = self._create_trace_class()
                if with_cc:
                    cc = self._create_clock_class()
                else:
                    cc = None

                sc = tc.create_stream_class(
                    default_clock_class=cc,
                    packets_have_beginning_default_clock_snapshot=with_cc,
                    packets_have_end_default_clock_snapshot=with_cc,
                    supports_discarded_events=True,
                    discarded_events_have_default_clock_snapshots=with_cc,
                    supports_discarded_packets=True,
                    discarded_packets_have_default_clock_snapshots=with_cc,
                )

                # Create payload field class
                my_int_fc = tc.create_signed_integer_field_class(32)
                payload_fc = tc.create_structure_field_class()
                payload_fc += collections.OrderedDict([
                    ('my_int', my_int_fc),
                ])

                ec = sc.create_event_class(name='salut', payload_field_class=payload_fc)

                trace = tc()
                stream = trace.create_stream(sc)
                packet = stream.create_packet()

                # Publish the created objects on the test case: the
                # iterator needs them to create messages and the tests
                # compare against them.
                test_obj._trace = trace
                test_obj._stream = stream
                test_obj._packet = packet
                test_obj._event_class = ec
                test_obj._clock_class = cc

        self._graph = bt2.Graph()
        self._src = MySrc
        self._iter = MyIter

    def _create_msg_iter(self, params):
        """Add the source to the graph and iterate on its 'out' port.

        The created component and output port message iterator are also
        stored as ``self._src_comp`` and ``self._msg_iter``.

        :param params: Parameters passed to the source component.
        :returns: The output port message iterator.
        """
        self._src_comp = self._graph.add_component(self._src, 'my_source', params)
        self._msg_iter = self._graph.create_output_port_message_iterator(
            self._src_comp.output_ports['out']
        )
        return self._msg_iter

    def test_all_msg_with_cc(self):
        """Check all eight messages when the stream class has a clock class."""
        msg_iter = self._create_msg_iter({'with_cc': True})
        msg_count = 0

        for i, msg in enumerate(msg_iter):
            msg_count = i + 1

            if i == 0:
                self.assertIsInstance(msg, bt2.message._StreamBeginningMessage)
                self.assertEqual(msg.stream.addr, self._stream.addr)
                self.assertIsInstance(
                    msg.default_clock_snapshot, bt2.clock_snapshot._UnknownClockSnapshot
                )
            elif i == 1:
                self.assertIsInstance(msg, bt2.message._PacketBeginningMessage)
                self.assertEqual(msg.packet.addr, self._packet.addr)
                self.assertEqual(msg.default_clock_snapshot.value, i)
            elif i == 2:
                self.assertIsInstance(msg, bt2.message._EventMessage)
                self.assertEqual(msg.event.cls.addr, self._event_class.addr)
                self.assertEqual(msg.default_clock_snapshot.value, i)
            elif i == 3:
                self.assertIsInstance(msg, bt2.message._MessageIteratorInactivityMessage)
                self.assertEqual(msg.default_clock_snapshot.value, i)
            elif i == 4:
                self.assertIsInstance(msg, bt2.message._DiscardedEventsMessage)
                self.assertEqual(msg.stream.addr, self._stream.addr)
                self.assertEqual(msg.count, 890)
                self.assertEqual(
                    msg.stream.cls.default_clock_class.addr, self._clock_class.addr
                )
                self.assertEqual(msg.beginning_default_clock_snapshot.value, i)
                self.assertEqual(msg.end_default_clock_snapshot.value, i)
            elif i == 5:
                self.assertIsInstance(msg, bt2.message._PacketEndMessage)
                self.assertEqual(msg.packet.addr, self._packet.addr)
                self.assertEqual(msg.default_clock_snapshot.value, i)
            elif i == 6:
                self.assertIsInstance(msg, bt2.message._DiscardedPacketsMessage)
                self.assertEqual(msg.stream.addr, self._stream.addr)
                self.assertEqual(msg.count, 678)
                self.assertEqual(
                    msg.stream.cls.default_clock_class.addr, self._clock_class.addr
                )
                self.assertEqual(msg.beginning_default_clock_snapshot.value, i)
                self.assertEqual(msg.end_default_clock_snapshot.value, i)
            elif i == 7:
                self.assertIsInstance(msg, bt2.message._StreamEndMessage)
                self.assertEqual(msg.stream.addr, self._stream.addr)
                self.assertIsInstance(
                    msg.default_clock_snapshot, bt2.clock_snapshot._UnknownClockSnapshot
                )
            else:
                self.fail(
                    'unexpected extra message at index {}: {}'.format(i, type(msg).__name__)
                )

        # Without this, an iterator ending early (or yielding nothing)
        # would make the test pass without checking anything.
        self.assertEqual(msg_count, 8)

    def test_all_msg_without_cc(self):
        """Check all seven messages when the stream class has no clock class."""
        msg_iter = self._create_msg_iter({'with_cc': False})
        msg_count = 0

        for i, msg in enumerate(msg_iter):
            msg_count = i + 1

            if i == 0:
                self.assertIsInstance(msg, bt2.message._StreamBeginningMessage)
                self.assertEqual(msg.stream.addr, self._stream.addr)
                with self.assertRaises(bt2.NonexistentClockSnapshot):
                    msg.default_clock_snapshot
            elif i == 1:
                self.assertIsInstance(msg, bt2.message._PacketBeginningMessage)
                self.assertEqual(msg.packet.addr, self._packet.addr)
            elif i == 2:
                self.assertIsInstance(msg, bt2.message._EventMessage)
                self.assertEqual(msg.event.cls.addr, self._event_class.addr)
                with self.assertRaises(bt2.NonexistentClockSnapshot):
                    msg.default_clock_snapshot
            elif i == 3:
                self.assertIsInstance(msg, bt2.message._DiscardedEventsMessage)
                self.assertEqual(msg.stream.addr, self._stream.addr)
                self.assertEqual(msg.count, 890)
                self.assertIsNone(msg.stream.cls.default_clock_class)
                with self.assertRaises(bt2.NonexistentClockSnapshot):
                    msg.beginning_default_clock_snapshot
                with self.assertRaises(bt2.NonexistentClockSnapshot):
                    msg.end_default_clock_snapshot
            elif i == 4:
                self.assertIsInstance(msg, bt2.message._PacketEndMessage)
                self.assertEqual(msg.packet.addr, self._packet.addr)
            elif i == 5:
                self.assertIsInstance(msg, bt2.message._DiscardedPacketsMessage)
                self.assertEqual(msg.stream.addr, self._stream.addr)
                self.assertEqual(msg.count, 678)
                self.assertIsNone(msg.stream.cls.default_clock_class)
                with self.assertRaises(bt2.NonexistentClockSnapshot):
                    msg.beginning_default_clock_snapshot
                with self.assertRaises(bt2.NonexistentClockSnapshot):
                    msg.end_default_clock_snapshot
            elif i == 6:
                self.assertIsInstance(msg, bt2.message._StreamEndMessage)
                self.assertEqual(msg.stream.addr, self._stream.addr)
                with self.assertRaises(bt2.NonexistentClockSnapshot):
                    msg.default_clock_snapshot
            else:
                self.fail(
                    'unexpected extra message at index {}: {}'.format(i, type(msg).__name__)
                )

        # Without this, an iterator ending early (or yielding nothing)
        # would make the test pass without checking anything.
        self.assertEqual(msg_count, 7)

    def test_msg_stream_with_clock_snapshots(self):
        """Check stream beginning/end messages created with clock snapshots."""
        params = {
            'with_cc': True,
            'with_stream_msgs_clock_snapshots': True,
        }

        msgs = list(self._create_msg_iter(params))

        # Report a short sequence as a clear failure rather than as an
        # IndexError on the lookups below.
        self.assertEqual(len(msgs), 8)

        msg_stream_beg = msgs[0]
        self.assertIsInstance(msg_stream_beg, bt2.message._StreamBeginningMessage)
        self.assertEqual(msg_stream_beg.default_clock_snapshot.value, 0)

        msg_stream_end = msgs[7]
        self.assertIsInstance(msg_stream_end, bt2.message._StreamEndMessage)
        self.assertEqual(msg_stream_end.default_clock_snapshot.value, 7)
This page took 0.036236 seconds and 5 git commands to generate.