def __init__(self, config, self_port_output):
self._at = 0
self._with_stream_msgs_clock_snapshots = self_port_output.user_data.get(
- 'with_stream_msgs_clock_snapshots', False
+ "with_stream_msgs_clock_snapshots", False
)
def __next__(self):
class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
def __init__(self, config, params, obj):
- self._add_output_port('out', params)
+ self._add_output_port("out", params)
- with_cc = bool(params['with_cc'])
+ with_cc = bool(params["with_cc"])
tc = self._create_trace_class()
if with_cc:
cc = self._create_clock_class()
# Create payload field class
my_int_fc = tc.create_signed_integer_field_class(32)
payload_fc = tc.create_structure_field_class()
- payload_fc += [('my_int', my_int_fc)]
+ payload_fc += [("my_int", my_int_fc)]
# Create specific context field class
my_int_fc = tc.create_signed_integer_field_class(32)
specific_fc = tc.create_structure_field_class()
- specific_fc += [('my_int', my_int_fc)]
+ specific_fc += [("my_int", my_int_fc)]
ec = sc.create_event_class(
- name='salut',
+ name="salut",
payload_field_class=payload_fc,
specific_context_field_class=specific_fc,
)
self._iter = MyIter
def test_all_msg_with_cc(self):
- params = {'with_cc': True}
- self._src_comp = self._graph.add_component(self._src, 'my_source', params)
+ params = {"with_cc": True}
+ self._src_comp = self._graph.add_component(self._src, "my_source", params)
self._msg_iter = TestOutputPortMessageIterator(
- self._graph, self._src_comp.output_ports['out']
+ self._graph, self._src_comp.output_ports["out"]
)
for i, msg in enumerate(self._msg_iter):
type(msg.event.payload_field), bt2_field._StructureFieldConst
)
self.assertIs(
- type(msg.event.payload_field['my_int']),
+ type(msg.event.payload_field["my_int"]),
bt2_field._SignedIntegerFieldConst,
)
raise Exception
def test_all_msg_without_cc(self):
- params = {'with_cc': False}
- self._src_comp = self._graph.add_component(self._src, 'my_source', params)
+ params = {"with_cc": False}
+ self._src_comp = self._graph.add_component(self._src, "my_source", params)
self._msg_iter = TestOutputPortMessageIterator(
- self._graph, self._src_comp.output_ports['out']
+ self._graph, self._src_comp.output_ports["out"]
)
for i, msg in enumerate(self._msg_iter):
self.assertIs(type(msg.stream), bt2_stream._StreamConst)
self.assertEqual(msg.stream.addr, self._stream.addr)
with self.assertRaisesRegex(
- ValueError, 'stream class has no default clock class'
+ ValueError, "stream class has no default clock class"
):
msg.default_clock_snapshot
elif i == 1:
self.assertIs(type(msg.event.cls), bt2_event_class._EventClassConst)
self.assertEqual(msg.event.cls.addr, self._event_class.addr)
with self.assertRaisesRegex(
- ValueError, 'stream class has no default clock class'
+ ValueError, "stream class has no default clock class"
):
msg.default_clock_snapshot
elif i == 3:
self.assertIsNone(msg.stream.cls.default_clock_class)
with self.assertRaisesRegex(
ValueError,
- 'such a message has no clock snapshots for this stream class',
+ "such a message has no clock snapshots for this stream class",
):
msg.beginning_default_clock_snapshot
with self.assertRaisesRegex(
ValueError,
- 'such a message has no clock snapshots for this stream class',
+ "such a message has no clock snapshots for this stream class",
):
msg.end_default_clock_snapshot
elif i == 4:
self.assertIsNone(msg.stream.cls.default_clock_class)
with self.assertRaisesRegex(
ValueError,
- 'such a message has no clock snapshots for this stream class',
+ "such a message has no clock snapshots for this stream class",
):
msg.beginning_default_clock_snapshot
with self.assertRaisesRegex(
ValueError,
- 'such a message has no clock snapshots for this stream class',
+ "such a message has no clock snapshots for this stream class",
):
msg.end_default_clock_snapshot
elif i == 6:
self.assertIs(type(msg.stream), bt2_stream._StreamConst)
self.assertEqual(msg.stream.addr, self._stream.addr)
with self.assertRaisesRegex(
- ValueError, 'stream class has no default clock class'
+ ValueError, "stream class has no default clock class"
):
msg.default_clock_snapshot
else:
raise Exception
def test_msg_stream_with_clock_snapshots(self):
- params = {'with_cc': True, 'with_stream_msgs_clock_snapshots': True}
+ params = {"with_cc": True, "with_stream_msgs_clock_snapshots": True}
- self._src_comp = self._graph.add_component(self._src, 'my_source', params)
+ self._src_comp = self._graph.add_component(self._src, "my_source", params)
self._msg_iter = TestOutputPortMessageIterator(
- self._graph, self._src_comp.output_ports['out']
+ self._graph, self._src_comp.output_ports["out"]
)
msgs = list(self._msg_iter)
def msg_iter_next(msg_iter, stream):
with self.assertRaisesRegex(
ValueError,
- 'discarded event count is 0',
+ "discarded event count is 0",
):
msg_iter._create_discarded_events_message(stream, count=0)
def msg_iter_next(msg_iter, stream):
with self.assertRaisesRegex(
- ValueError, 'stream class does not support discarded events'
+ ValueError, "stream class does not support discarded events"
):
msg_iter._create_discarded_events_message(stream)
def msg_iter_next(msg_iter, stream):
with self.assertRaisesRegex(
ValueError,
- 'discarded events have no default clock snapshots for this stream class',
+ "discarded events have no default clock snapshots for this stream class",
):
msg_iter._create_discarded_events_message(
stream, beg_clock_snapshot=10, end_clock_snapshot=20
def msg_iter_next(msg_iter, stream):
with self.assertRaisesRegex(
ValueError,
- 'discarded events have default clock snapshots for this stream class',
+ "discarded events have default clock snapshots for this stream class",
):
msg_iter._create_discarded_events_message(stream)
def msg_iter_next(msg_iter, stream):
with self.assertRaisesRegex(
ValueError,
- r'beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)',
+ r"beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)",
):
msg_iter._create_discarded_events_message(
stream, beg_clock_snapshot=20, end_clock_snapshot=10
def msg_iter_next(msg_iter, stream):
with self.assertRaisesRegex(
ValueError,
- 'discarded packet count is 0',
+ "discarded packet count is 0",
):
msg_iter._create_discarded_packets_message(stream, count=0)
def msg_iter_next(msg_iter, stream):
with self.assertRaisesRegex(
- ValueError, 'stream class does not support discarded packets'
+ ValueError, "stream class does not support discarded packets"
):
msg_iter._create_discarded_packets_message(stream)
def msg_iter_next(msg_iter, stream):
with self.assertRaisesRegex(
ValueError,
- 'discarded packets have no default clock snapshots for this stream class',
+ "discarded packets have no default clock snapshots for this stream class",
):
msg_iter._create_discarded_packets_message(
stream, beg_clock_snapshot=10, end_clock_snapshot=20
def msg_iter_next(msg_iter, stream):
with self.assertRaisesRegex(
ValueError,
- 'discarded packets have default clock snapshots for this stream class',
+ "discarded packets have default clock snapshots for this stream class",
):
msg_iter._create_discarded_packets_message(stream)
def msg_iter_next(msg_iter, stream):
with self.assertRaisesRegex(
ValueError,
- r'beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)',
+ r"beginning default clock snapshot value \(20\) is greater than end default clock snapshot value \(10\)",
):
msg_iter._create_discarded_packets_message(
stream, beg_clock_snapshot=20, end_clock_snapshot=10
self.assertEqual(res, 123)
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main()