1 # SPDX-License-Identifier: GPL-2.0-only
3 # Copyright (C) 2019 EfficiOS Inc.
9 from utils
import TestOutputPortMessageIterator
10 from bt2
import clock_snapshot
as bt2_clock_snapshot
11 from bt2
import event
as bt2_event
12 from bt2
import event_class
as bt2_event_class
13 from bt2
import field
as bt2_field
14 from bt2
import packet
as bt2_packet
15 from bt2
import stream
as bt2_stream
16 from bt2
import stream_class
as bt2_stream_class
17 from bt2
import trace
as bt2_trace
18 from bt2
import trace_class
as bt2_trace_class
21 class AllMessagesTestCase(unittest
.TestCase
):
23 class MyIter(bt2
._UserMessageIterator
):
24 def __init__(self
, config
, self_port_output
):
26 self
._with
_stream
_msgs
_clock
_snapshots
= self_port_output
.user_data
.get(
27 'with_stream_msgs_clock_snapshots', False
31 if test_obj
._clock
_class
:
33 if self
._with
_stream
_msgs
_clock
_snapshots
:
34 msg
= self
._create
_stream
_beginning
_message
(
35 test_obj
._stream
, default_clock_snapshot
=self
._at
38 msg
= self
._create
_stream
_beginning
_message
(
41 test_obj
.assertIs(type(msg
), bt2
._StreamBeginningMessage
)
43 msg
= self
._create
_packet
_beginning
_message
(
44 test_obj
._packet
, self
._at
46 test_obj
.assertIs(type(msg
), bt2
._PacketBeginningMessage
)
48 msg
= self
._create
_event
_message
(
49 test_obj
._event
_class
, test_obj
._packet
, self
._at
51 test_obj
.assertIs(type(msg
), bt2
._EventMessage
)
53 msg
= self
._create
_message
_iterator
_inactivity
_message
(
54 test_obj
._clock
_class
, self
._at
57 msg
= self
._create
_discarded
_events
_message
(
58 test_obj
._stream
, 890, self
._at
, self
._at
60 test_obj
.assertIs(type(msg
), bt2
._DiscardedEventsMessage
)
62 msg
= self
._create
_packet
_end
_message
(
63 test_obj
._packet
, self
._at
65 test_obj
.assertIs(type(msg
), bt2
._PacketEndMessage
)
67 msg
= self
._create
_discarded
_packets
_message
(
68 test_obj
._stream
, 678, self
._at
, self
._at
70 test_obj
.assertIs(type(msg
), bt2
._DiscardedPacketsMessage
)
72 if self
._with
_stream
_msgs
_clock
_snapshots
:
73 msg
= self
._create
_stream
_end
_message
(
74 test_obj
._stream
, default_clock_snapshot
=self
._at
77 msg
= self
._create
_stream
_end
_message
(test_obj
._stream
)
78 test_obj
.assertIs(type(msg
), bt2
._StreamEndMessage
)
83 msg
= self
._create
_stream
_beginning
_message
(test_obj
._stream
)
85 msg
= self
._create
_packet
_beginning
_message
(test_obj
._packet
)
87 msg
= self
._create
_event
_message
(
88 test_obj
._event
_class
, test_obj
._packet
91 msg
= self
._create
_discarded
_events
_message
(
95 msg
= self
._create
_packet
_end
_message
(test_obj
._packet
)
97 msg
= self
._create
_discarded
_packets
_message
(
101 msg
= self
._create
_stream
_end
_message
(test_obj
._stream
)
108 class MySrc(bt2
._UserSourceComponent
, message_iterator_class
=MyIter
):
109 def __init__(self
, config
, params
, obj
):
110 self
._add
_output
_port
('out', params
)
112 with_cc
= bool(params
['with_cc'])
113 tc
= self
._create
_trace
_class
()
115 cc
= self
._create
_clock
_class
()
119 sc
= tc
.create_stream_class(
120 default_clock_class
=cc
,
121 supports_packets
=True,
122 packets_have_beginning_default_clock_snapshot
=with_cc
,
123 packets_have_end_default_clock_snapshot
=with_cc
,
124 supports_discarded_events
=True,
125 discarded_events_have_default_clock_snapshots
=with_cc
,
126 supports_discarded_packets
=True,
127 discarded_packets_have_default_clock_snapshots
=with_cc
,
130 # Create payload field class
131 my_int_fc
= tc
.create_signed_integer_field_class(32)
132 payload_fc
= tc
.create_structure_field_class()
133 payload_fc
+= [('my_int', my_int_fc
)]
135 # Create specific context field class
136 my_int_fc
= tc
.create_signed_integer_field_class(32)
137 specific_fc
= tc
.create_structure_field_class()
138 specific_fc
+= [('my_int', my_int_fc
)]
140 ec
= sc
.create_event_class(
142 payload_field_class
=payload_fc
,
143 specific_context_field_class
=specific_fc
,
147 stream
= trace
.create_stream(sc
)
148 packet
= stream
.create_packet()
150 test_obj
._trace
= trace
151 test_obj
._stream
= stream
152 test_obj
._packet
= packet
153 test_obj
._event
_class
= ec
154 test_obj
._clock
_class
= cc
157 self
._graph
= bt2
.Graph()
161 def test_all_msg_with_cc(self
):
162 params
= {'with_cc': True}
163 self
._src
_comp
= self
._graph
.add_component(self
._src
, 'my_source', params
)
164 self
._msg
_iter
= TestOutputPortMessageIterator(
165 self
._graph
, self
._src
_comp
.output_ports
['out']
168 for i
, msg
in enumerate(self
._msg
_iter
):
170 self
.assertIs(type(msg
), bt2
._StreamBeginningMessageConst
)
171 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
172 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
173 self
.assertIsInstance(
174 msg
.default_clock_snapshot
, bt2
._UnknownClockSnapshot
177 self
.assertIs(type(msg
), bt2
._PacketBeginningMessageConst
)
178 self
.assertIs(type(msg
.packet
), bt2_packet
._PacketConst
)
180 type(msg
.default_clock_snapshot
),
181 bt2_clock_snapshot
._ClockSnapshotConst
,
183 self
.assertEqual(msg
.packet
.addr
, self
._packet
.addr
)
184 self
.assertEqual(msg
.default_clock_snapshot
.value
, i
)
186 self
.assertIs(type(msg
), bt2
._EventMessageConst
)
187 self
.assertIs(type(msg
.event
), bt2_event
._EventConst
)
189 type(msg
.default_clock_snapshot
),
190 bt2_clock_snapshot
._ClockSnapshotConst
,
193 type(msg
.event
.payload_field
), bt2_field
._StructureFieldConst
196 type(msg
.event
.payload_field
['my_int']),
197 bt2_field
._SignedIntegerFieldConst
,
200 self
.assertEqual(msg
.event
.cls
.addr
, self
._event
_class
.addr
)
201 self
.assertEqual(msg
.default_clock_snapshot
.value
, i
)
203 self
.assertIs(type(msg
), bt2
._MessageIteratorInactivityMessageConst
)
205 type(msg
.clock_snapshot
), bt2_clock_snapshot
._ClockSnapshotConst
207 self
.assertEqual(msg
.clock_snapshot
.value
, i
)
209 self
.assertIs(type(msg
), bt2
._DiscardedEventsMessageConst
)
210 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
211 self
.assertIs(type(msg
.stream
.cls
), bt2_stream_class
._StreamClassConst
)
213 type(msg
.beginning_default_clock_snapshot
),
214 bt2_clock_snapshot
._ClockSnapshotConst
,
217 type(msg
.end_default_clock_snapshot
),
218 bt2_clock_snapshot
._ClockSnapshotConst
,
221 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
222 self
.assertEqual(msg
.count
, 890)
224 msg
.stream
.cls
.default_clock_class
.addr
, self
._clock
_class
.addr
226 self
.assertEqual(msg
.beginning_default_clock_snapshot
.value
, i
)
227 self
.assertEqual(msg
.end_default_clock_snapshot
.value
, i
)
229 self
.assertIs(type(msg
), bt2
._PacketEndMessageConst
)
230 self
.assertIs(type(msg
.packet
), bt2_packet
._PacketConst
)
232 type(msg
.default_clock_snapshot
),
233 bt2_clock_snapshot
._ClockSnapshotConst
,
235 self
.assertEqual(msg
.packet
.addr
, self
._packet
.addr
)
236 self
.assertEqual(msg
.default_clock_snapshot
.value
, i
)
238 self
.assertIs(type(msg
), bt2
._DiscardedPacketsMessageConst
)
239 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
240 self
.assertIs(type(msg
.stream
.trace
), bt2_trace
._TraceConst
)
242 type(msg
.stream
.trace
.cls
), bt2_trace_class
._TraceClassConst
245 type(msg
.beginning_default_clock_snapshot
),
246 bt2_clock_snapshot
._ClockSnapshotConst
,
249 type(msg
.end_default_clock_snapshot
),
250 bt2_clock_snapshot
._ClockSnapshotConst
,
252 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
253 self
.assertEqual(msg
.count
, 678)
255 msg
.stream
.cls
.default_clock_class
.addr
, self
._clock
_class
.addr
257 self
.assertEqual(msg
.beginning_default_clock_snapshot
.value
, i
)
258 self
.assertEqual(msg
.end_default_clock_snapshot
.value
, i
)
260 self
.assertIs(type(msg
), bt2
._StreamEndMessageConst
)
261 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
262 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
264 type(msg
.default_clock_snapshot
), bt2
._UnknownClockSnapshot
269 def test_all_msg_without_cc(self
):
270 params
= {'with_cc': False}
271 self
._src
_comp
= self
._graph
.add_component(self
._src
, 'my_source', params
)
272 self
._msg
_iter
= TestOutputPortMessageIterator(
273 self
._graph
, self
._src
_comp
.output_ports
['out']
276 for i
, msg
in enumerate(self
._msg
_iter
):
278 self
.assertIsInstance(msg
, bt2
._StreamBeginningMessageConst
)
279 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
280 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
281 with self
.assertRaisesRegex(
282 ValueError, 'stream class has no default clock class'
284 msg
.default_clock_snapshot
286 self
.assertIsInstance(msg
, bt2
._PacketBeginningMessageConst
)
287 self
.assertIs(type(msg
.packet
), bt2_packet
._PacketConst
)
288 self
.assertEqual(msg
.packet
.addr
, self
._packet
.addr
)
290 self
.assertIsInstance(msg
, bt2
._EventMessageConst
)
291 self
.assertIs(type(msg
.event
), bt2_event
._EventConst
)
292 self
.assertIs(type(msg
.event
.cls
), bt2_event_class
._EventClassConst
)
293 self
.assertEqual(msg
.event
.cls
.addr
, self
._event
_class
.addr
)
294 with self
.assertRaisesRegex(
295 ValueError, 'stream class has no default clock class'
297 msg
.default_clock_snapshot
299 self
.assertIsInstance(msg
, bt2
._DiscardedEventsMessageConst
)
300 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
301 self
.assertIs(type(msg
.stream
.cls
), bt2_stream_class
._StreamClassConst
)
302 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
303 self
.assertEqual(msg
.count
, 890)
304 self
.assertIsNone(msg
.stream
.cls
.default_clock_class
)
305 with self
.assertRaisesRegex(
307 'such a message has no clock snapshots for this stream class',
309 msg
.beginning_default_clock_snapshot
310 with self
.assertRaisesRegex(
312 'such a message has no clock snapshots for this stream class',
314 msg
.end_default_clock_snapshot
316 self
.assertIsInstance(msg
, bt2
._PacketEndMessageConst
)
317 self
.assertEqual(msg
.packet
.addr
, self
._packet
.addr
)
318 self
.assertIs(type(msg
.packet
), bt2_packet
._PacketConst
)
320 self
.assertIsInstance(msg
, bt2
._DiscardedPacketsMessageConst
)
321 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
322 self
.assertIs(type(msg
.stream
.cls
), bt2_stream_class
._StreamClassConst
)
324 type(msg
.stream
.cls
.trace_class
), bt2_trace_class
._TraceClassConst
326 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
327 self
.assertEqual(msg
.count
, 678)
328 self
.assertIsNone(msg
.stream
.cls
.default_clock_class
)
329 with self
.assertRaisesRegex(
331 'such a message has no clock snapshots for this stream class',
333 msg
.beginning_default_clock_snapshot
334 with self
.assertRaisesRegex(
336 'such a message has no clock snapshots for this stream class',
338 msg
.end_default_clock_snapshot
340 self
.assertIsInstance(msg
, bt2
._StreamEndMessageConst
)
341 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
342 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
343 with self
.assertRaisesRegex(
344 ValueError, 'stream class has no default clock class'
346 msg
.default_clock_snapshot
350 def test_msg_stream_with_clock_snapshots(self
):
351 params
= {'with_cc': True, 'with_stream_msgs_clock_snapshots': True}
353 self
._src
_comp
= self
._graph
.add_component(self
._src
, 'my_source', params
)
354 self
._msg
_iter
= TestOutputPortMessageIterator(
355 self
._graph
, self
._src
_comp
.output_ports
['out']
357 msgs
= list(self
._msg
_iter
)
359 msg_stream_beg
= msgs
[0]
360 self
.assertIsInstance(msg_stream_beg
, bt2
._StreamBeginningMessageConst
)
362 type(msg_stream_beg
.default_clock_snapshot
),
363 bt2_clock_snapshot
._ClockSnapshotConst
,
365 self
.assertEqual(msg_stream_beg
.default_clock_snapshot
.value
, 0)
367 msg_stream_end
= msgs
[7]
368 self
.assertIsInstance(msg_stream_end
, bt2
._StreamEndMessageConst
)
370 type(msg_stream_end
.default_clock_snapshot
),
371 bt2_clock_snapshot
._ClockSnapshotConst
,
373 self
.assertEqual(msg_stream_end
.default_clock_snapshot
.value
, 7)
375 def test_stream_beg_msg(self
):
376 msg
= utils
.get_stream_beginning_message()
377 self
.assertIs(type(msg
.stream
), bt2_stream
._Stream
)
379 def test_stream_end_msg(self
):
380 msg
= utils
.get_stream_end_message()
381 self
.assertIs(type(msg
.stream
), bt2_stream
._Stream
)
383 def test_packet_beg_msg(self
):
384 msg
= utils
.get_packet_beginning_message()
385 self
.assertIs(type(msg
.packet
), bt2_packet
._Packet
)
387 def test_packet_end_msg(self
):
388 msg
= utils
.get_packet_end_message()
389 self
.assertIs(type(msg
.packet
), bt2_packet
._Packet
)
391 def test_event_msg(self
):
392 msg
= utils
.get_event_message()
393 self
.assertIs(type(msg
.event
), bt2_event
._Event
)
396 class CreateDiscardedEventMessageTestCase(unittest
.TestCase
):
398 def test_create(self
):
399 def create_stream_class(tc
, cc
):
400 return tc
.create_stream_class(supports_discarded_events
=True)
402 def msg_iter_next(msg_iter
, stream
):
403 return msg_iter
._create
_discarded
_events
_message
(stream
)
405 msg
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
406 self
.assertIs(type(msg
), bt2
._DiscardedEventsMessage
)
407 # Broken at the moment.
408 # self.assertIs(msg.count, None)
411 def test_create_with_count(self
):
412 def create_stream_class(tc
, cc
):
413 return tc
.create_stream_class(supports_discarded_events
=True)
415 def msg_iter_next(msg_iter
, stream
):
416 return msg_iter
._create
_discarded
_events
_message
(stream
, count
=242)
418 msg
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
419 self
.assertIs(type(msg
), bt2
._DiscardedEventsMessage
)
420 # Broken at the moment.
421 # self.assertEqual(msg.count, 242)
423 # With clock snapshots.
424 def test_create_with_clock_snapshots(self
):
425 def create_stream_class(tc
, cc
):
426 return tc
.create_stream_class(
427 default_clock_class
=cc
,
428 supports_discarded_events
=True,
429 discarded_events_have_default_clock_snapshots
=True,
432 def msg_iter_next(msg_iter
, stream
):
433 return msg_iter
._create
_discarded
_events
_message
(
434 stream
, beg_clock_snapshot
=10, end_clock_snapshot
=20
437 msg
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
438 self
.assertIs(type(msg
), bt2
._DiscardedEventsMessage
)
439 # Broken at the moment.
440 # self.assertEqual(msg.beginning_default_clock_snapshot, 10);
441 # self.assertEqual(msg.end_default_clock_snapshot, 20);
443 # Trying to create when the stream does not support discarded events.
444 def test_create_unsupported_raises(self
):
445 def create_stream_class(tc
, cc
):
446 return tc
.create_stream_class()
448 def msg_iter_next(msg_iter
, stream
):
449 with self
.assertRaisesRegex(
450 ValueError, 'stream class does not support discarded events'
452 msg_iter
._create
_discarded
_events
_message
(stream
)
456 res
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
457 self
.assertEqual(res
, 123)
459 # Trying to create with clock snapshots when the stream does not support
461 def test_create_unsupported_clock_snapshots_raises(self
):
462 def create_stream_class(tc
, cc
):
463 return tc
.create_stream_class(supports_discarded_events
=True)
465 def msg_iter_next(msg_iter
, stream
):
466 with self
.assertRaisesRegex(
468 'discarded events have no default clock snapshots for this stream class',
470 msg_iter
._create
_discarded
_events
_message
(
471 stream
, beg_clock_snapshot
=10, end_clock_snapshot
=20
476 res
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
477 self
.assertEqual(res
, 123)
479 # Trying to create without clock snapshots when the stream requires them.
480 def test_create_missing_clock_snapshots_raises(self
):
481 def create_stream_class(tc
, cc
):
482 return tc
.create_stream_class(
483 default_clock_class
=cc
,
484 supports_discarded_events
=True,
485 discarded_events_have_default_clock_snapshots
=True,
488 def msg_iter_next(msg_iter
, stream
):
489 with self
.assertRaisesRegex(
491 'discarded events have default clock snapshots for this stream class',
493 msg_iter
._create
_discarded
_events
_message
(stream
)
497 res
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
498 self
.assertEqual(res
, 123)
501 class CreateDiscardedPacketMessageTestCase(unittest
.TestCase
):
503 def test_create(self
):
504 def create_stream_class(tc
, cc
):
505 return tc
.create_stream_class(
506 supports_packets
=True, supports_discarded_packets
=True
509 def msg_iter_next(msg_iter
, stream
):
510 return msg_iter
._create
_discarded
_packets
_message
(stream
)
512 msg
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
513 self
.assertIs(type(msg
), bt2
._DiscardedPacketsMessage
)
514 self
.assertIs(msg
.count
, None)
517 def test_create_with_count(self
):
518 def create_stream_class(tc
, cc
):
519 return tc
.create_stream_class(
520 supports_packets
=True, supports_discarded_packets
=True
523 def msg_iter_next(msg_iter
, stream
):
524 return msg_iter
._create
_discarded
_packets
_message
(stream
, count
=242)
526 msg
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
527 self
.assertIs(type(msg
), bt2
._DiscardedPacketsMessage
)
528 self
.assertEqual(msg
.count
, 242)
530 # With clock snapshots.
531 def test_create_with_clock_snapshots(self
):
532 def create_stream_class(tc
, cc
):
533 return tc
.create_stream_class(
534 default_clock_class
=cc
,
535 supports_packets
=True,
536 supports_discarded_packets
=True,
537 discarded_packets_have_default_clock_snapshots
=True,
540 def msg_iter_next(msg_iter
, stream
):
541 return msg_iter
._create
_discarded
_packets
_message
(
542 stream
, beg_clock_snapshot
=10, end_clock_snapshot
=20
545 msg
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
546 self
.assertIs(type(msg
), bt2
._DiscardedPacketsMessage
)
547 self
.assertEqual(msg
.beginning_default_clock_snapshot
, 10)
548 self
.assertEqual(msg
.end_default_clock_snapshot
, 20)
550 # Trying to create when the stream does not support discarded packets.
551 def test_create_unsupported_raises(self
):
552 def create_stream_class(tc
, cc
):
553 return tc
.create_stream_class(supports_packets
=True,)
555 def msg_iter_next(msg_iter
, stream
):
556 with self
.assertRaisesRegex(
557 ValueError, 'stream class does not support discarded packets'
559 msg_iter
._create
_discarded
_packets
_message
(stream
)
563 res
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
564 self
.assertEqual(res
, 123)
566 # Trying to create with clock snapshots when the stream does not support
568 def test_create_unsupported_clock_snapshots_raises(self
):
569 def create_stream_class(tc
, cc
):
570 return tc
.create_stream_class(
571 supports_packets
=True, supports_discarded_packets
=True
574 def msg_iter_next(msg_iter
, stream
):
575 with self
.assertRaisesRegex(
577 'discarded packets have no default clock snapshots for this stream class',
579 msg_iter
._create
_discarded
_packets
_message
(
580 stream
, beg_clock_snapshot
=10, end_clock_snapshot
=20
585 res
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
586 self
.assertEqual(res
, 123)
588 # Trying to create without clock snapshots when the stream requires them.
589 def test_create_missing_clock_snapshots_raises(self
):
590 def create_stream_class(tc
, cc
):
591 return tc
.create_stream_class(
592 default_clock_class
=cc
,
593 supports_packets
=True,
594 supports_discarded_packets
=True,
595 discarded_packets_have_default_clock_snapshots
=True,
598 def msg_iter_next(msg_iter
, stream
):
599 with self
.assertRaisesRegex(
601 'discarded packets have default clock snapshots for this stream class',
603 msg_iter
._create
_discarded
_packets
_message
(stream
)
607 res
= utils
.run_in_message_iterator_next(create_stream_class
, msg_iter_next
)
608 self
.assertEqual(res
, 123)
611 if __name__
== '__main__':