X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=tests%2Fbindings%2Fpython%2Fbt2%2Ftest_message.py;h=ed6c31296b93a3158088f89a32d75389f1824793;hb=c6af194f4d5f4478fbd14a019e41cff93f9c207e;hp=6a61189ec82b77a6fb8ee307a52b1fec74acfe0b;hpb=5602ef8155de326402dcb33f40ee4c7d5d693ca5;p=babeltrace.git diff --git a/tests/bindings/python/bt2/test_message.py b/tests/bindings/python/bt2/test_message.py index 6a61189e..ed6c3129 100644 --- a/tests/bindings/python/bt2/test_message.py +++ b/tests/bindings/python/bt2/test_message.py @@ -1,574 +1,425 @@ -from bt2 import values import collections import unittest -import copy import bt2 -class _MessageTestCase(unittest.TestCase): +class AllMessagesTestCase(unittest.TestCase): def setUp(self): - self._trace = bt2.Trace() - self._sc = bt2.StreamClass() - self._ec = bt2.EventClass('salut') - self._my_int_ft = bt2.IntegerFieldType(32) - self._ec.payload_field_type = bt2.StructureFieldType() - self._ec.payload_field_type += collections.OrderedDict([ - ('my_int', self._my_int_ft), - ]) - self._sc.add_event_class(self._ec) - self._clock_class = bt2.ClockClass('allo', 1000) - self._trace.add_clock_class(self._clock_class) - self._trace.packet_header_field_type = bt2.StructureFieldType() - self._trace.packet_header_field_type += collections.OrderedDict([ - ('hello', self._my_int_ft), - ]) - self._trace.add_stream_class(self._sc) - self._cc_prio_map = bt2.ClockClassPriorityMap() - self._cc_prio_map[self._clock_class] = 231 - self._stream = self._sc() - self._packet = self._stream.create_packet() - self._packet.header_field['hello'] = 19487 - self._event = self._ec() - self._event.clock_values.add(self._clock_class(1772)) - self._event.payload_field['my_int'] = 23 - self._event.packet = self._packet - - def tearDown(self): - del self._trace - del self._sc - del self._ec - del self._my_int_ft - del self._clock_class - del self._cc_prio_map - del self._stream - del self._packet - del self._event - - -@unittest.skip("this is broken") -class EventMessageTestCase(_MessageTestCase): - def test_create_no_cc_prio_map(self): - msg = bt2.EventMessage(self._event) - self.assertEqual(msg.event.addr, self._event.addr) - self.assertEqual(len(msg.clock_class_priority_map), 0) - - def test_create_with_cc_prio_map(self): - msg = bt2.EventMessage(self._event, self._cc_prio_map) - self.assertEqual(msg.event.addr, self._event.addr) - self.assertEqual(len(msg.clock_class_priority_map), 1) - self.assertEqual(msg.clock_class_priority_map.highest_priority_clock_class.addr, - self._clock_class.addr) - self.assertEqual(msg.clock_class_priority_map[self._clock_class], 231) - - def test_eq(self): - msg = bt2.EventMessage(self._event, self._cc_prio_map) - event_copy = copy.copy(self._event) - event_copy.packet = self._packet - cc_prio_map_copy = copy.copy(self._cc_prio_map) - msg2 = bt2.EventMessage(event_copy, cc_prio_map_copy) - self.assertEqual(msg, msg2) - - def test_ne_event(self): - msg = bt2.EventMessage(self._event, self._cc_prio_map) - event_copy = copy.copy(self._event) - event_copy.payload_field['my_int'] = 17 - event_copy.packet = self._packet - cc_prio_map_copy = copy.copy(self._cc_prio_map) - msg2 = bt2.EventMessage(event_copy, cc_prio_map_copy) - self.assertNotEqual(msg, msg2) - - def test_ne_cc_prio_map(self): - msg = bt2.EventMessage(self._event) - event_copy = copy.copy(self._event) - event_copy.packet = self._packet - cc_prio_map_copy = copy.copy(self._cc_prio_map) - msg2 = bt2.EventMessage(event_copy, cc_prio_map_copy) - self.assertNotEqual(msg, msg2) - - def test_eq_invalid(self): - msg = bt2.EventMessage(self._event) - self.assertNotEqual(msg, 23) - - def test_copy(self): - msg = bt2.EventMessage(self._event, self._cc_prio_map) - msg2 = copy.copy(msg) - self.assertEqual(msg, msg2) - - def test_deepcopy(self): - msg = bt2.EventMessage(self._event, self._cc_prio_map) - msg2 = copy.deepcopy(msg) - self.assertEqual(msg, msg2) - - -@unittest.skip("this is broken") -class PacketBeginningMessageTestCase(_MessageTestCase): - def test_create(self): - msg = bt2.PacketBeginningMessage(self._packet) - self.assertEqual(msg.packet.addr, self._packet.addr) - - def test_eq(self): - msg = bt2.PacketBeginningMessage(self._packet) - packet_copy = copy.copy(self._packet) - msg2 = bt2.PacketBeginningMessage(packet_copy) - self.assertEqual(msg, msg2) - - def test_ne_packet(self): - msg = bt2.PacketBeginningMessage(self._packet) - packet_copy = copy.copy(self._packet) - packet_copy.header_field['hello'] = 1847 - msg2 = bt2.PacketBeginningMessage(packet_copy) - self.assertNotEqual(msg, msg2) - - def test_eq_invalid(self): - msg = bt2.PacketBeginningMessage(self._packet) - self.assertNotEqual(msg, 23) - - def test_copy(self): - msg = bt2.PacketBeginningMessage(self._packet) - msg2 = copy.copy(msg) - self.assertEqual(msg, msg2) - - def test_deepcopy(self): - msg = bt2.PacketBeginningMessage(self._packet) - msg2 = copy.deepcopy(msg) - self.assertEqual(msg, msg2) - - -@unittest.skip("this is broken") -class PacketEndMessageTestCase(_MessageTestCase): - def test_create(self): - msg = bt2.PacketEndMessage(self._packet) - self.assertEqual(msg.packet.addr, self._packet.addr) - - def test_eq(self): - msg = bt2.PacketEndMessage(self._packet) - packet_copy = copy.copy(self._packet) - msg2 = bt2.PacketEndMessage(packet_copy) - self.assertEqual(msg, msg2) - - def test_ne_packet(self): - msg = bt2.PacketEndMessage(self._packet) - packet_copy = copy.copy(self._packet) - packet_copy.header_field['hello'] = 1847 - msg2 = bt2.PacketEndMessage(packet_copy) - self.assertNotEqual(msg, msg2) - - def test_eq_invalid(self): - msg = bt2.PacketEndMessage(self._packet) - self.assertNotEqual(msg, 23) - - def test_copy(self): - msg = bt2.PacketEndMessage(self._packet) - msg2 = copy.copy(msg) - self.assertEqual(msg, msg2) - - def test_deepcopy(self): - msg = bt2.PacketEndMessage(self._packet) - msg2 = copy.deepcopy(msg) - self.assertEqual(msg, msg2) - - -@unittest.skip("this is broken") -class StreamBeginningMessageTestCase(_MessageTestCase): - def test_create(self): - msg = bt2.StreamBeginningMessage(self._stream) - self.assertEqual(msg.stream.addr, self._stream.addr) - - def test_eq(self): - msg = bt2.StreamBeginningMessage(self._stream) - stream_copy = copy.copy(self._stream) - msg2 = bt2.StreamBeginningMessage(stream_copy) - self.assertEqual(msg, msg2) - - def test_ne_stream(self): - msg = bt2.StreamBeginningMessage(self._stream) - stream_copy = self._sc(name='salut') - msg2 = bt2.StreamBeginningMessage(stream_copy) - self.assertNotEqual(msg, msg2) - - def test_eq_invalid(self): - msg = bt2.StreamBeginningMessage(self._stream) - self.assertNotEqual(msg, 23) - - def test_copy(self): - msg = bt2.StreamBeginningMessage(self._stream) - msg2 = copy.copy(msg) - self.assertEqual(msg, msg2) - - def test_deepcopy(self): - msg = bt2.StreamBeginningMessage(self._stream) - msg2 = copy.deepcopy(msg) - self.assertEqual(msg, msg2) - - -@unittest.skip("this is broken") -class StreamEndMessageTestCase(_MessageTestCase): - def test_create(self): - msg = bt2.StreamEndMessage(self._stream) - self.assertEqual(msg.stream.addr, self._stream.addr) - - def test_eq(self): - msg = bt2.StreamEndMessage(self._stream) - stream_copy = copy.copy(self._stream) - msg2 = bt2.StreamEndMessage(stream_copy) - self.assertEqual(msg, msg2) - - def test_ne_stream(self): - msg = bt2.StreamEndMessage(self._stream) - stream_copy = self._sc(name='salut') - msg2 = bt2.StreamEndMessage(stream_copy) - self.assertNotEqual(msg, msg2) - - def test_eq_invalid(self): - msg = bt2.StreamEndMessage(self._stream) - self.assertNotEqual(msg, 23) - - def test_copy(self): - msg = bt2.StreamEndMessage(self._stream) - msg2 = copy.copy(msg) - self.assertEqual(msg, msg2) - - def test_deepcopy(self): - msg = bt2.StreamEndMessage(self._stream) - msg2 = copy.deepcopy(msg) - self.assertEqual(msg, msg2) - - -@unittest.skip("this is broken") -class InactivityMessageTestCase(unittest.TestCase): - def setUp(self): - self._cc1 = bt2.ClockClass('cc1', 1000) - self._cc2 = bt2.ClockClass('cc2', 2000) - self._cc_prio_map = bt2.ClockClassPriorityMap() - self._cc_prio_map[self._cc1] = 25 - self._cc_prio_map[self._cc2] = 50 - - def tearDown(self): - del self._cc1 - del self._cc2 - del self._cc_prio_map - - def test_create_no_cc_prio_map(self): - msg = bt2.InactivityMessage() - self.assertEqual(len(msg.clock_class_priority_map), 0) - - def test_create_with_cc_prio_map(self): - msg = bt2.InactivityMessage(self._cc_prio_map) - msg.clock_values.add(self._cc1(123)) - msg.clock_values.add(self._cc2(19487)) - self.assertEqual(len(msg.clock_class_priority_map), 2) - self.assertEqual(msg.clock_class_priority_map, self._cc_prio_map) - self.assertEqual(msg.clock_values[self._cc1], 123) - self.assertEqual(msg.clock_values[self._cc2], 19487) - - def test_eq(self): - msg = bt2.InactivityMessage(self._cc_prio_map) - msg.clock_values.add(self._cc1(123)) - msg.clock_values.add(self._cc2(19487)) - cc_prio_map_copy = copy.copy(self._cc_prio_map) - msg2 = bt2.InactivityMessage(cc_prio_map_copy) - msg2.clock_values.add(self._cc1(123)) - msg2.clock_values.add(self._cc2(19487)) - self.assertEqual(msg, msg2) - - def test_ne_cc_prio_map(self): - msg = bt2.InactivityMessage(self._cc_prio_map) - msg.clock_values.add(self._cc1(123)) - msg.clock_values.add(self._cc2(19487)) - cc_prio_map_copy = copy.copy(self._cc_prio_map) - cc_prio_map_copy[self._cc2] = 23 - msg2 = bt2.InactivityMessage(cc_prio_map_copy) - self.assertNotEqual(msg, msg2) - - def test_ne_clock_value(self): - msg = bt2.InactivityMessage(self._cc_prio_map) - msg.clock_values.add(self._cc1(123)) - msg.clock_values.add(self._cc2(19487)) - msg2 = bt2.InactivityMessage(self._cc_prio_map) - msg.clock_values.add(self._cc1(123)) - msg.clock_values.add(self._cc2(1847)) - self.assertNotEqual(msg, msg2) - - def test_eq_invalid(self): - msg = bt2.InactivityMessage(self._cc_prio_map) - self.assertNotEqual(msg, 23) - - def test_copy(self): - msg = bt2.InactivityMessage(self._cc_prio_map) - msg.clock_values.add(self._cc1(123)) - msg.clock_values.add(self._cc2(19487)) - msg_copy = copy.copy(msg) - self.assertEqual(msg, msg_copy) - self.assertNotEqual(msg.addr, msg_copy.addr) - self.assertEqual(msg.clock_class_priority_map.addr, - msg_copy.clock_class_priority_map.addr) - self.assertEqual(msg_copy.clock_values[self._cc1], 123) - self.assertEqual(msg_copy.clock_values[self._cc2], 19487) - - def test_deepcopy(self): - msg = bt2.InactivityMessage(self._cc_prio_map) - msg.clock_values.add(self._cc1(123)) - msg.clock_values.add(self._cc2(19487)) - msg_copy = copy.deepcopy(msg) - self.assertEqual(msg, msg_copy) - self.assertNotEqual(msg.addr, msg_copy.addr) - self.assertNotEqual(msg.clock_class_priority_map.addr, - msg_copy.clock_class_priority_map.addr) - self.assertEqual(msg.clock_class_priority_map, - msg_copy.clock_class_priority_map) - self.assertNotEqual(list(msg.clock_class_priority_map)[0].addr, - list(msg_copy.clock_class_priority_map)[0].addr) - self.assertIsNone(msg_copy.clock_values[self._cc1]) - self.assertIsNone(msg_copy.clock_values[self._cc2]) - self.assertEqual(msg_copy.clock_values[list(msg_copy.clock_class_priority_map)[0]], 123) - self.assertEqual(msg_copy.clock_values[list(msg_copy.clock_class_priority_map)[1]], 19487) - - -@unittest.skip("this is broken") -class DiscardedPacketsMessageTestCase(unittest.TestCase): - def setUp(self): - self._trace = bt2.Trace() - self._sc = bt2.StreamClass() - self._ec = bt2.EventClass('salut') - self._clock_class = bt2.ClockClass('yo', 1000) - self._uint64_int_ft = bt2.IntegerFieldType(64, mapped_clock_class=self._clock_class) - self._my_int_ft = bt2.IntegerFieldType(32) - self._ec.payload_field_type = bt2.StructureFieldType() - self._ec.payload_field_type += collections.OrderedDict([ - ('my_int', self._my_int_ft), - ]) - self._sc.add_event_class(self._ec) - self._sc.packet_context_field_type = bt2.StructureFieldType() - self._sc.packet_context_field_type += collections.OrderedDict([ - ('packet_seq_num', self._my_int_ft), - ('timestamp_begin', self._uint64_int_ft), - ('timestamp_end', self._uint64_int_ft), - ]) - self._trace.add_clock_class(self._clock_class) - self._trace.add_stream_class(self._sc) - self._stream = self._sc() - - def tearDown(self): - del self._trace - del self._sc - del self._ec - del self._clock_class - del self._uint64_int_ft - del self._my_int_ft - del self._stream - - def _create_event(self, packet): - event = self._ec() - event.payload_field['my_int'] = 23 - event.packet = packet - return event - - def _get_msg(self): + class MyIter(bt2._UserMessageIterator): - def __init__(iter_self): - packet1 = self._stream.create_packet() - packet1.context_field['packet_seq_num'] = 0 - packet1.context_field['timestamp_begin'] = 3 - packet1.context_field['timestamp_end'] = 6 - packet2 = self._stream.create_packet() - packet2.context_field['packet_seq_num'] = 5 - packet2.context_field['timestamp_begin'] = 7 - packet2.context_field['timestamp_end'] = 10 - iter_self._ev1 = self._create_event(packet1) - iter_self._ev2 = self._create_event(packet2) - iter_self._at = 0 + def __init__(self, self_port_output): + self._at = 0 def __next__(self): - if self._at == 0: - msg = bt2.EventMessage(self._ev1) - elif self._at == 1: - msg = bt2.EventMessage(self._ev2) + if test_obj._clock_class: + if self._at == 0: + msg = self._create_stream_beginning_message(test_obj._stream) + elif self._at == 1: + msg = self._create_stream_activity_beginning_message(test_obj._stream, default_clock_snapshot=self._at) + elif self._at == 2: + msg = self._create_packet_beginning_message(test_obj._packet, self._at) + elif self._at == 3: + msg = self._create_event_message(test_obj._event_class, test_obj._packet, self._at) + elif self._at == 4: + msg = self._create_message_iterator_inactivity_message(test_obj._clock_class, self._at) + elif self._at == 5: + msg = self._create_discarded_events_message(test_obj._stream, 890, self._at, self._at) + elif self._at == 6: + msg = self._create_packet_end_message(test_obj._packet, self._at) + elif self._at == 7: + msg = self._create_discarded_packets_message(test_obj._stream, 678, self._at, self._at) + elif self._at == 8: + msg = self._create_stream_activity_end_message(test_obj._stream, default_clock_snapshot=self._at) + elif self._at == 9: + msg = self._create_stream_end_message(test_obj._stream) + elif self._at >= 10: + raise bt2.Stop else: - raise bt2.Stop + if self._at == 0: + msg = self._create_stream_beginning_message(test_obj._stream) + elif self._at == 1: + msg = self._create_stream_activity_beginning_message(test_obj._stream) + elif self._at == 2: + msg = self._create_packet_beginning_message(test_obj._packet) + elif self._at == 3: + msg = self._create_event_message(test_obj._event_class, test_obj._packet) + elif self._at == 4: + msg = self._create_discarded_events_message(test_obj._stream, 890) + elif self._at == 5: + msg = self._create_packet_end_message(test_obj._packet) + elif self._at == 6: + msg = self._create_discarded_packets_message(test_obj._stream, 678) + elif self._at == 7: + msg = self._create_stream_activity_end_message(test_obj._stream) + elif self._at == 8: + msg = self._create_stream_end_message(test_obj._stream) + elif self._at >= 9: + raise bt2.Stop self._at += 1 return msg - class MySource(bt2._UserSourceComponent, - message_iterator_class=MyIter): + class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter): def __init__(self, params): self._add_output_port('out') - class MySink(bt2._UserSinkComponent): - def __init__(self, params): - self._add_input_port('in') - - def _consume(comp_self): - nonlocal the_msg - msg = next(comp_self._msg_iter) - - if type(msg) is bt2._DiscardedPacketsMessage: - the_msg = msg - raise bt2.Stop - - def _port_connected(self, port, other_port): - self._msg_iter = port.connection.create_message_iterator() - - the_msg = None - graph = bt2.Graph() - src = graph.add_component(MySource, 'src') - sink = graph.add_component(MySink, 'sink') - conn = graph.connect_ports(src.output_ports['out'], - sink.input_ports['in']) - graph.run() - return the_msg - - def test_create(self): - self.assertIsInstance(self._get_msg(), bt2._DiscardedPacketsMessage) - - def test_count(self): - self.assertEqual(self._get_msg().count, 4) - - def test_stream(self): - self.assertEqual(self._get_msg().stream.addr, self._stream.addr) - - def test_beginning_clock_value(self): - msg = self._get_msg() - beginning_clock_value = msg.beginning_clock_value - self.assertEqual(beginning_clock_value.clock_class, self._clock_class) - self.assertEqual(beginning_clock_value, 6) - - def test_end_clock_value(self): - msg = self._get_msg() - end_clock_value = msg.end_clock_value - self.assertEqual(end_clock_value.clock_class, self._clock_class) - self.assertEqual(end_clock_value, 7) - - def test_eq(self): - msg1 = self._get_msg() - msg2 = self._get_msg() - self.assertEqual(msg1, msg2) - - def test_eq_invalid(self): - msg1 = self._get_msg() - self.assertNotEqual(msg1, 23) - - -@unittest.skip("this is broken") -class DiscardedEventsMessageTestCase(unittest.TestCase): - def setUp(self): - self._trace = bt2.Trace() - self._sc = bt2.StreamClass() - self._ec = bt2.EventClass('salut') - self._clock_class = bt2.ClockClass('yo', 1000) - self._uint64_int_ft = bt2.IntegerFieldType(64, mapped_clock_class=self._clock_class) - self._my_int_ft = bt2.IntegerFieldType(32) - self._ec.payload_field_type = bt2.StructureFieldType() - self._ec.payload_field_type += collections.OrderedDict([ - ('my_int', self._my_int_ft), - ]) - self._sc.add_event_class(self._ec) - self._sc.packet_context_field_type = bt2.StructureFieldType() - self._sc.packet_context_field_type += collections.OrderedDict([ - ('events_discarded', self._my_int_ft), - ('timestamp_begin', self._uint64_int_ft), - ('timestamp_end', self._uint64_int_ft), - ]) - self._trace.add_clock_class(self._clock_class) - self._trace.add_stream_class(self._sc) - self._stream = self._sc() - - def tearDown(self): - del self._trace - del self._sc - del self._ec - del self._clock_class - del self._uint64_int_ft - del self._my_int_ft - del self._stream - - def _create_event(self, packet): - event = self._ec() - event.payload_field['my_int'] = 23 - event.packet = packet - return event - - def _get_msg(self): + with_cc = bool(params['with_cc']) + tc = self._create_trace_class() + if with_cc: + cc = self._create_clock_class() + else: + cc = None + + sc = tc.create_stream_class(default_clock_class=cc, + packets_have_beginning_default_clock_snapshot=with_cc, + packets_have_end_default_clock_snapshot=with_cc, + supports_discarded_events=True, + discarded_events_have_default_clock_snapshots=with_cc, + supports_discarded_packets=True, + discarded_packets_have_default_clock_snapshots=with_cc) + + # Create payload field class + my_int_fc = tc.create_signed_integer_field_class(32) + payload_fc = tc.create_structure_field_class() + payload_fc += collections.OrderedDict([ + ('my_int', my_int_fc), + ]) + + ec = sc.create_event_class(name='salut', payload_field_class=payload_fc) + + trace = tc() + stream = trace.create_stream(sc) + packet = stream.create_packet() + + test_obj._trace = trace + test_obj._stream = stream + test_obj._packet = packet + test_obj._event_class = ec + test_obj._clock_class = cc + + test_obj = self + self._graph = bt2.Graph() + self._src = MySrc + self._iter = MyIter + + def test_all_msg_with_cc(self): + params = {'with_cc': True} + self._src_comp = self._graph.add_component(self._src, 'my_source', params) + self._msg_iter = self._graph.create_output_port_message_iterator(self._src_comp.output_ports['out']) + + for i, msg in enumerate(self._msg_iter): + if i == 0: + self.assertIsInstance(msg, bt2.message._StreamBeginningMessage) + self.assertEqual(msg.stream.addr, self._stream.addr) + elif i == 1: + self.assertIsInstance(msg, bt2.message._StreamActivityBeginningMessage) + self.assertEqual(msg.default_clock_snapshot.value, i) + elif i == 2: + self.assertIsInstance(msg, bt2.message._PacketBeginningMessage) + self.assertEqual(msg.packet.addr, self._packet.addr) + self.assertEqual(msg.default_clock_snapshot.value, i) + elif i == 3: + self.assertIsInstance(msg, bt2.message._EventMessage) + self.assertEqual(msg.event.cls.addr, self._event_class.addr) + self.assertEqual(msg.default_clock_snapshot.value, i) + elif i == 4: + self.assertIsInstance(msg, bt2.message._MessageIteratorInactivityMessage) + self.assertEqual(msg.default_clock_snapshot.value, i) + elif i == 5: + self.assertIsInstance(msg, bt2.message._DiscardedEventsMessage) + self.assertEqual(msg.stream.addr, self._stream.addr) + self.assertEqual(msg.count, 890) + self.assertEqual(msg.stream.cls.default_clock_class.addr, self._clock_class.addr) + self.assertEqual(msg.beginning_default_clock_snapshot.value, i) + self.assertEqual(msg.end_default_clock_snapshot.value, i) + elif i == 6: + self.assertIsInstance(msg, bt2.message._PacketEndMessage) + self.assertEqual(msg.packet.addr, self._packet.addr) + self.assertEqual(msg.default_clock_snapshot.value, i) + elif i == 7: + self.assertIsInstance(msg, bt2.message._DiscardedPacketsMessage) + self.assertEqual(msg.stream.addr, self._stream.addr) + self.assertEqual(msg.count, 678) + self.assertEqual(msg.stream.cls.default_clock_class.addr, self._clock_class.addr) + self.assertEqual(msg.beginning_default_clock_snapshot.value, i) + self.assertEqual(msg.end_default_clock_snapshot.value, i) + elif i == 8: + self.assertIsInstance(msg, bt2.message._StreamActivityEndMessage) + self.assertEqual(msg.stream.addr, self._stream.addr) + self.assertEqual(msg.default_clock_snapshot.value, i) + elif i == 9: + self.assertIsInstance(msg, bt2.message._StreamEndMessage) + self.assertEqual(msg.stream.addr, self._stream.addr) + else: + raise Exception + + def test_all_msg_without_cc(self): + params = {'with_cc': False} + self._src_comp = self._graph.add_component(self._src, 'my_source', params) + self._msg_iter = self._graph.create_output_port_message_iterator(self._src_comp.output_ports['out']) + + for i, msg in enumerate(self._msg_iter): + if i == 0: + self.assertIsInstance(msg, bt2.message._StreamBeginningMessage) + self.assertEqual(msg.stream.addr, self._stream.addr) + elif i == 1: + self.assertIsInstance(msg, bt2.message._StreamActivityBeginningMessage) + self.assertIsInstance(msg.default_clock_snapshot, + bt2._UnknownClockSnapshot) + elif i == 2: + self.assertIsInstance(msg, bt2.message._PacketBeginningMessage) + self.assertEqual(msg.packet.addr, self._packet.addr) + elif i == 3: + self.assertIsInstance(msg, bt2.message._EventMessage) + self.assertEqual(msg.event.cls.addr, self._event_class.addr) + with self.assertRaises(bt2.NonexistentClockSnapshot): + msg.default_clock_snapshot + elif i == 4: + self.assertIsInstance(msg, bt2.message._DiscardedEventsMessage) + self.assertEqual(msg.stream.addr, self._stream.addr) + self.assertEqual(msg.count, 890) + self.assertIsNone(msg.stream.cls.default_clock_class) + with self.assertRaises(bt2.NonexistentClockSnapshot): + msg.beginning_default_clock_snapshot + with self.assertRaises(bt2.NonexistentClockSnapshot): + msg.end_default_clock_snapshot + elif i == 5: + self.assertIsInstance(msg, bt2.message._PacketEndMessage) + self.assertEqual(msg.packet.addr, self._packet.addr) + elif i == 6: + self.assertIsInstance(msg, bt2.message._DiscardedPacketsMessage) + self.assertEqual(msg.stream.addr, self._stream.addr) + self.assertEqual(msg.count, 678) + self.assertIsNone(msg.stream.cls.default_clock_class) + with self.assertRaises(bt2.NonexistentClockSnapshot): + msg.beginning_default_clock_snapshot + with self.assertRaises(bt2.NonexistentClockSnapshot): + msg.end_default_clock_snapshot + elif i == 7: + self.assertIsInstance(msg, bt2.message._StreamActivityEndMessage) + self.assertEqual(msg.stream.addr, self._stream.addr) + self.assertIsInstance(msg.default_clock_snapshot, + bt2._UnknownClockSnapshot) + elif i == 8: + self.assertIsInstance(msg, bt2.message._StreamEndMessage) + self.assertEqual(msg.stream.addr, self._stream.addr) + else: + raise Exception + + +class StreamActivityMessagesTestCase(unittest.TestCase): + def _test_create_msg(self, with_cc, test_create_beginning_func, test_create_end_func): class MyIter(bt2._UserMessageIterator): - def __init__(iter_self): - packet1 = self._stream.create_packet() - packet1.context_field['events_discarded'] = 0 - packet1.context_field['timestamp_begin'] = 3 - packet1.context_field['timestamp_end'] = 6 - packet2 = self._stream.create_packet() - packet2.context_field['events_discarded'] = 10 - packet2.context_field['timestamp_begin'] = 7 - packet2.context_field['timestamp_end'] = 10 - iter_self._ev1 = self._create_event(packet1) - iter_self._ev2 = self._create_event(packet2) - iter_self._at = 0 + def __init__(self, self_port_output): + self._at = 0 def __next__(self): if self._at == 0: - msg = bt2.EventMessage(self._ev1) + msg = self._create_stream_beginning_message(self._component._stream) elif self._at == 1: - msg = bt2.EventMessage(self._ev2) - else: + msg = test_create_beginning_func(self, self._component._stream) + elif self._at == 2: + msg = test_create_end_func(self, self._component._stream) + elif self._at == 3: + msg = self._create_stream_end_message(self._component._stream) + elif self._at >= 4: raise bt2.Stop self._at += 1 return msg - class MySource(bt2._UserSourceComponent, - message_iterator_class=MyIter): + class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter): def __init__(self, params): self._add_output_port('out') + tc = self._create_trace_class() - class MySink(bt2._UserSinkComponent): - def __init__(self, params): - self._add_input_port('in') + if with_cc: + cc = self._create_clock_class() + sc = tc.create_stream_class(default_clock_class=cc) + else: + sc = tc.create_stream_class() - def _consume(comp_self): - nonlocal the_msg - msg = next(comp_self._msg_iter) + # Create payload field class + trace = tc() + self._stream = trace.create_stream(sc) - if type(msg) is bt2._DiscardedEventsMessage: - the_msg = msg - raise bt2.Stop + graph = bt2.Graph() + src_comp = graph.add_component(MySrc, 'src') + msg_iter = graph.create_output_port_message_iterator(src_comp.output_ports['out']) - def _port_connected(self, port, other_port): - self._msg_iter = port.connection.create_message_iterator() + for msg in msg_iter: + pass - the_msg = None - graph = bt2.Graph() - src = graph.add_component(MySource, 'src') - sink = graph.add_component(MySink, 'sink') - conn = graph.connect_ports(src.output_ports['out'], - sink.input_ports['in']) - graph.run() - return the_msg - - def test_create(self): - self.assertIsInstance(self._get_msg(), bt2._DiscardedEventsMessage) - - def test_count(self): - self.assertEqual(self._get_msg().count, 10) - - def test_stream(self): - self.assertEqual(self._get_msg().stream.addr, self._stream.addr) - - def test_beginning_clock_value(self): - msg = self._get_msg() - beginning_clock_value = msg.beginning_clock_value - self.assertEqual(beginning_clock_value.clock_class, self._clock_class) - self.assertEqual(beginning_clock_value, 6) - - def test_end_clock_value(self): - msg = self._get_msg() - end_clock_value = msg.end_clock_value - self.assertEqual(end_clock_value.clock_class, self._clock_class) - self.assertEqual(end_clock_value, 10) - - def test_eq(self): - msg1 = self._get_msg() - msg2 = self._get_msg() - self.assertEqual(msg1, msg2) - - def test_eq_invalid(self): - msg1 = self._get_msg() - self.assertNotEqual(msg1, 23) + def test_create_beginning_with_cc_with_known_default_cs(self): + def create_beginning(msg_iter, stream): + msg = msg_iter._create_stream_activity_beginning_message(stream, 172) + self.assertEqual(msg.default_clock_snapshot.value, 172) + return msg + + def create_end(msg_iter, stream): + return msg_iter._create_stream_activity_end_message(stream, 199) + + self._test_create_msg(True, create_beginning, create_end) + + def test_create_end_with_cc_with_known_default_cs(self): + def create_beginning(msg_iter, stream): + return msg_iter._create_stream_activity_beginning_message(stream, 172) + + def create_end(msg_iter, stream): + msg = msg_iter._create_stream_activity_end_message(stream, 199) + self.assertEqual(msg.default_clock_snapshot.value, 199) + return msg + + self._test_create_msg(True, create_beginning, create_end) + + def test_create_beginning_with_cc_with_unknown_default_cs(self): + def create_beginning(msg_iter, stream): + msg = msg_iter._create_stream_activity_beginning_message(stream, + msg_iter._unknown_clock_snapshot) + self.assertIsInstance(msg.default_clock_snapshot, + bt2._UnknownClockSnapshot) + return msg + + def create_end(msg_iter, stream): + return msg_iter._create_stream_activity_end_message(stream, 199) + + self._test_create_msg(True, create_beginning, create_end) + + def test_create_end_with_cc_with_unknown_default_cs(self): + def create_beginning(msg_iter, stream): + return msg_iter._create_stream_activity_beginning_message(stream, 172) + + def create_end(msg_iter, stream): + msg = msg_iter._create_stream_activity_end_message(stream, + msg_iter._unknown_clock_snapshot) + self.assertIsInstance(msg.default_clock_snapshot, + bt2._UnknownClockSnapshot) + return msg + + self._test_create_msg(True, create_beginning, create_end) + + def test_create_beginning_with_cc_with_infinite_default_cs(self): + def create_beginning(msg_iter, stream): + msg = msg_iter._create_stream_activity_beginning_message(stream, + msg_iter._infinite_clock_snapshot) + self.assertIsInstance(msg.default_clock_snapshot, + bt2._InfiniteClockSnapshot) + return msg + + def create_end(msg_iter, stream): + return msg_iter._create_stream_activity_end_message(stream, 199) + + self._test_create_msg(True, create_beginning, create_end) + + def test_create_end_with_cc_with_infinite_default_cs(self): + def create_beginning(msg_iter, stream): + return msg_iter._create_stream_activity_beginning_message(stream, 172) + + def create_end(msg_iter, stream): + msg = msg_iter._create_stream_activity_end_message(stream, + msg_iter._infinite_clock_snapshot) + self.assertIsInstance(msg.default_clock_snapshot, + bt2._InfiniteClockSnapshot) + return msg + + self._test_create_msg(True, create_beginning, create_end) + + def test_create_beginning_without_cc_with_known_default_cs(self): + def create_beginning(msg_iter, stream): + with self.assertRaises(ValueError): + msg_iter._create_stream_activity_beginning_message(stream, 172) + + return msg_iter._create_stream_activity_beginning_message(stream) + + def create_end(msg_iter, stream): + return msg_iter._create_stream_activity_end_message(stream) + + self._test_create_msg(False, create_beginning, create_end) + + def test_create_end_without_cc_with_known_default_cs(self): + def create_beginning(msg_iter, stream): + return msg_iter._create_stream_activity_beginning_message(stream) + + def create_end(msg_iter, stream): + with self.assertRaises(ValueError): + msg_iter._create_stream_activity_end_message(stream, 199) + + return msg_iter._create_stream_activity_end_message(stream) + + self._test_create_msg(False, create_beginning, create_end) + + def test_create_beginning_without_cc_with_unknown_default_cs(self): + def create_beginning(msg_iter, stream): + msg = msg_iter._create_stream_activity_beginning_message(stream, + msg_iter._unknown_clock_snapshot) + self.assertIsInstance(msg.default_clock_snapshot, + bt2._UnknownClockSnapshot) + return msg + + def create_end(msg_iter, stream): + return msg_iter._create_stream_activity_end_message(stream) + + self._test_create_msg(False, create_beginning, create_end) + + def test_create_end_without_cc_with_unknown_default_cs(self): + def create_beginning(msg_iter, stream): + return msg_iter._create_stream_activity_beginning_message(stream) + + def create_end(msg_iter, stream): + msg = msg_iter._create_stream_activity_end_message(stream, + msg_iter._unknown_clock_snapshot) + self.assertIsInstance(msg.default_clock_snapshot, + bt2._UnknownClockSnapshot) + return msg + + self._test_create_msg(False, create_beginning, create_end) + + def test_create_beginning_without_cc_with_infinite_default_cs(self): + def create_beginning(msg_iter, stream): + msg = msg_iter._create_stream_activity_beginning_message(stream, + msg_iter._infinite_clock_snapshot) + self.assertIsInstance(msg.default_clock_snapshot, + bt2._InfiniteClockSnapshot) + return msg + + def create_end(msg_iter, stream): + return msg_iter._create_stream_activity_end_message(stream) + + self._test_create_msg(False, create_beginning, create_end) + + def test_create_end_without_cc_with_infinite_default_cs(self): + def create_beginning(msg_iter, stream): + return msg_iter._create_stream_activity_beginning_message(stream) + + def create_end(msg_iter, stream): + msg = msg_iter._create_stream_activity_end_message(stream, + msg_iter._infinite_clock_snapshot) + self.assertIsInstance(msg.default_clock_snapshot, + bt2._InfiniteClockSnapshot) + return msg + + self._test_create_msg(False, create_beginning, create_end) + + def test_create_beginning_default_cs_wrong_type(self): + def create_beginning(msg_iter, stream): + with self.assertRaises(TypeError): + msg_iter._create_stream_activity_beginning_message(stream, 'infinite') + + return msg_iter._create_stream_activity_beginning_message(stream) + + def create_end(msg_iter, stream): + return msg_iter._create_stream_activity_end_message(stream) + + self._test_create_msg(False, create_beginning, create_end) + + def test_create_end_without_default_cs_wrong_type(self): + def create_beginning(msg_iter, stream): + return msg_iter._create_stream_activity_beginning_message(stream) + + def create_end(msg_iter, stream): + with self.assertRaises(TypeError): + msg_iter._create_stream_activity_end_message(stream, 'unknown') + + return msg_iter._create_stream_activity_end_message(stream) + + self._test_create_msg(False, create_beginning, create_end)