2 # Copyright (C) 2019 EfficiOS Inc.
4 # This program is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU General Public License
6 # as published by the Free Software Foundation; only version 2
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
22 from utils
import TestOutputPortMessageIterator
23 from bt2
import clock_snapshot
as bt2_clock_snapshot
24 from bt2
import event
as bt2_event
25 from bt2
import event_class
as bt2_event_class
26 from bt2
import field
as bt2_field
27 from bt2
import packet
as bt2_packet
28 from bt2
import stream
as bt2_stream
29 from bt2
import stream_class
as bt2_stream_class
30 from bt2
import trace
as bt2_trace
31 from bt2
import trace_class
as bt2_trace_class
34 class AllMessagesTestCase(unittest
.TestCase
):
36 class MyIter(bt2
._UserMessageIterator
):
37 def __init__(self
, config
, self_port_output
):
39 self
._with
_stream
_msgs
_clock
_snapshots
= self_port_output
.user_data
.get(
40 'with_stream_msgs_clock_snapshots', False
44 if test_obj
._clock
_class
:
46 if self
._with
_stream
_msgs
_clock
_snapshots
:
47 msg
= self
._create
_stream
_beginning
_message
(
48 test_obj
._stream
, default_clock_snapshot
=self
._at
51 msg
= self
._create
_stream
_beginning
_message
(
54 test_obj
.assertIs(type(msg
), bt2
._StreamBeginningMessage
)
56 msg
= self
._create
_packet
_beginning
_message
(
57 test_obj
._packet
, self
._at
59 test_obj
.assertIs(type(msg
), bt2
._PacketBeginningMessage
)
61 msg
= self
._create
_event
_message
(
62 test_obj
._event
_class
, test_obj
._packet
, self
._at
64 test_obj
.assertIs(type(msg
), bt2
._EventMessage
)
66 msg
= self
._create
_message
_iterator
_inactivity
_message
(
67 test_obj
._clock
_class
, self
._at
70 msg
= self
._create
_discarded
_events
_message
(
71 test_obj
._stream
, 890, self
._at
, self
._at
73 test_obj
.assertIs(type(msg
), bt2
._DiscardedEventsMessage
)
75 msg
= self
._create
_packet
_end
_message
(
76 test_obj
._packet
, self
._at
78 test_obj
.assertIs(type(msg
), bt2
._PacketEndMessage
)
80 msg
= self
._create
_discarded
_packets
_message
(
81 test_obj
._stream
, 678, self
._at
, self
._at
83 test_obj
.assertIs(type(msg
), bt2
._DiscardedPacketsMessage
)
85 if self
._with
_stream
_msgs
_clock
_snapshots
:
86 msg
= self
._create
_stream
_end
_message
(
87 test_obj
._stream
, default_clock_snapshot
=self
._at
90 msg
= self
._create
_stream
_end
_message
(test_obj
._stream
)
91 test_obj
.assertIs(type(msg
), bt2
._StreamEndMessage
)
96 msg
= self
._create
_stream
_beginning
_message
(test_obj
._stream
)
98 msg
= self
._create
_packet
_beginning
_message
(test_obj
._packet
)
100 msg
= self
._create
_event
_message
(
101 test_obj
._event
_class
, test_obj
._packet
104 msg
= self
._create
_discarded
_events
_message
(
105 test_obj
._stream
, 890
108 msg
= self
._create
_packet
_end
_message
(test_obj
._packet
)
110 msg
= self
._create
_discarded
_packets
_message
(
111 test_obj
._stream
, 678
114 msg
= self
._create
_stream
_end
_message
(test_obj
._stream
)
121 class MySrc(bt2
._UserSourceComponent
, message_iterator_class
=MyIter
):
122 def __init__(self
, config
, params
, obj
):
123 self
._add
_output
_port
('out', params
)
125 with_cc
= bool(params
['with_cc'])
126 tc
= self
._create
_trace
_class
()
128 cc
= self
._create
_clock
_class
()
132 sc
= tc
.create_stream_class(
133 default_clock_class
=cc
,
134 supports_packets
=True,
135 packets_have_beginning_default_clock_snapshot
=with_cc
,
136 packets_have_end_default_clock_snapshot
=with_cc
,
137 supports_discarded_events
=True,
138 discarded_events_have_default_clock_snapshots
=with_cc
,
139 supports_discarded_packets
=True,
140 discarded_packets_have_default_clock_snapshots
=with_cc
,
143 # Create payload field class
144 my_int_fc
= tc
.create_signed_integer_field_class(32)
145 payload_fc
= tc
.create_structure_field_class()
146 payload_fc
+= [('my_int', my_int_fc
)]
148 # Create specific context field class
149 my_int_fc
= tc
.create_signed_integer_field_class(32)
150 specific_fc
= tc
.create_structure_field_class()
151 specific_fc
+= [('my_int', my_int_fc
)]
153 ec
= sc
.create_event_class(
155 payload_field_class
=payload_fc
,
156 specific_context_field_class
=specific_fc
,
160 stream
= trace
.create_stream(sc
)
161 packet
= stream
.create_packet()
163 test_obj
._trace
= trace
164 test_obj
._stream
= stream
165 test_obj
._packet
= packet
166 test_obj
._event
_class
= ec
167 test_obj
._clock
_class
= cc
170 self
._graph
= bt2
.Graph()
174 def test_all_msg_with_cc(self
):
175 params
= {'with_cc': True}
176 self
._src
_comp
= self
._graph
.add_component(self
._src
, 'my_source', params
)
177 self
._msg
_iter
= TestOutputPortMessageIterator(
178 self
._graph
, self
._src
_comp
.output_ports
['out']
181 for i
, msg
in enumerate(self
._msg
_iter
):
183 self
.assertIs(type(msg
), bt2
._StreamBeginningMessageConst
)
184 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
185 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
186 self
.assertIsInstance(
187 msg
.default_clock_snapshot
, bt2
._UnknownClockSnapshot
190 self
.assertIs(type(msg
), bt2
._PacketBeginningMessageConst
)
191 self
.assertIs(type(msg
.packet
), bt2_packet
._PacketConst
)
193 type(msg
.default_clock_snapshot
),
194 bt2_clock_snapshot
._ClockSnapshotConst
,
196 self
.assertEqual(msg
.packet
.addr
, self
._packet
.addr
)
197 self
.assertEqual(msg
.default_clock_snapshot
.value
, i
)
199 self
.assertIs(type(msg
), bt2
._EventMessageConst
)
200 self
.assertIs(type(msg
.event
), bt2_event
._EventConst
)
202 type(msg
.default_clock_snapshot
),
203 bt2_clock_snapshot
._ClockSnapshotConst
,
206 type(msg
.event
.payload_field
), bt2_field
._StructureFieldConst
209 type(msg
.event
.payload_field
['my_int']),
210 bt2_field
._SignedIntegerFieldConst
,
213 self
.assertEqual(msg
.event
.cls
.addr
, self
._event
_class
.addr
)
214 self
.assertEqual(msg
.default_clock_snapshot
.value
, i
)
216 self
.assertIs(type(msg
), bt2
._MessageIteratorInactivityMessageConst
)
218 type(msg
.clock_snapshot
), bt2_clock_snapshot
._ClockSnapshotConst
220 self
.assertEqual(msg
.clock_snapshot
.value
, i
)
222 self
.assertIs(type(msg
), bt2
._DiscardedEventsMessageConst
)
223 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
224 self
.assertIs(type(msg
.stream
.cls
), bt2_stream_class
._StreamClassConst
)
226 type(msg
.beginning_default_clock_snapshot
),
227 bt2_clock_snapshot
._ClockSnapshotConst
,
230 type(msg
.end_default_clock_snapshot
),
231 bt2_clock_snapshot
._ClockSnapshotConst
,
234 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
235 self
.assertEqual(msg
.count
, 890)
237 msg
.stream
.cls
.default_clock_class
.addr
, self
._clock
_class
.addr
239 self
.assertEqual(msg
.beginning_default_clock_snapshot
.value
, i
)
240 self
.assertEqual(msg
.end_default_clock_snapshot
.value
, i
)
242 self
.assertIs(type(msg
), bt2
._PacketEndMessageConst
)
243 self
.assertIs(type(msg
.packet
), bt2_packet
._PacketConst
)
245 type(msg
.default_clock_snapshot
),
246 bt2_clock_snapshot
._ClockSnapshotConst
,
248 self
.assertEqual(msg
.packet
.addr
, self
._packet
.addr
)
249 self
.assertEqual(msg
.default_clock_snapshot
.value
, i
)
251 self
.assertIs(type(msg
), bt2
._DiscardedPacketsMessageConst
)
252 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
253 self
.assertIs(type(msg
.stream
.trace
), bt2_trace
._TraceConst
)
255 type(msg
.stream
.trace
.cls
), bt2_trace_class
._TraceClassConst
258 type(msg
.beginning_default_clock_snapshot
),
259 bt2_clock_snapshot
._ClockSnapshotConst
,
262 type(msg
.end_default_clock_snapshot
),
263 bt2_clock_snapshot
._ClockSnapshotConst
,
265 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
266 self
.assertEqual(msg
.count
, 678)
268 msg
.stream
.cls
.default_clock_class
.addr
, self
._clock
_class
.addr
270 self
.assertEqual(msg
.beginning_default_clock_snapshot
.value
, i
)
271 self
.assertEqual(msg
.end_default_clock_snapshot
.value
, i
)
273 self
.assertIs(type(msg
), bt2
._StreamEndMessageConst
)
274 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
275 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
277 type(msg
.default_clock_snapshot
), bt2
._UnknownClockSnapshot
282 def test_all_msg_without_cc(self
):
283 params
= {'with_cc': False}
284 self
._src
_comp
= self
._graph
.add_component(self
._src
, 'my_source', params
)
285 self
._msg
_iter
= TestOutputPortMessageIterator(
286 self
._graph
, self
._src
_comp
.output_ports
['out']
289 for i
, msg
in enumerate(self
._msg
_iter
):
291 self
.assertIsInstance(msg
, bt2
._StreamBeginningMessageConst
)
292 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
293 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
294 with self
.assertRaisesRegex(
295 ValueError, 'stream class has no default clock class'
297 msg
.default_clock_snapshot
299 self
.assertIsInstance(msg
, bt2
._PacketBeginningMessageConst
)
300 self
.assertIs(type(msg
.packet
), bt2_packet
._PacketConst
)
301 self
.assertEqual(msg
.packet
.addr
, self
._packet
.addr
)
303 self
.assertIsInstance(msg
, bt2
._EventMessageConst
)
304 self
.assertIs(type(msg
.event
), bt2_event
._EventConst
)
305 self
.assertIs(type(msg
.event
.cls
), bt2_event_class
._EventClassConst
)
306 self
.assertEqual(msg
.event
.cls
.addr
, self
._event
_class
.addr
)
307 with self
.assertRaisesRegex(
308 ValueError, 'stream class has no default clock class'
310 msg
.default_clock_snapshot
312 self
.assertIsInstance(msg
, bt2
._DiscardedEventsMessageConst
)
313 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
314 self
.assertIs(type(msg
.stream
.cls
), bt2_stream_class
._StreamClassConst
)
315 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
316 self
.assertEqual(msg
.count
, 890)
317 self
.assertIsNone(msg
.stream
.cls
.default_clock_class
)
318 with self
.assertRaisesRegex(
320 'such a message has no clock snapshots for this stream class',
322 msg
.beginning_default_clock_snapshot
323 with self
.assertRaisesRegex(
325 'such a message has no clock snapshots for this stream class',
327 msg
.end_default_clock_snapshot
329 self
.assertIsInstance(msg
, bt2
._PacketEndMessageConst
)
330 self
.assertEqual(msg
.packet
.addr
, self
._packet
.addr
)
331 self
.assertIs(type(msg
.packet
), bt2_packet
._PacketConst
)
333 self
.assertIsInstance(msg
, bt2
._DiscardedPacketsMessageConst
)
334 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
335 self
.assertIs(type(msg
.stream
.cls
), bt2_stream_class
._StreamClassConst
)
337 type(msg
.stream
.cls
.trace_class
), bt2_trace_class
._TraceClassConst
339 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
340 self
.assertEqual(msg
.count
, 678)
341 self
.assertIsNone(msg
.stream
.cls
.default_clock_class
)
342 with self
.assertRaisesRegex(
344 'such a message has no clock snapshots for this stream class',
346 msg
.beginning_default_clock_snapshot
347 with self
.assertRaisesRegex(
349 'such a message has no clock snapshots for this stream class',
351 msg
.end_default_clock_snapshot
353 self
.assertIsInstance(msg
, bt2
._StreamEndMessageConst
)
354 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
355 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
356 with self
.assertRaisesRegex(
357 ValueError, 'stream class has no default clock class'
359 msg
.default_clock_snapshot
def test_msg_stream_with_clock_snapshots(self):
    """Check the default clock snapshots of the stream boundary messages.

    The source component is added with both ``with_cc`` and
    ``with_stream_msgs_clock_snapshots`` enabled. The messages found at
    indexes 0 and 7 of the collected sequence must be the stream beginning
    and stream end messages, each exposing a default clock snapshot whose
    value equals the message's own index.
    """
    self._src_comp = self._graph.add_component(
        self._src,
        'my_source',
        {'with_cc': True, 'with_stream_msgs_clock_snapshots': True},
    )
    out_port = self._src_comp.output_ports['out']
    self._msg_iter = TestOutputPortMessageIterator(self._graph, out_port)
    received = list(self._msg_iter)

    # (index in the sequence, expected message class); the expected
    # snapshot value is the index itself.
    expectations = (
        (0, bt2._StreamBeginningMessageConst),
        (7, bt2._StreamEndMessageConst),
    )

    for index, expected_cls in expectations:
        stream_msg = received[index]
        self.assertIsInstance(stream_msg, expected_cls)
        self.assertIs(
            type(stream_msg.default_clock_snapshot),
            bt2_clock_snapshot._ClockSnapshotConst,
        )
        self.assertEqual(stream_msg.default_clock_snapshot.value, index)
def test_stream_beg_msg(self):
    """The stream of a stream beginning message from ``utils`` is a `_Stream`.

    The exact type is checked (not `isinstance`), so a `_StreamConst`
    object would not satisfy this test.
    """
    stream = utils.get_stream_beginning_message().stream
    self.assertIs(type(stream), bt2_stream._Stream)
def test_stream_end_msg(self):
    """The stream of a stream end message from ``utils`` is a `_Stream`.

    The exact type is checked (not `isinstance`), so a `_StreamConst`
    object would not satisfy this test.
    """
    stream = utils.get_stream_end_message().stream
    self.assertIs(type(stream), bt2_stream._Stream)
def test_packet_beg_msg(self):
    """The packet of a packet beginning message from ``utils`` is a `_Packet`.

    The exact type is checked (not `isinstance`), so a `_PacketConst`
    object would not satisfy this test.
    """
    packet = utils.get_packet_beginning_message().packet
    self.assertIs(type(packet), bt2_packet._Packet)
def test_packet_end_msg(self):
    """The packet of a packet end message from ``utils`` is a `_Packet`.

    The exact type is checked (not `isinstance`), so a `_PacketConst`
    object would not satisfy this test.
    """
    packet = utils.get_packet_end_message().packet
    self.assertIs(type(packet), bt2_packet._Packet)
def test_event_msg(self):
    """The event of an event message from ``utils`` is an `_Event`.

    The exact type is checked (not `isinstance`), so an `_EventConst`
    object would not satisfy this test.
    """
    event = utils.get_event_message().event
    self.assertIs(type(event), bt2_event._Event)
class CreateDiscardedEventMessageTestCase(unittest.TestCase):
    """Tests for `_create_discarded_events_message()` on a message iterator.

    Each test hands two callbacks to `utils.run_in_message_iterator_next()`:

    * ``create_stream_class(tc, cc)`` builds the stream class under test
      from the given trace class and clock class;
    * ``msg_iter_next(msg_iter, stream)`` performs the creation attempt.

    The tests rely on `run_in_message_iterator_next()` handing back whatever
    ``msg_iter_next`` returns: either the created message, or the sentinel
    ``123`` showing that the callback ran to completion.
    """

    # Most basic case.
    def test_create(self):
        """A message created from a stream alone has no count."""

        def create_stream_class(tc, cc):
            return tc.create_stream_class(supports_discarded_events=True)

        def msg_iter_next(msg_iter, stream):
            return msg_iter._create_discarded_events_message(stream)

        msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
        self.assertIs(type(msg), bt2._DiscardedEventsMessage)
        self.assertIsNone(msg.count)

    # With event count.
    def test_create_with_count(self):
        """The ``count`` given at creation time is reported by the message."""

        def create_stream_class(tc, cc):
            return tc.create_stream_class(supports_discarded_events=True)

        def msg_iter_next(msg_iter, stream):
            return msg_iter._create_discarded_events_message(stream, count=242)

        msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
        self.assertIs(type(msg), bt2._DiscardedEventsMessage)
        self.assertEqual(msg.count, 242)

    # With clock snapshots.
    def test_create_with_clock_snapshots(self):
        """Beginning/end clock snapshots given at creation are reported."""

        def create_stream_class(tc, cc):
            return tc.create_stream_class(
                default_clock_class=cc,
                supports_discarded_events=True,
                discarded_events_have_default_clock_snapshots=True,
            )

        def msg_iter_next(msg_iter, stream):
            return msg_iter._create_discarded_events_message(
                stream, beg_clock_snapshot=10, end_clock_snapshot=20
            )

        msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
        self.assertIs(type(msg), bt2._DiscardedEventsMessage)
        self.assertEqual(msg.beginning_default_clock_snapshot, 10)
        self.assertEqual(msg.end_default_clock_snapshot, 20)

    # Trying to create when the stream does not support discarded events.
    def test_create_unsupported_raises(self):
        """Creation fails when the stream class lacks discarded events support."""

        def create_stream_class(tc, cc):
            return tc.create_stream_class()

        def msg_iter_next(msg_iter, stream):
            with self.assertRaisesRegex(
                ValueError, 'stream class does not support discarded events'
            ):
                msg_iter._create_discarded_events_message(stream)

            # Sentinel checked below: shows this callback ran to completion.
            return 123

        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
        self.assertEqual(res, 123)

    # Trying to create with clock snapshots when the stream does not support
    # them.
    def test_create_unsupported_clock_snapshots_raises(self):
        """Creation with clock snapshots fails when the stream class has none."""

        def create_stream_class(tc, cc):
            return tc.create_stream_class(supports_discarded_events=True)

        def msg_iter_next(msg_iter, stream):
            # NOTE(review): exception type assumed to be ValueError, as in
            # the sibling tests of this class -- confirm.
            with self.assertRaisesRegex(
                ValueError,
                'discarded events have no default clock snapshots for this stream class',
            ):
                msg_iter._create_discarded_events_message(
                    stream, beg_clock_snapshot=10, end_clock_snapshot=20
                )

            # Sentinel checked below: shows this callback ran to completion.
            return 123

        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
        self.assertEqual(res, 123)

    # Trying to create without clock snapshots when the stream requires them.
    def test_create_missing_clock_snapshots_raises(self):
        """Creation without clock snapshots fails when the stream class needs them."""

        def create_stream_class(tc, cc):
            return tc.create_stream_class(
                default_clock_class=cc,
                supports_discarded_events=True,
                discarded_events_have_default_clock_snapshots=True,
            )

        def msg_iter_next(msg_iter, stream):
            # NOTE(review): exception type assumed to be ValueError, as in
            # the sibling tests of this class -- confirm.
            with self.assertRaisesRegex(
                ValueError,
                'discarded events have default clock snapshots for this stream class',
            ):
                msg_iter._create_discarded_events_message(stream)

            # Sentinel checked below: shows this callback ran to completion.
            return 123

        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
        self.assertEqual(res, 123)
class CreateDiscardedPacketMessageTestCase(unittest.TestCase):
    """Tests for `_create_discarded_packets_message()` on a message iterator.

    Each test hands two callbacks to `utils.run_in_message_iterator_next()`:

    * ``create_stream_class(tc, cc)`` builds the stream class under test
      from the given trace class and clock class;
    * ``msg_iter_next(msg_iter, stream)`` performs the creation attempt.

    The tests rely on `run_in_message_iterator_next()` handing back whatever
    ``msg_iter_next`` returns: either the created message, or the sentinel
    ``123`` showing that the callback ran to completion.
    """

    # Most basic case.
    def test_create(self):
        """A message created from a stream alone has no count."""

        def create_stream_class(tc, cc):
            return tc.create_stream_class(
                supports_packets=True, supports_discarded_packets=True
            )

        def msg_iter_next(msg_iter, stream):
            return msg_iter._create_discarded_packets_message(stream)

        msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
        self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
        self.assertIsNone(msg.count)

    # With packet count.
    def test_create_with_count(self):
        """The ``count`` given at creation time is reported by the message."""

        def create_stream_class(tc, cc):
            return tc.create_stream_class(
                supports_packets=True, supports_discarded_packets=True
            )

        def msg_iter_next(msg_iter, stream):
            return msg_iter._create_discarded_packets_message(stream, count=242)

        msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
        self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
        self.assertEqual(msg.count, 242)

    # With clock snapshots.
    def test_create_with_clock_snapshots(self):
        """Beginning/end clock snapshots given at creation are reported."""

        def create_stream_class(tc, cc):
            return tc.create_stream_class(
                default_clock_class=cc,
                supports_packets=True,
                supports_discarded_packets=True,
                discarded_packets_have_default_clock_snapshots=True,
            )

        def msg_iter_next(msg_iter, stream):
            return msg_iter._create_discarded_packets_message(
                stream, beg_clock_snapshot=10, end_clock_snapshot=20
            )

        msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
        self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
        self.assertEqual(msg.beginning_default_clock_snapshot, 10)
        self.assertEqual(msg.end_default_clock_snapshot, 20)

    # Trying to create when the stream does not support discarded packets.
    def test_create_unsupported_raises(self):
        """Creation fails when the stream class lacks discarded packets support."""

        def create_stream_class(tc, cc):
            return tc.create_stream_class(
                supports_packets=True,
            )

        def msg_iter_next(msg_iter, stream):
            with self.assertRaisesRegex(
                ValueError, 'stream class does not support discarded packets'
            ):
                msg_iter._create_discarded_packets_message(stream)

            # Sentinel checked below: shows this callback ran to completion.
            return 123

        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
        self.assertEqual(res, 123)

    # Trying to create with clock snapshots when the stream does not support
    # them.
    def test_create_unsupported_clock_snapshots_raises(self):
        """Creation with clock snapshots fails when the stream class has none."""

        def create_stream_class(tc, cc):
            return tc.create_stream_class(
                supports_packets=True, supports_discarded_packets=True
            )

        def msg_iter_next(msg_iter, stream):
            # NOTE(review): exception type assumed to be ValueError, as in
            # the sibling tests of this class -- confirm.
            with self.assertRaisesRegex(
                ValueError,
                'discarded packets have no default clock snapshots for this stream class',
            ):
                msg_iter._create_discarded_packets_message(
                    stream, beg_clock_snapshot=10, end_clock_snapshot=20
                )

            # Sentinel checked below: shows this callback ran to completion.
            return 123

        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
        self.assertEqual(res, 123)

    # Trying to create without clock snapshots when the stream requires them.
    def test_create_missing_clock_snapshots_raises(self):
        """Creation without clock snapshots fails when the stream class needs them."""

        def create_stream_class(tc, cc):
            return tc.create_stream_class(
                default_clock_class=cc,
                supports_packets=True,
                supports_discarded_packets=True,
                discarded_packets_have_default_clock_snapshots=True,
            )

        def msg_iter_next(msg_iter, stream):
            # NOTE(review): exception type assumed to be ValueError, as in
            # the sibling tests of this class -- confirm.
            with self.assertRaisesRegex(
                ValueError,
                'discarded packets have default clock snapshots for this stream class',
            ):
                msg_iter._create_discarded_packets_message(stream)

            # Sentinel checked below: shows this callback ran to completion.
            return 123

        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
        self.assertEqual(res, 123)
623 if __name__
== '__main__':