+# SPDX-License-Identifier: GPL-2.0-only
#
# Copyright (C) 2019 EfficiOS Inc.
#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; only version 2
-# of the License.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
-#
import unittest
import bt2
class AllMessagesTestCase(unittest.TestCase):
def setUp(self):
class MyIter(bt2._UserMessageIterator):
- def __init__(self, self_port_output):
+ def __init__(self, config, self_port_output):
self._at = 0
self._with_stream_msgs_clock_snapshots = self_port_output.user_data.get(
'with_stream_msgs_clock_snapshots', False
return msg
class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params, obj):
+ def __init__(self, config, params, obj):
self._add_output_port('out', params)
with_cc = bool(params['with_cc'])
elif i == 3:
self.assertIs(type(msg), bt2._MessageIteratorInactivityMessageConst)
self.assertIs(
- type(msg.default_clock_snapshot),
- bt2_clock_snapshot._ClockSnapshotConst,
+ type(msg.clock_snapshot), bt2_clock_snapshot._ClockSnapshotConst
)
- self.assertEqual(msg.default_clock_snapshot.value, i)
+ self.assertEqual(msg.clock_snapshot.value, i)
elif i == 4:
self.assertIs(type(msg), bt2._DiscardedEventsMessageConst)
self.assertIs(type(msg.stream), bt2_stream._StreamConst)
self.assertIs(type(msg.event), bt2_event._Event)
+class CreateDiscardedEventMessageTestCase(unittest.TestCase):
+ # Most basic case.
+ def test_create(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(supports_discarded_events=True)
+
+ def msg_iter_next(msg_iter, stream):
+ return msg_iter._create_discarded_events_message(stream)
+
+ msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertIs(type(msg), bt2._DiscardedEventsMessage)
+ self.assertIs(msg.count, None)
+
+ # With event count.
+ def test_create_with_count(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(supports_discarded_events=True)
+
+ def msg_iter_next(msg_iter, stream):
+ return msg_iter._create_discarded_events_message(stream, count=242)
+
+ msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertIs(type(msg), bt2._DiscardedEventsMessage)
+ self.assertEqual(msg.count, 242)
+
+ # With event count == 0.
+ def test_create_with_count_zero_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(supports_discarded_events=True)
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError, 'discarded event count is 0',
+ ):
+ msg_iter._create_discarded_events_message(stream, count=0)
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+ # With clock snapshots.
+ def test_create_with_clock_snapshots(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ default_clock_class=cc,
+ supports_discarded_events=True,
+ discarded_events_have_default_clock_snapshots=True,
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ return msg_iter._create_discarded_events_message(
+ stream, beg_clock_snapshot=10, end_clock_snapshot=20
+ )
+
+ msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertIs(type(msg), bt2._DiscardedEventsMessage)
+ self.assertEqual(msg.beginning_default_clock_snapshot, 10)
+ self.assertEqual(msg.end_default_clock_snapshot, 20)
+
+ # Trying to create when the stream does not support discarded events.
+ def test_create_unsupported_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class()
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError, 'stream class does not support discarded events'
+ ):
+ msg_iter._create_discarded_events_message(stream)
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+ # Trying to create with clock snapshots when the stream does not support
+ # them.
+ def test_create_unsupported_clock_snapshots_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(supports_discarded_events=True)
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError,
+ 'discarded events have no default clock snapshots for this stream class',
+ ):
+ msg_iter._create_discarded_events_message(
+ stream, beg_clock_snapshot=10, end_clock_snapshot=20
+ )
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+ # Trying to create without clock snapshots when the stream requires them.
+ def test_create_missing_clock_snapshots_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ default_clock_class=cc,
+ supports_discarded_events=True,
+ discarded_events_have_default_clock_snapshots=True,
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError,
+ 'discarded events have default clock snapshots for this stream class',
+ ):
+ msg_iter._create_discarded_events_message(stream)
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+ # End clock snapshot greater than beginning clock snapshot.
+ def test_create_clock_snapshots_end_gt_begin_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ default_clock_class=cc,
+ supports_discarded_events=True,
+ discarded_events_have_default_clock_snapshots=True,
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError,
+ r'beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)',
+ ):
+ msg_iter._create_discarded_events_message(
+ stream, beg_clock_snapshot=20, end_clock_snapshot=10
+ )
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+
+class CreateDiscardedPacketMessageTestCase(unittest.TestCase):
+ # Most basic case.
+ def test_create(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ supports_packets=True, supports_discarded_packets=True
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ return msg_iter._create_discarded_packets_message(stream)
+
+ msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
+ self.assertIs(msg.count, None)
+
+ # With packet count.
+ def test_create_with_count(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ supports_packets=True, supports_discarded_packets=True
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ return msg_iter._create_discarded_packets_message(stream, count=242)
+
+ msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
+ self.assertEqual(msg.count, 242)
+
+ # With packet count == 0.
+ def test_create_with_count_zero_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ supports_packets=True, supports_discarded_packets=True
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError, 'discarded packet count is 0',
+ ):
+ msg_iter._create_discarded_packets_message(stream, count=0)
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+ # With clock snapshots.
+ def test_create_with_clock_snapshots(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ default_clock_class=cc,
+ supports_packets=True,
+ supports_discarded_packets=True,
+ discarded_packets_have_default_clock_snapshots=True,
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ return msg_iter._create_discarded_packets_message(
+ stream, beg_clock_snapshot=10, end_clock_snapshot=20
+ )
+
+ msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
+ self.assertEqual(msg.beginning_default_clock_snapshot, 10)
+ self.assertEqual(msg.end_default_clock_snapshot, 20)
+
+ # Trying to create when the stream does not support discarded packets.
+ def test_create_unsupported_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(supports_packets=True,)
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError, 'stream class does not support discarded packets'
+ ):
+ msg_iter._create_discarded_packets_message(stream)
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+ # Trying to create with clock snapshots when the stream does not support
+ # them.
+ def test_create_unsupported_clock_snapshots_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ supports_packets=True, supports_discarded_packets=True
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError,
+ 'discarded packets have no default clock snapshots for this stream class',
+ ):
+ msg_iter._create_discarded_packets_message(
+ stream, beg_clock_snapshot=10, end_clock_snapshot=20
+ )
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+ # Trying to create without clock snapshots when the stream requires them.
+ def test_create_missing_clock_snapshots_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ default_clock_class=cc,
+ supports_packets=True,
+ supports_discarded_packets=True,
+ discarded_packets_have_default_clock_snapshots=True,
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError,
+ 'discarded packets have default clock snapshots for this stream class',
+ ):
+ msg_iter._create_discarded_packets_message(stream)
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+ # End clock snapshot greater than beginning clock snapshot.
+ def test_create_clock_snapshots_end_gt_begin_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ default_clock_class=cc,
+ supports_packets=True,
+ supports_discarded_packets=True,
+ discarded_packets_have_default_clock_snapshots=True,
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError,
+ r'beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)',
+ ):
+ msg_iter._create_discarded_packets_message(
+ stream, beg_clock_snapshot=20, end_clock_snapshot=10
+ )
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
+
if __name__ == '__main__':
unittest.main()