X-Git-Url: http://git.efficios.com/?p=babeltrace.git;a=blobdiff_plain;f=tests%2Fbindings%2Fpython%2Fbt2%2Ftest_message.py;h=5b513361ded29b500cf1591e625380e1aa536170;hp=a432ec71db6de421ef5931c9e9015e722cd96ada;hb=cfbd7cf3bde05e8a6606478889dcd663604ef7b5;hpb=e3f7fd922b3dc8fd92ad9397f4f9e175ff843b2a diff --git a/tests/bindings/python/bt2/test_message.py b/tests/bindings/python/bt2/test_message.py index a432ec71..5b513361 100644 --- a/tests/bindings/python/bt2/test_message.py +++ b/tests/bindings/python/bt2/test_message.py @@ -23,34 +23,53 @@ import bt2 class AllMessagesTestCase(unittest.TestCase): def setUp(self): - class MyIter(bt2._UserMessageIterator): def __init__(self, self_port_output): self._at = 0 - self._with_stream_msgs_clock_snapshots = self_port_output.user_data.get('with_stream_msgs_clock_snapshots', False) + self._with_stream_msgs_clock_snapshots = self_port_output.user_data.get( + 'with_stream_msgs_clock_snapshots', False + ) def __next__(self): if test_obj._clock_class: if self._at == 0: if self._with_stream_msgs_clock_snapshots: - msg = self._create_stream_beginning_message(test_obj._stream, default_clock_snapshot=self._at) + msg = self._create_stream_beginning_message( + test_obj._stream, default_clock_snapshot=self._at + ) else: - msg = self._create_stream_beginning_message(test_obj._stream) + msg = self._create_stream_beginning_message( + test_obj._stream + ) elif self._at == 1: - msg = self._create_packet_beginning_message(test_obj._packet, self._at) + msg = self._create_packet_beginning_message( + test_obj._packet, self._at + ) elif self._at == 2: - msg = self._create_event_message(test_obj._event_class, test_obj._packet, self._at) + msg = self._create_event_message( + test_obj._event_class, test_obj._packet, self._at + ) elif self._at == 3: - msg = self._create_message_iterator_inactivity_message(test_obj._clock_class, self._at) + msg = self._create_message_iterator_inactivity_message( + test_obj._clock_class, self._at + ) elif self._at == 4: - msg = self._create_discarded_events_message(test_obj._stream, 890, self._at, self._at) + msg = self._create_discarded_events_message( + test_obj._stream, 890, self._at, self._at + ) elif self._at == 5: - msg = self._create_packet_end_message(test_obj._packet, self._at) + msg = self._create_packet_end_message( + test_obj._packet, self._at + ) elif self._at == 6: - msg = self._create_discarded_packets_message(test_obj._stream, 678, self._at, self._at) + msg = self._create_discarded_packets_message( + test_obj._stream, 678, self._at, self._at + ) elif self._at == 7: if self._with_stream_msgs_clock_snapshots: - msg = self._create_stream_end_message(test_obj._stream, default_clock_snapshot=self._at) + msg = self._create_stream_end_message( + test_obj._stream, default_clock_snapshot=self._at + ) else: msg = self._create_stream_end_message(test_obj._stream) elif self._at >= 8: @@ -61,13 +80,19 @@ class AllMessagesTestCase(unittest.TestCase): elif self._at == 1: msg = self._create_packet_beginning_message(test_obj._packet) elif self._at == 2: - msg = self._create_event_message(test_obj._event_class, test_obj._packet) + msg = self._create_event_message( + test_obj._event_class, test_obj._packet + ) elif self._at == 3: - msg = self._create_discarded_events_message(test_obj._stream, 890) + msg = self._create_discarded_events_message( + test_obj._stream, 890 + ) elif self._at == 4: msg = self._create_packet_end_message(test_obj._packet) elif self._at == 5: - msg = self._create_discarded_packets_message(test_obj._stream, 678) + msg = self._create_discarded_packets_message( + test_obj._stream, 678 + ) elif self._at == 6: msg = self._create_stream_end_message(test_obj._stream) elif self._at >= 7: @@ -87,21 +112,21 @@ class AllMessagesTestCase(unittest.TestCase): else: cc = None - sc = tc.create_stream_class(default_clock_class=cc, - supports_packets=True, - packets_have_beginning_default_clock_snapshot=with_cc, - packets_have_end_default_clock_snapshot=with_cc, - supports_discarded_events=True, - discarded_events_have_default_clock_snapshots=with_cc, - supports_discarded_packets=True, - discarded_packets_have_default_clock_snapshots=with_cc) + sc = tc.create_stream_class( + default_clock_class=cc, + supports_packets=True, + packets_have_beginning_default_clock_snapshot=with_cc, + packets_have_end_default_clock_snapshot=with_cc, + supports_discarded_events=True, + discarded_events_have_default_clock_snapshots=with_cc, + supports_discarded_packets=True, + discarded_packets_have_default_clock_snapshots=with_cc, + ) # Create payload field class my_int_fc = tc.create_signed_integer_field_class(32) payload_fc = tc.create_structure_field_class() - payload_fc += [ - ('my_int', my_int_fc), - ] + payload_fc += [('my_int', my_int_fc)] ec = sc.create_event_class(name='salut', payload_field_class=payload_fc) @@ -123,13 +148,17 @@ class AllMessagesTestCase(unittest.TestCase): def test_all_msg_with_cc(self): params = {'with_cc': True} self._src_comp = self._graph.add_component(self._src, 'my_source', params) - self._msg_iter = self._graph.create_output_port_message_iterator(self._src_comp.output_ports['out']) + self._msg_iter = self._graph.create_output_port_message_iterator( + self._src_comp.output_ports['out'] + ) for i, msg in enumerate(self._msg_iter): if i == 0: self.assertIsInstance(msg, bt2.message._StreamBeginningMessage) self.assertEqual(msg.stream.addr, self._stream.addr) - self.assertIsInstance(msg.default_clock_snapshot, bt2.clock_snapshot._UnknownClockSnapshot) + self.assertIsInstance( + msg.default_clock_snapshot, bt2.clock_snapshot._UnknownClockSnapshot + ) elif i == 1: self.assertIsInstance(msg, bt2.message._PacketBeginningMessage) self.assertEqual(msg.packet.addr, self._packet.addr) @@ -139,13 +168,17 @@ class AllMessagesTestCase(unittest.TestCase): self.assertEqual(msg.event.cls.addr, self._event_class.addr) self.assertEqual(msg.default_clock_snapshot.value, i) elif i == 3: - self.assertIsInstance(msg, bt2.message._MessageIteratorInactivityMessage) + self.assertIsInstance( + msg, bt2.message._MessageIteratorInactivityMessage + ) self.assertEqual(msg.default_clock_snapshot.value, i) elif i == 4: self.assertIsInstance(msg, bt2.message._DiscardedEventsMessage) self.assertEqual(msg.stream.addr, self._stream.addr) self.assertEqual(msg.count, 890) - self.assertEqual(msg.stream.cls.default_clock_class.addr, self._clock_class.addr) + self.assertEqual( + msg.stream.cls.default_clock_class.addr, self._clock_class.addr + ) self.assertEqual(msg.beginning_default_clock_snapshot.value, i) self.assertEqual(msg.end_default_clock_snapshot.value, i) elif i == 5: @@ -156,20 +189,26 @@ class AllMessagesTestCase(unittest.TestCase): self.assertIsInstance(msg, bt2.message._DiscardedPacketsMessage) self.assertEqual(msg.stream.addr, self._stream.addr) self.assertEqual(msg.count, 678) - self.assertEqual(msg.stream.cls.default_clock_class.addr, self._clock_class.addr) + self.assertEqual( + msg.stream.cls.default_clock_class.addr, self._clock_class.addr + ) self.assertEqual(msg.beginning_default_clock_snapshot.value, i) self.assertEqual(msg.end_default_clock_snapshot.value, i) elif i == 7: self.assertIsInstance(msg, bt2.message._StreamEndMessage) self.assertEqual(msg.stream.addr, self._stream.addr) - self.assertIsInstance(msg.default_clock_snapshot, bt2.clock_snapshot._UnknownClockSnapshot) + self.assertIsInstance( + msg.default_clock_snapshot, bt2.clock_snapshot._UnknownClockSnapshot + ) else: raise Exception def test_all_msg_without_cc(self): params = {'with_cc': False} self._src_comp = self._graph.add_component(self._src, 'my_source', params) - self._msg_iter = self._graph.create_output_port_message_iterator(self._src_comp.output_ports['out']) + self._msg_iter = self._graph.create_output_port_message_iterator( + self._src_comp.output_ports['out'] + ) for i, msg in enumerate(self._msg_iter): if i == 0: @@ -215,13 +254,12 @@ class AllMessagesTestCase(unittest.TestCase): raise Exception def test_msg_stream_with_clock_snapshots(self): - params = { - 'with_cc': True, - 'with_stream_msgs_clock_snapshots': True, - } + params = {'with_cc': True, 'with_stream_msgs_clock_snapshots': True} self._src_comp = self._graph.add_component(self._src, 'my_source', params) - self._msg_iter = self._graph.create_output_port_message_iterator(self._src_comp.output_ports['out']) + self._msg_iter = self._graph.create_output_port_message_iterator( + self._src_comp.output_ports['out'] + ) msgs = list(self._msg_iter) msg_stream_beg = msgs[0]